[問題記錄]解決RabbitMQ訊息丟失與重複消費問題
本文僅記錄排查和問題定位、解決的過程。
1. 背景
最近使用者反饋提交的SQL查詢一直處於長時間等待狀態,經過排查觀察,發現部分查詢請求丟失,導致使用者提交的查詢未被正常接收,繼而長時間無響應。
現象:集市SQL控制檯提交10個簡單SQL查詢 -> 訊息傳送方:傳送10條訊息至訊息佇列 -> 訊息消費方:只消費了7條訊息
2. 現狀
2.1. 當前SQL查詢的整體流程
- 生產者:PHP:
- 將使用者的SQL查詢記錄在DB表,標識查詢任務狀態(f_status)為執行中;
- 將DB表中的任務id、提交人等資訊傳送到RabbitMQ;
- 訊息佇列:RabbitMQ:
- PHP訊息提交到了交換機;
- 交換機再把訊息分發給指定的訊息佇列;
- 消費者:Python:
- 主程序監聽訊息佇列,一旦有訊息就不停拉取;
- 拉取一條訊息,就從程序池調起一個空閒程序來處理訊息;
- 隨後反饋ACK給訊息佇列,將訊息從訊息佇列中移除;
2.2. 訊息傳送方:Web端
結論:訊息傳送正常
排查步驟:檢視log
2.3. 訊息佇列
結論:訊息數量正常
診斷步驟:
執行機安裝rabbitmq-dump-queue外掛,用於dump佇列的訊息;
1. 執行機:停止服務;
2. 使用者:提交10個SQL查詢:
3. 傳送方:檢視Web服務端的輸出日誌,確定10個訊息已經往訊息佇列寫;
4. 執行機:通過rabbitmq-dump-queue檢視佇列的訊息,確認是正常10個訊息寫入;
watch -n 1 '$GOPATH/src/rabbitmq-dump-queue/rabbitmq-dump-queue -uri="amqp://guest:[email protected]:5672" -queue ph_open_task'
5. 執行機:啟動服務,訊息佇列中的訊息全部被接收;
2.4. 訊息接收方
程式碼邏輯:
try:
pool = Pool(processes=40)
def callback(ch, method, properties, body):
try:
doSomething...
pool.apply_async(process)
except Exception as e:
print traceback.format_exc()
logger_msg.info(traceback.format_exc())
finally:
// 這裡會有問題,即使訊息未被處理也會反饋ACK給RabbitMQ
ch.basic_ack(delivery_tag=method.delivery_tag)
while True:
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='xxxxxxxx'))
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()
except pika.exceptions.ConnectionClosed as e:
continue
except Exception as e:
logger_msg.info(traceback.format_exc())
finally:
channel.basic_ack(delivery_tag=method.delivery_tag)
pool.close()
pool.join()
結論:本例中消費者主程序將持續監聽MQ,一旦MQ有訊息將會拉取,隨後從程序池中啟動子程序來處理訊息,但是從程序池啟動子程序的過程並不一定成功(若當前程序池沒有空閒子程序),而主程序不管任何情況下都給MQ傳送ACK狀態碼,從而MQ將未處理的訊息移除掉,導致訊息丟失
3. 方案
問題是在消費者環節產生,因此對消費者做改動,需要調整消費者的架構:
* 原來邏輯:使用程序池技術,主程序負責監聽、接收MQ的訊息,子程序負責執行MQ的訊息,缺點是單一的主程序無法簡單處理ACK狀態碼,不易維護;
* 現有邏輯:使用RabbitMQ自身特性(work_queue),消費者不再維護程序池,是單程序,負責監聽、接收、處理MQ的訊息,處理完了以後再反饋ACK狀態碼,程序與程序之間互不干擾,易維護,併發量大時可隨時增加消費者程序;
目前方案的問題以及解決方案:
問題1:訊息重複消費
描述:使用者在頁面停止查詢時,會導致消費者程序被殺死,因此ACK狀態碼未反饋至MQ,從而訊息一直存留在MQ中,當新的消費者啟動時會重新消費;
解決方案:消費者每次執行查詢前,首先在DB上查詢任務的執行狀態,若處於「取消/失敗/成功」則表示已經由其它消費者消費過,那麼直接返回ACK狀態碼給MQ,將訊息從MQ中移除;問題2:程序池如何維護?
描述:使用者在頁面停止查詢時,會導致消費者程序被殺死,導致消費者數量減少;
解決方案:維護一個監控指令碼,每分鐘輪詢消費者程序數,若少於40個程序,則新啟動一個消費者,直到數量足夠;