RabbitMQ 파이썬 튜토리얼 03 - Pub/Sub


튜토리얼 리스트

전제조건

이 튜토리얼에서는 RabbitMQ가 표준 포트 (5672)의 localhost에 설치되어 실행되고 있다고 가정합니다. 다른 호스트, 포트 또는 자격 증명을 사용하는 경우 연결 설정을 조정해야 합니다

우리는 Pika RabbitMQ client version 0.11.0 을 사용합니다.

이 포스트에서는

이전 튜토리얼에서는 작업 큐를 만들었습니다. 작업 큐 뒤에 있는 가정은 각 작업이 정확하게 하나의 작업자에게 전달 된다는 것입니다. 이 부분에서는 완전히 다른 것을 할 것입니다 - 우리는 여러 소비자(컨슈머)에게 메시지를 전달할 것입니다. 이 패턴을 “pub/sub” 이라고합니다. 패턴을 설명하기 위해 간단한 로깅 시스템을 구축 할 것입니다. 이 프로그램은 두 가지 프로그램으로 구성됩니다. 첫 번째 프로그램은 로그 메시지를 내보내고 두 번째 프로그램은 로그 메시지를 수신하고 출력합니다. 로깅 시스템에서는 실행 중인 모든 수신 프로그램에 메시지가 표시됩니다. 그러면 우리는 하나의 컨슈머를 실행하고 로그를 디스크로 보낼 수 있습니다. 동시에 다른 컨슈머를 실행하고 화면의 로그를 볼 수 있습니다. 기본적으로 발행된 로그 메시지는 모든 컨슈머에게 브로드캐스팅됩니다.

Exchanges

튜토리얼의 이전 부분에서는 큐에서 주고받은 메시지를 수신했습니다. 이제 RabbitMQ에서 전체 메시징 모델을 소개 할 차례입니다.

이전 튜토리얼에서 다룬 내용을 빠르게 살펴 보겠습니다.

  • 프로듀서(생성자)는 메시지를 보내는 사용자 응용 프로그램입니다.
  • 큐는 메시지를 저장하는 버퍼입니다.
  • 컨슈머(소비자)는 메시지를받는 사용자 응용 프로그램입니다.

RabbitMQ의 메시징 모델에서 핵심 아이디어는 프로듀서가 어떤 메시지도 큐로 직접 보내지 않는다는 것입니다. 사실, 프로듀서는 메시지가 어떤 큐에 전달되는지 전혀 모릅니다.

대신 프로듀서는 메시지를 exchange에만 보낼 수 있습니다. exchange는 아주 간단한 일입니다. 한 쪽에서는 프로듀서로부터 메시지를 받고 다른 쪽에서는 메시지를 큐로 보냅니다. exchange는 받은 메시지를 어떻게 처리해야 하는지 정확히 알아야 합니다.

  • 특정 큐에 추가해야 하나?
  • 여러 큐에 추가해야 하나?
  • 폐기해야 하나?

그 규칙은 exchange 유형에 의해 정의됩니다.

익스체인지

사용할 수 있는 exchange 유형은 direct, topic, headers 그리고 fanout 입니다. 마지막 부분인 fanout에 집중하겠습니다. 그 유형의 exchange를 만들고 ‘logs’라고 부릅시다.

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

fanout exchange은 매우 간단합니다. 이름에서 추측 할 수 있듯이, 수신 한 모든 메시지를 알고 있는 모든 큐에 브로드캐스트합니다. 그리고 그것은 우리가 로거에 필요한 기능입니다.

exchanges 리스트 확인하기

서버에서 exchange를 나열하기 위해 이제까지 유용한 rabbitmqctl을 실행할 수 있습니다 :

sudo rabbitmqctl list_exchanges

이 목록에는 amq. * exchange와 default (unnamed) exchange가 있습니다. 이것들은 기본적으로 만들어지지만, 지금은 사용할 필요가 없을 것입니다.

기본 exchange

튜토리얼의 이전 부분에서는 exchange에 대해서는 알지 못했지만 여전히 큐에 메시지를 보낼 수 있었습니다. 우리가 빈 문자열 (““)로 식별하는 기본 exchange를 사용했기 때문에 가능했습니다. 이전에 메시지를 발행한 방법을 상기 해보십시오.

