RabbitMQ 파이썬 튜토리얼 04 - Routing


튜토리얼 리스트

전제조건

이 튜토리얼에서는 RabbitMQ가 표준 포트(5672)의 localhost에 설치되어 실행되고 있다고 가정합니다. 다른 호스트, 포트 또는 자격 증명을 사용하는 경우 연결 설정을 변경해야 합니다.

이 포스트에서는

이전 포스트 - Pub/Sub에서는 간단한 로깅 시스템을 만들었습니다. 우리는 많은 수신자에게 로그 메시지를 브로드 캐스트 할 수 있었습니다. 이 튜토리얼에서 우리는 그것에 특징을 추가 할 것입니다. 메시지의 서브셋에 대해서만 구독 할 수 있도록 할 것입니다. 예를 들어, 콘솔에서 모든 로그 메시지를 계속 출력 할 수 있는 동안 콘솔 공간을 절약하기 위해 중요한 오류 메시지만 로그 파일로 보낼 수 있도록 구성할 수 있습니다.

바인딩

이전 예제에서 우리는 이미 바인딩을 생성하고 있었습니다. 다음과 같은 코드를 생각해보십시오 :

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)

바인딩은 exchange 과 큐 간의 관계입니다. 이것은 단순히 다음과 같이 여길 수 있습니다. 큐는 이 exchange의 메시지에 관심이 있습니다.

바인딩은 추가 routing_key 매개 변수를 사용할 수 있습니다. basic_publish 매개 변수와의 혼동을 피하기 위해 바인딩 키라고 부릅니다. 다음은 바인딩 키를 사용하여 바인딩을 생성하는 방법입니다.

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

바인딩 키의 의미는 exchange 유형에 따라 다릅니다. 이전에 사용했던 fanout exchange은 routing_key 값을 무시했습니다.

Direct exchange

이전 튜토리얼의 로깅 시스템은 모든 메시지를 모든 사용자에게 브로드캐스트합니다. 심각도에 따라 메시지를 필터링 할 수 있도록 이를 확장하려고 합니다. 예를 들어, 디스크에 로그 메시지를 쓰는 스크립트가 심각한 오류만 수신하고 ‘warning’ 또는 ‘info’ 로그 메시지에 디스크 공간을 낭비하지 않도록 할 수 있습니다. 우리는 fanout exchange을 사용했는데, 이는 우리에게 너무 많은 유연성을 주지는 못합니다. 우리는 direct exchange을 대신 사용할 것입니다. direct exchange의 배후에 있는 라우팅 알고리즘은 간단합니다. 메시지는 바인딩 키가 메시지의 라우팅 키와 정확히 일치하는 큐로 이동합니다.

이를 설명하기 위해 다음 설정을 고려하십시오.

직접 교환기

이 설정에서는 두 개의 큐가 바인딩 된 direct exchange X를 볼 수 있습니다. 첫 번째 큐은 ‘orange’ 바인딩 키로 바인딩되고 두 번째 큐은 바인딩 키가 ‘black’이고 다른 하나는 ‘green’ 인 두 개의 바인딩이 있습니다. 이러한 설정에서 라우팅 키가 ‘orange’ 인 exchange에 발행된 메시지는 큐 Q1으로 라우팅 됩니다. ‘black’ 또는 ‘green’의 라우팅 키가 있는 메시지는 Q2로 이동합니다. 이외의 라우트 키를 가진 모든 메시지는 무시됩니다.

Multiple bindings

동일한 바인딩 키를 사용하여 여러 큐를 바인드하는 것이 가능합니다. 위 예에서는 X와 Q1 사이에 바인딩 키를 ‘black’으로 바인딩을 추가할 수 있습니다. 이 경우 direct exchange는 fanout 처럼 작동하고 모든 일치하는 큐에 메시지를 브로드캐스트합니다. 라우팅 키가 ‘black’ 메시지는 Q1과 Q2 모두에 전달됩니다.

로그 보내기 - Emitting logs

우리는 로깅 시스템에 이 모델을 사용할 것입니다. fanout 대신 우리는 direct으로 메시지를 보냅니다. 로그 심각도를 라우팅 키로 제공합니다. 그러면 수신 스크립트가 수신하려는 심각도를 선택할 수 있습니다. 먼저 로그를 내보내는 것에 집중하겠습니다.

항상 그렇듯이 exchange를 먼저 만들어야합니다.

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

이제 메시지를 보낼 준비가되었습니다.

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

상황을 단순화하기 위해 ‘심각도’는 ‘info’, ‘warning’, ‘error’ 중 하나 일 수 있다고 가정합니다.

구독 - Subscribing

메시지 수신은 이전 튜토리얼에서와 마찬가지로 작동합니다. 한 가지 예외는 관심 대상인 각 심각도에 대해 새로운 바인딩을 생성하는 것입니다.

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

최종코드

emit_log_direct.py의 코드는 다음과 같습니다.

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

receive_logs_direct.py의 코드는 다음과 같습니다.

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

‘warning’과 ‘error’(그리고 ‘info’가 아닌) 로그 메시지 만 파일에 저장하려면 콘솔을 열고 다음을 입력하십시오.

python receive_logs_direct.py warning error > logs_from_rabbit.log

화면에 모든 로그 메시지를 표시하려면 새 터미널을 열고 다음을 수행하십시오.

python receive_logs_direct.py info warning error
# => [*] Waiting for logs. To exit press CTRL+C

예를 들어 오류 로그 메시지를 출력하려면 다음을 입력하십시오.

python emit_log_direct.py error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

튜토리얼 리스트