RabbitMQ入門(三)訂閱模式
在之前的文章RabbitMQ入門(二)工作佇列中,我們建立了一個工作佇列。工作佇列背後的假設是每一項任務都被準確地傳送至一個worker。在本文中,我們將會做一些不同的事情——我們將會把一個訊息傳送至許多消費者中。這種模式被稱為訂閱模式(publish/subscribe)
。
為了解釋這種模式,我們將會構建一個簡單的日誌系統。它包含兩個程式——第一個將會產生訊息,第二個將會接收並輸出這些訊息。
在我們的日誌系統中,每一個正在執行的接收程式都會收到訊息。在這種方式下,我們可以執行一個接收程式來接收並將日誌儲存至硬碟;同時,我們還能執行另一個接收程式,在螢幕上觀察到日誌的輸出。
特別地,傳送的這些訊息都會被廣播到所有的接收程式。
交換(Exchanges)
在之前的文章中,我們向佇列傳送訊息,從佇列中接受訊息。現在是時候介紹RabbitMQ中的全部訊息轉發模式。
讓我們快速地瀏覽下之前文章中講了些什麼:
- 一個生產者(Producer)是用於產生訊息的使用者應用程式;
- 一個佇列(Queue)是快取區,用於儲存訊息;
- 一個消費者(Consumer)是用於接收訊息的使用者應用程式。
RabbitMQ中訊息傳輸模式的核心思想是生產者絕不會直接向佇列傳送任何訊息。實際上,通常情況下生產者甚至都不會知道訊息是否會被髮送至佇列。
生產者會將訊息傳送至交換(exchange)
。交換
並不複雜。一方面它從生產者中接受訊息,另一方面將訊息推送至佇列。交換
交換型別(exchange type)
來定義。
有一些可用的交換型別
:直接分發(direct)
,通配分發(topic)
,headers
和複製分發(fanout)
。我們將會集中講最後一個——fanout。我們建立一個交換
,型別為fanout,並取名為logs:
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
fanout交換非常簡單。顧名思義,它會將所有它知道的接收佇列的訊息都廣播出去。而這也正是我們的日誌系統所需要的。
現在,我們可以釋出已經命名好的隊列了:
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
臨時佇列
你也許還記得在之前的文章中,我們需要給佇列取名。但是呢,給佇列命名太麻煩了——我們需要將workers指定到同一個佇列。當你需要在生產者和消費者之間共享佇列的時候,給佇列命名又是很重要的。
這種情形並不適合我們的日誌系統。我們想要監聽所有的訊息,而不是部分訊息。同時,我們僅對當前的流動訊息感興趣,而不是之前的訊息。為了解決這個問題,我們需要做兩件事情。
首先,無論何時我們連線到RabbitMQ,我們需要一個新的空佇列。為此,我們建立一個隨機命名的佇列,或者更好的是,讓RabbitMQ Server來給我們建立一個隨機命名的佇列。因此,我們可以利用queue_declare
命令,設定queuq
引數為空:
result = channel.queue_declare(queue='')
此時,result.method.queue
會包含一個隨機命名的佇列,比如說,它會和amq.gen-JzTY20BRgKO-HjmUJj0wLg
類似。
其次,一旦訊息者的連線關閉,我們需要刪除佇列。這可以用exclusive
引數搞定:
result = channel.queue_declare(queue='', exclusive=True)
繫結(Bindings)
我們已經建立了一個fanout 交換和佇列。現在我們需要告訴交換,將訊息傳送至佇列。交換與佇列之間的關係叫做繫結(Bindings)
。
channel.queue_bind(exchange='logs',
queue=result.method.queue)
從現在開始,logs
交換將會在我們的佇列後追加訊息。
程式碼
生產者程式碼(emit_log.py):
# -*- coding: utf-8 -*-
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
消費者程式碼(receive_log.py):
# -*- coding: utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
開啟四個終端,其中一個用於儲存日誌:
python3 receive_log.py > logs_from_rabbit.log
另一個用於觀察日誌輸出:
python3 receive_log.py
日誌產生:
python3 emit_log.py
監聽繫結:
sudo rabbitmqctl list_bindings
執行截圖如下:
本次分享到此結束,感謝大家閱