1. 程式人生 > >[問題記錄]解決RabbitMQ訊息丟失與重複消費問題

[問題記錄]解決RabbitMQ訊息丟失與重複消費問題

本文僅記錄排查和問題定位、解決的過程。

1. 背景

最近使用者反饋提交的SQL查詢一直處於長時間等待狀態,經過排查觀察,發現部分查詢請求丟失,導致使用者提交的查詢未被正常接收,繼而長時間無響應。

現象:集市SQL控制檯提交10個簡單SQL查詢 -> 訊息傳送方:傳送10條訊息至訊息佇列 -> 訊息消費方:只消費了7條訊息

2. 現狀

2.1. 當前SQL查詢的整體流程

1

  • 生產者:PHP
    • 將使用者的SQL查詢記錄在DB表,標識查詢任務狀態(f_status)為執行中;
    • 將DB表中的任務id、提交人等資訊傳送到RabbitMQ;
  • 訊息佇列:RabbitMQ
    • PHP訊息提交到了交換機;
    • 交換機再把訊息分發給指定的訊息佇列;
  • 消費者:Python
    • 主程序監聽訊息佇列,一旦有訊息就不停拉取;
    • 拉取一條訊息,就從程序池調起一個空閒程序來處理訊息;
    • 隨後反饋ACK給訊息佇列,將訊息從訊息佇列中移除;

2.2. 訊息傳送方:Web端

結論:訊息傳送正常
排查步驟:檢視log

2.3. 訊息佇列

結論:訊息數量正常
診斷步驟:
執行機安裝rabbitmq-dump-queue外掛,用於dump佇列的訊息;
1. 執行機:停止服務;
2. 使用者:提交10個SQL查詢:
3. 傳送方:檢視Web服務端的輸出日誌,確定10個訊息已經往訊息佇列寫;
4. 執行機:通過rabbitmq-dump-queue檢視佇列的訊息,確認是正常10個訊息寫入;

watch -n 1 '$GOPATH/src/rabbitmq-dump-queue/rabbitmq-dump-queue -uri="amqp://guest:[email protected]:5672" -queue ph_open_task'

3
5. 執行機:啟動服務,訊息佇列中的訊息全部被接收;

2.4. 訊息接收方

程式碼邏輯:

try:
    pool = Pool(processes=40)

    def callback(ch, method, properties, body):
        try:
            doSomething...
            pool.apply_async(process)
        except
Exception as e: print traceback.format_exc() logger_msg.info(traceback.format_exc()) finally: // 這裡會有問題,即使訊息未被處理也會反饋ACK給RabbitMQ ch.basic_ack(delivery_tag=method.delivery_tag) while True: try: connection = pika.BlockingConnection( pika.ConnectionParameters(host='xxxxxxxx')) channel = connection.channel() channel.queue_declare(queue=queue_name, durable=True) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=queue_name, no_ack=False) channel.start_consuming() except pika.exceptions.ConnectionClosed as e: continue except Exception as e: logger_msg.info(traceback.format_exc()) finally: channel.basic_ack(delivery_tag=method.delivery_tag) pool.close() pool.join()

結論:本例中消費者主程序將持續監聽MQ,一旦MQ有訊息將會拉取,隨後從程序池中啟動子程序來處理訊息,但是從程序池啟動子程序的過程並不一定成功(若當前程序池沒有空閒子程序),而主程序不管任何情況下都給MQ傳送ACK狀態碼,從而MQ將未處理的訊息移除掉,導致訊息丟失

3. 方案

問題是在消費者環節產生,因此對消費者做改動,需要調整消費者的架構:
* 原來邏輯:使用程序池技術,主程序負責監聽、接收MQ的訊息,子程序負責執行MQ的訊息,缺點是單一的主程序無法簡單處理ACK狀態碼,不易維護;
* 現有邏輯:使用RabbitMQ自身特性(work_queue),消費者不再維護程序池,是單程序,負責監聽、接收、處理MQ的訊息,處理完了以後再反饋ACK狀態碼,程序與程序之間互不干擾,易維護,併發量大時可隨時增加消費者程序;

目前方案的問題以及解決方案:

  • 問題1:訊息重複消費
    描述:使用者在頁面停止查詢時,會導致消費者程序被殺死,因此ACK狀態碼未反饋至MQ,從而訊息一直存留在MQ中,當新的消費者啟動時會重新消費;
    解決方案:消費者每次執行查詢前,首先在DB上查詢任務的執行狀態,若處於「取消/失敗/成功」則表示已經由其它消費者消費過,那麼直接返回ACK狀態碼給MQ,將訊息從MQ中移除;

  • 問題2:程序池如何維護?
    描述:使用者在頁面停止查詢時,會導致消費者程序被殺死,導致消費者數量減少;
    解決方案:維護一個監控指令碼,每分鐘輪詢消費者程序數,若少於40個程序,則新啟動一個消費者,直到數量足夠;

4