1. 程式人生 > >RabbitMQ廣播:fanout模式

RabbitMQ廣播:fanout模式

一、

訊息的廣播需要exchange:exchange是一個轉發器,其實把訊息發給RabbitMQ裡的exchange

fanout: 所有bind到此exchange的queue都可以接收訊息,廣播

direct: 通過routingKey和exchange決定的那個唯一的queue可以接收訊息

topic:所有符合routingKey(此時可以是一個表示式)的routingKey所bind的queue可以接收訊息

headers:通過headers來決定把訊息發給哪些queue,用的比較少

原理圖:

釋出者端:

'''
釋出者publisher
'''
import
pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', # exchange名字為logs type='fanout') # 通過命令列自己輸入訊息,沒輸入就是hello world message = ' '.join(sys.argv[1:]) or "info: Hello World!
" # 廣播不需要寫queue,routing_key為空 channel.basic_publish(exchange='logs', routing_key='', body=message) print("send :", message) connection.close()

訂閱者端:

'''
訂閱者subscriber
'''
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost
')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') # 不指定queue名字,rabbit會隨機分配一個唯一的queue, # exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 # 傳送端沒有宣告queue,為什麼接收端需要queue?看上面原理圖就明白 result = channel.queue_declare(exclusive=True) # 拿到的隨機的queue名字 queue_name = result.method.queue # 需要知道從哪個轉發器上去收所以需要繫結 channel.queue_bind(exchange='logs', queue=queue_name) print("Wait for logs...") def callback(ch, method, properties, body): print("received:", body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

 執行結果:

'''
先啟動釋出者,再啟動訂閱者,為什麼訂閱者收不到資訊?
原理類似於收音機收聽廣播:
訂閱者相當於收音機,釋出者相當於廣播訊號
所以這個接收是實時的,訂閱者啟動之後,才能收到釋出者發出的廣播
'''