RabbitMQ廣播:fanout模式
阿新 • • 發佈:2018-11-08
一、
訊息的廣播需要exchange:exchange是一個轉發器,其實把訊息發給RabbitMQ裡的exchange
fanout: 所有bind到此exchange的queue都可以接收訊息,廣播
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收訊息
topic:所有符合routingKey(此時可以是一個表示式)的routingKey所bind的queue可以接收訊息
headers:通過headers來決定把訊息發給哪些queue,用的比較少
原理圖:
釋出者端:
''' 釋出者publisher ''' importpika import sys connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', # exchange名字為logs type='fanout') # 通過命令列自己輸入訊息,沒輸入就是hello world message = ' '.join(sys.argv[1:]) or "info: Hello World!" # 廣播不需要寫queue,routing_key為空 channel.basic_publish(exchange='logs', routing_key='', body=message) print("send :", message) connection.close()
訂閱者端:
''' 訂閱者subscriber ''' import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') # 不指定queue名字,rabbit會隨機分配一個唯一的queue, # exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 # 傳送端沒有宣告queue,為什麼接收端需要queue?看上面原理圖就明白 result = channel.queue_declare(exclusive=True) # 拿到的隨機的queue名字 queue_name = result.method.queue # 需要知道從哪個轉發器上去收所以需要繫結 channel.queue_bind(exchange='logs', queue=queue_name) print("Wait for logs...") def callback(ch, method, properties, body): print("received:", body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
執行結果:
''' 先啟動釋出者,再啟動訂閱者,為什麼訂閱者收不到資訊? 原理類似於收音機收聽廣播: 訂閱者相當於收音機,釋出者相當於廣播訊號 所以這個接收是實時的,訂閱者啟動之後,才能收到釋出者發出的廣播 '''