python3 RabbitMQ ( Routing !)
預備知識
What This Tutorial Focuses On
在前面的教程中,我們構建了一個簡單的日誌系統。我們能夠向許多接收器廣播日誌訊息。
在本教程中,我們將為它新增一個特性——我們將使訂閱訊息的一個子整合為可能。例如,我們將能夠僅將關鍵錯誤訊息直接指向日誌檔案(以節省磁碟空間),同時仍然能夠在控制檯列印所有日誌訊息。
繫結
在前面的例子中,我們已經建立了繫結。您可能會回憶程式碼如下:
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
繫結是交換器和佇列之間的關係。這可以簡單地理解為:佇列對來自此交換器的訊息感興趣。
繫結可以使用額外的routing_key引數。為了避免與basic_publish引數混淆,我們將它稱為繫結鍵。這就是我們如何建立一個鍵繫結:
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
繫結鍵的含義取決於交換型別。我們以前使用過的fanout交換交換機,只是忽略了它的價值。
Direct exchange
我們的日誌系統從以前的教程廣播所有訊息到所有消費者。我們希望擴充套件此功能,以允許根據訊息的嚴重程度對其進行過濾。例如,我們可能希望將日誌訊息寫入磁碟的指令碼只接收關鍵錯誤,而不是在警告或資訊日誌訊息上浪費磁碟空間。
我們使用的是fanout交換器,這並沒有給我們太多的靈活性——它只能夠進行盲目的廣播。
我們將使用直接交換。直接交換背後的路由演算法很簡單——訊息轉到佇列,其繫結鍵與訊息的路由鍵完全匹配。
為了說明這一點,考慮以下設定:
在這個設定中,我們可以看到直接exchange X,它綁定了兩個佇列。第一個佇列用繫結鍵橙色繫結,第二個佇列有兩個繫結,一個用繫結鍵黑色繫結,另一個用綠色繫結。
在這樣的設定中,釋出到交換器的帶有路由關鍵字橙色的訊息將被路由到佇列Q1。帶有黑色或綠色路由鍵的訊息將轉到Q2。所有其他訊息將被丟棄。
多個繫結
使用相同的繫結鍵繫結多個佇列是完全合法的。在我們的示例中,我們可以使用繫結鍵black在X和Q1之間新增繫結。在這種情況下,直接交換將表現得像扇出,並將訊息廣播到所有匹配的佇列。帶有路由金鑰黑色的訊息將同時傳送到Q1和Q2。
傳送日誌
我們將在日誌系統中使用這個模型。我們將傳送訊息到一個直接交換器,而不是fanout。我們將提供日誌嚴重性作為路由鍵。這樣,接收指令碼就能夠選擇它希望接收的嚴重性。讓我們首先關注發出日誌。
像往常一樣,我們需要首先建立一個交換:
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
我們準備傳送一個訊息:
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
為了簡化問題,我們假設“嚴重性”可以是“資訊”、“警告”、“錯誤”。
訂閱
接收訊息的工作方式將與上一教程一樣,只有一個例外——我們將為感興趣的每個嚴重性建立一個新的繫結。
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
將程式碼放在一起來看下
The code for emit_log_direct.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
The code for receive_logs_direct.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()