RabbitMQ廣播:direct模式
阿新 • • 發佈:2018-11-08
一、
訊息的廣播需要exchange:exchange是一個轉發器,其實把訊息發給RabbitMQ裡的exchange
fanout: 所有bind到此exchange的queue都可以接收訊息,廣播
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收訊息
topic:所有符合routingKey(此時可以是一個表示式)的routingKey所bind的queue可以接收訊息
headers:通過headers來決定把訊息發給哪些queue,用的比較少
原理:訊息釋出端分別傳送INFO,WARNING,ERROR型別的訊息,C1 C2 C3訂閱了不同型別的訊息
訊息傳送端:
''' 釋出者publisher ''' import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') # 1、改成type='direct' # 2、預設傳送的訊息級別為info,可以帶引數,warning error等severity = sys.argv[1] if len(sys.argv) > 1 else "info" message = ' '.join(sys.argv[2:]) or " Hello World!" channel.basic_publish(exchange='direct_logs', routing_key=severity, # 3、把上面的訊息發到這個queue中 body=message) print("send :", message) connection.close()
訊息訂閱者:
''' 訂閱者subscriber ''' import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') # 4、改exchange的型別 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 5、啟動訂閱端的時候,severities存放訂閱端訂閱了哪些級別 # 然後用routing_key把這些級別繫結到queue上,這些queue就放這些級別的訊息 severities = sys.argv[1] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print("Wait for logs...") # 6、使用method.routing_key可以得到訊息的級別 def callback(ch, method, properties, body): print("received:", method.routing_key, body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
執行過程:
''' 啟動訂閱者1: python subscriber.py info 啟動訂閱者2:python subscriber.py info error 啟動釋出者1:python publisher.py info hello 啟動釋出者2:python publisher.py error servicesdown 訂閱者1收到訊息:hello 訂閱者2收到訊息:hello servicesdown '''