channel.basic_publish(exchange='',
                     routing_key='hello',
                     body=message)

exchange 매개 변수는 exchange의 이름입니다. 빈 문자열은 기본 또는 이름이 없는 exchange를 나타냅니다. 메시지는 routing_key에 지정된 이름으로 큐가 존재하는 경우에 라우팅됩니다. 이제 우리는 명명된 exchange로 발행할 수 있습니다.

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

임시 큐

이전에 기억할 수 있듯이 지정된 이름을 가진 hellotask_queue 큐를 사용했습니다 . 큐의 이름을 지정할 수 있다는 것이 우리에게는 매우 중요했습니다. 직원들을 동일한 큐로 안내해야 했습니다. 프로듀서와 컨슈머간에 큐를 공유하려면 큐에 이름을 지정하는 것이 중요합니다.

그러나 그것은 우리의 로거의 경우가 아닙니다. 우리는 모든 로그 메시지를 듣고 싶습니다. 우리는 또한 오래된 메시지가 아닌 현재 흐르는 메시지에만 관심이 있습니다. 이를 해결하기 위해서는 두 가지가 필요합니다.

첫째, 우리가 Rabbit에 연결할 때마다 우리는 새롭고 빈 큐가 필요합니다. 이를 위해 우리는 임의의 이름으로 큐를 생성 할 수 있습니다. 서버가 임의의 큐 이름을 선택하도록 할 수도 있습니다. queue 매개 변수를 queue_declare에 제공하지 않으면이 작업을 수행 할 수 있습니다.

 result = channel.queue_declare()

이 시점에서 result.method.queue는 임의 큐 이름을 포함합니다. 예를 들어 amq.gen-randomstring와 같이 보일 수 있습니다.

둘째, 컨슈머 연결이 닫히면 큐를 삭제해야합니다. 거기에 대한 exclusive flag 이 있습니다 :

result = channel.queue_declare(exclusive=True)

큐에 있는 가이드에서 배타 플래그 및 기타 큐 속성에 대해 자세히 배울 수 있습니다.

Bindings

익스체인지

이미 fanout exchange 및 큐를 만들었습니다. 이제 exchange에 큐에 메시지를 보내라고 알릴 필요가 있습니다. exchange과 큐 간의 관계를 바인딩이라고합니다.

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

이제부터 ‘logs’ exchange를 통해 큐에 메시지가 추가됩니다.

bindings 리스트 확인하기

짐작할 수 있겠지만 기존 바인딩을 나열 할 수 있습니다.

rabbitmqctl list_bindings

최종 코드

익스체인지

로그 메시지를 발행 하는 프로듀서 프로그램은 이전 튜토리얼과 크게 다르지 않습니다. 가장 중요한 변화는 이름없는 exchagne 대신 logs exchange에 메시지를 발행하는 것입니다. 전송할 때 routing_key를 제공해야 하지만, fanout exchange의 경우이 값은 무시됩니다. emit_log.py 스크립트의 코드는 다음과 같습니다.

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

보시다시피, 연결을 설정 한 후 exchange를 선언했습니다. 이 단계는 존재하지 않는 exchange에 대한 발행이 금지되어 있으므로 필요합니다. exchange에 큐이 아직 없다면 메시지는 사라지겠지만, 우리에게는 괜찮습니다. 컨슈머가 듣지 않는 경우에도 안전하게 메시지를 삭제할 수 있습니다.

receive_logs.py의 코드는 다음과 같습니다.

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

다 끝났습니다. 로그를 파일에 저장하려면 콘솔을 열고 다음을 입력하십시오.

python receive_logs.py > logs_from_rabbit.log

화면에 로그를 표시하려면 새 터미널을 생성하고 다음을 실행하십시오.

python receive_logs.py

물론 로그 유형을 내보내는 방법은 다음과 같습니다.

python emit_log.py

rabbitmqctl list_bindings를 사용하면 원하는 대로 코드가 실제로 바인딩과 큐를 생성하는지 확인할 수 있습니다. 두 개의 receive_logs.py 프로그램을 실행하면 다음과 같은 내용이 표시됩니다.

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

logs exchange의 데이터는 할당 이름이 있는 두 개의 큐로 이동합니다. 그게 바로 우리가 의도 한 것입니다.

튜토리얼 리스트