1. 程式人生 > >python3 RabbitMQ ( Routing !)

python3 RabbitMQ ( Routing !)

預備知識

What This Tutorial Focuses On

在前面的教程中,我們構建了一個簡單的日誌系統。我們能夠向許多接收器廣播日誌訊息。

在本教程中,我們將為它新增一個特性——我們將使訂閱訊息的一個子整合為可能。例如,我們將能夠僅將關鍵錯誤訊息直接指向日誌檔案(以節省磁碟空間),同時仍然能夠在控制檯列印所有日誌訊息。

繫結

在前面的例子中,我們已經建立了繫結。您可能會回憶程式碼如下:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)

繫結是交換器和佇列之間的關係。這可以簡單地理解為:佇列對來自此交換器的訊息感興趣。

繫結可以使用額外的routing_key引數。為了避免與basic_publish引數混淆,我們將它稱為繫結鍵。這就是我們如何建立一個鍵繫結:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

繫結鍵的含義取決於交換型別。我們以前使用過的fanout交換交換機,只是忽略了它的價值。

Direct exchange

我們的日誌系統從以前的教程廣播所有訊息到所有消費者。我們希望擴充套件此功能,以允許根據訊息的嚴重程度對其進行過濾。例如,我們可能希望將日誌訊息寫入磁碟的指令碼只接收關鍵錯誤,而不是在警告或資訊日誌訊息上浪費磁碟空間。

我們使用的是fanout交換器,這並沒有給我們太多的靈活性——它只能夠進行盲目的廣播。

我們將使用直接交換。直接交換背後的路由演算法很簡單——訊息轉到佇列,其繫結鍵與訊息的路由鍵完全匹配。

為了說明這一點,考慮以下設定: 在這裡插入圖片描述

在這個設定中,我們可以看到直接exchange X,它綁定了兩個佇列。第一個佇列用繫結鍵橙色繫結,第二個佇列有兩個繫結,一個用繫結鍵黑色繫結,另一個用綠色繫結。

在這樣的設定中,釋出到交換器的帶有路由關鍵字橙色的訊息將被路由到佇列Q1。帶有黑色或綠色路由鍵的訊息將轉到Q2。所有其他訊息將被丟棄。

多個繫結

在這裡插入圖片描述

使用相同的繫結鍵繫結多個佇列是完全合法的。在我們的示例中,我們可以使用繫結鍵black在X和Q1之間新增繫結。在這種情況下,直接交換將表現得像扇出,並將訊息廣播到所有匹配的佇列。帶有路由金鑰黑色的訊息將同時傳送到Q1和Q2。

傳送日誌

我們將在日誌系統中使用這個模型。我們將傳送訊息到一個直接交換器,而不是fanout。我們將提供日誌嚴重性作為路由鍵。這樣,接收指令碼就能夠選擇它希望接收的嚴重性。讓我們首先關注發出日誌。

像往常一樣,我們需要首先建立一個交換:

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

我們準備傳送一個訊息:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

為了簡化問題,我們假設“嚴重性”可以是“資訊”、“警告”、“錯誤”。

訂閱

接收訊息的工作方式將與上一教程一樣,只有一個例外——我們將為感興趣的每個嚴重性建立一個新的繫結。

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

將程式碼放在一起來看下

在這裡插入圖片描述 The code for emit_log_direct.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

The code for receive_logs_direct.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()