Python-RabbitMQ消息分發機制
阿新 • • 發佈:2018-01-10
連接 處的 code top exclusive exc 但是 現在 pika
上一篇中的例子是一個生產者對應一個消費者,那能不能一個生產者對應一個消費者呢? 下面來測試一下,順便觀察一下它的分發策略。。。
步驟一:先編輯生產者代碼(rabbit_send.py)
#top1:導入pika模塊 import os BASE_DIR = os.path.dirname(os.path.abspath(__file__)) import pika #top2:建立socket connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) #top3:聲明管道 channel = connection.channel() #top4:在管道中聲明Queue,Queue的名字是‘exclusive‘(隨意) channel.queue_declare(queue=‘exclusive‘) #top5:在管道內發送消息 channel.basic_publish(exchange=‘‘, routing_key=‘exclusive‘, #queue名稱 body=‘Let s go!‘) #消息內容 #top6:關閉隊列 connection.close()
步驟二:編輯消費者代碼(rabbit_receive.py)
#top1:導入pika模塊 import os BASE_DIR = os.path.dirname(os.path.abspath(__file__)) import pika #top2:建立scoket connection = pika.BlockingConnection(pika.ConnectionParameters( ‘localhost‘)) #top3:聲明管道 channel = connection.channel() #top4:聲明Queue channel.queue_declare(queue=‘exclusive‘) #top5:定義一個處理消息的函數(所說的回調函數) def callback(ch, method, properties, body): print(" [x] Received %r" % body) #top6:接收消息 channel.basic_consume(#消費消息 callback, #如果收到消息,就調用callback函數來處理消息 queue=‘exclusive‘, no_ack=True) #top7:此處的start只要一起動就一直運行了,因為它不止收一條 channel.start_consuming()
定義好生產者和消費者後,執行一個生產者多個消費者進行測試。測試結果是消息的接收機制是輪詢的,生產者每發送一次消息,都由消費者輪流來接收。 接下來考慮一個情況,現在的代碼是消費者接收到消息後調用callback函數去處理消息立刻打印,但是如果我的處理過程需要30秒的時間,恰好在這30秒的時間內消費者宕機了,這個消息還沒有處理完,比如我有一個轉賬的業務,那轉到一半宕機了,那咋整?應該有一個確認機制來確定到底是不是處理完了,消費者應該發送一個確認給生產者,然後生產者才把消息從消息隊列裏刪除;還是糾結。。。。那消費者處理到一半宕機了,還怎麽給生產者發確認。。。。 還用剛才的代碼來測試,把在消費者處理消息的函數中加入一個time.sleep(30),再print一句話來模擬處理時間,再執行生產者和多個消費者,假如第一個消費者接收到消息我們把它停止,再觀察別的消費者,沒反應。。。。什麽鬼?消息丟了!!! 那我們回過頭來把no_ack=True註釋掉,這個的意思是"不確認",再測試。結果是把第一個消費者斷了,第二個消費者繼續處理消息,保證消息被處理完,那為什麽生產者知道消費者宕機了呢?因為socket斷了,它是連接RabbitMQ的,它斷了自然而然就知道消費者宕機了。。 一般我們不需要加no_ack=True參數,只有那些對生產者不關心的消息可以加上。
Python-RabbitMQ消息分發機制