RabbitMQ使用介紹(python)
在我們的專案開發過程中,我們有時會有時候有兩個或者多個程式互動的情況,當然就會使用到這裡的訊息佇列來實現。現在比較火的就是RabbitMQ,還有一些ZeroMQ ,ActiveMQ 等等,著名的openstack預設用的RabbitMQ來實現的。
python中我們使用pika模組來操作訊息佇列,當然Celery也是python中比較火的做分散式訊息佇列的模組。
1,RabbitMQ的安裝
參考連結https://www.cnblogs.com/zzqit/p/10158923.html
2,最簡單的發收事例
傳送端(producer)
1 import pika 2View Code3 # 建立一個例項 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters('localhost') # 預設埠5672,可不寫 6 ) 7 8 # 宣告一個管道,在管道里發訊息 9 channel = connection.channel() 10 11 # 在管道里宣告queue名字為test 12 channel.queue_declare(queue='test') 13 14 # 指明發送佇列的名字跟內容 15 channel.basic_publish(exchange='',16 routing_key='test', # queue名字 17 body='Hello World!' # 訊息內容 18 ) 19 20 print(" [x] Sent 'Hello World!'") 21 22 connection.close() # 佇列關閉
消費端(consumer)
1 import pika 2 3 # 建立例項 4 connection = pika.BlockingConnection(View Code5 pika.ConnectionParameters('localhost') 6 ) 7 8 # 宣告管道 9 channel = connection.channel() 10 11 # 這裡宣告queue的名字防止消費端先跑,服務端還沒開啟報錯 12 channel.queue_declare(queue='test') 13 14 # 消費端的回撥函式 15 def callback(ch, method, properties, body): 16 print(" [x] Received %r" % body) 17 ch.basic_ack(delivery_tag = method.delivery_tag) # 告訴生成者,訊息處理完成 18 19 channel.basic_consume( # 消費訊息 20 callback, # 如果收到訊息,就呼叫callback函式來處理訊息 21 queue='hello', # 你要從那個佇列裡收訊息 22 # no_ack=True # 為True不管消費者消費的時候是否處理完成,這條訊息在佇列中就沒有了 23 ) 24 25 print(' [*] Waiting for messages. To exit press CTRL+C') 26 27 channel.start_consuming() # 開始消費訊息
當然這裡的連線方式是針對在本地連線,如果需要連線遠端就採用下面的方式進行連線
credentials = pika.PlainCredentials('test', '123456') # rabbitmq登入賬號 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.12',5672,'/',credentials)) # 前面為ip地址,5672為埠號
3,訊息持久化及公平分發
我們可能會遇到生產者伺服器意外宕機了,這樣我們生成的訊息佇列跟裡面的訊息就會全部沒有,當然這樣肯定是不行的,所以我們可以通過下面的設定訊息持久化跟佇列持久化來達到最終的目的。
上面的問題解決了以後,在我們實際的生產過程中,每個消費者伺服器的效能是不均等的,所以我們需要根據不同伺服器的效能做負載均衡,實現公平分發。當然下面已經解決了提到的這兩個問題。
傳送端(producer)
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(host='localhost') 6 ) 7 8 channel = connection.channel() 9 10 channel.queue_declare(queue='test', durable=True) # 佇列持久化 11 12 message = ' '.join(sys.argv[1:]) or "Hello World!" # sys.argv獲取輸入命令後面的引數 13 14 channel.basic_publish(exchange='', 15 routing_key='test', # 佇列名稱 16 body=message, # 訊息體 17 properties=pika.BasicProperties( 18 delivery_mode=2, # 訊息持久化 19 )) 20 21 print(" [x] Sent %r" % message) 22 23 connection.close()View Code
消費端(consumer)
1 import pika 2 3 connection = pika.BlockingConnection( 4 pika.ConnectionParameters(host='localhost') 5 ) 6 7 channel = connection.channel() 8 9 channel.queue_declare(queue='test', durable=True) # 消費端也要寫durable=True 10 11 print(' [*] Waiting for messages. To exit press CTRL+C') 12 13 14 def callback(ch, method, properties, body): 15 print(" [x] Received %r" % body) 16 ch.basic_ack(delivery_tag=method.delivery_tag) # 告訴生產者處理完成 17 18 channel.basic_qos(prefetch_count=1) # 做公平分發,如果有一個訊息就不給你發了 19 20 channel.basic_consume(callback, 21 queue='test') 22 23 channel.start_consuming()View Code
4,廣播模式
前面的效果都是生產者生產了訊息以後把訊息放入佇列中然後等消費者來消費,當然也可以做成廣播的形式,傳送端發的同時,繫結的消費端就實時接受到資料,當然如果發訊息的時候沒繫結就不會把這個訊息存下來。下面會簡單介紹三種模式:fanout,direct,topic
(1)訊息釋出訂閱(fanout)
傳送端
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(host='localhost') 6 ) 7 8 channel = connection.channel() 9 10 channel.exchange_declare(exchange='logs', # 宣告廣播管道 11 type='fanout') # 型別為fanout只要是綁定了exchange為logs的都可以接受 12 13 message = ' '.join(sys.argv[1:]) or "info: Hello World!" 14 15 channel.basic_publish(exchange='logs', # 傳送的廣播管道 16 routing_key='', # 需要等消費者接入,為空,必須有 17 body=message) 18 19 print(" [x] Sent %r" % message) 20 21 connection.close()View Code
接收端
1 import pika 2 3 connection = pika.BlockingConnection( 4 pika.ConnectionParameters(host='localhost') 5 ) 6 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange='logs', # 宣告和生產者一樣 10 type='fanout') 11 12 res = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 13 queue_name = res.method.queue # 分配的queue名字 14 15 channel.queue_bind(exchange='logs', 16 queue=queue_name) 17 18 19 def callback(ch, method, properties, body): 20 print(" [x] %r" % body) 21 22 channel.basic_consume(callback, 23 queue=queue_name, 24 no_ack=True) # 廣播是實時的,訊息不儲存 25 26 channel.start_consuming()View Code
(2)有選擇的接收(direct)
前面的fanout消費者是預設接收所有的資料,但是有時候需要篩選一下,只接收自己想要的資料,就需要用到這裡的direct了。
傳送端
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(host='localhost') 6 ) 7 8 channel = connection.channel() 9 10 channel.exchange_declare(exchange='direct_logs', # 跟前面的fanout差不多 11 type='direct') 12 13 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' # 傳送訊息級別,預設為info 14 15 message = ' '.join(sys.argv[2:]) or 'Hello World!' # 傳送訊息內容 16 17 channel.basic_publish(exchange='direct_logs', 18 routing_key=severity, # 傳送訊息的級別 19 body=message) 20 21 print(" [x] Sent %r:%r" % (severity, message)) 22 23 connection.close()View Code
接收端
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(host='localhost') 6 ) 7 8 channel = connection.channel() 9 10 channel.exchange_declare(exchange='direct_logs', 11 type='direct') 12 13 result = channel.queue_declare(exclusive=True) # #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 14 queue_name = result.method.queue 15 16 severities = sys.argv[1:] # 獲取執行指令碼的引數 17 if not severities: 18 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) 19 sys.exit(1) 20 21 for severity in severities: # 迴圈繫結所有想接收的級別 22 channel.queue_bind(exchange='direct_logs', 23 queue=queue_name, 24 routing_key=severity) 25 26 def callback(ch, method, properties, body): 27 print(" [x] %r:%r" % (method.routing_key, body)) 28 29 channel.basic_consume(callback, 30 queue=queue_name, 31 no_ack=True) 32 33 channel.start_consuming()View Code
執行程式碼示例
python producer.py info hello # 執行傳送端,級別為info,內容為hello python consumer.py info error # 接收端,接收級別為info跟error python consumer.py info warning # 接收端,接收級別為info跟warning python consumer.py error warning # 接收端,接收級別為error跟warning # 最終收到訊息的有1,2兩個,第三個收不到
(3)更加細緻的過濾(topic)
傳送端
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(host='localhost') 6 ) 7 8 channel = connection.channel() 9 10 channel.exchange_declare(exchange='topic_logs', 11 type='topic') 12 13 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' 14 15 message = ' '.join(sys.argv[2:]) or 'Hello World!' 16 17 channel.basic_publish(exchange='topic_logs', 18 routing_key=routing_key, 19 body=message) 20 21 print(" [x] Sent %r:%r" % (routing_key, message)) 22 23 connection.close()View Code
接收端
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(host='localhost') 6 ) 7 8 channel = connection.channel() 9 10 channel.exchange_declare(exchange='topic_logs', 11 type='topic') 12 13 result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 14 queue_name = result.method.queue 15 16 binding_keys = sys.argv[1:] 17 if not binding_keys: 18 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 19 sys.exit(1) 20 21 for binding_key in binding_keys: 22 channel.queue_bind(exchange='topic_logs', 23 queue=queue_name, 24 routing_key=binding_key) 25 26 27 def callback(ch, method, properties, body): 28 print(" [x] %r:%r" % (method.routing_key, body)) 29 30 channel.basic_consume(callback, 31 queue=queue_name, 32 no_ack=True) 33 34 channel.start_consuming()View Code
在使用topic的情況下,接收端可以通過下面幾種寫法篩選自己想要接收的資料
python consumer.py *.info # 所有info結尾的 python consumer.py *.info nginx.* # 所有info結尾的和nginx開始的 python consumer.py '#' # 接收所有