1. 程式人生 > >RabbitMQ 實踐之在處理非同步任務中的流程

RabbitMQ 實踐之在處理非同步任務中的流程

一、背景:

我司的系統,使用者可以建立任務,啟動任務,但任務的執行需要很長的時間,所以採用訊息佇列的方式,後臺非同步處理。

這裡所用到的是 RabbitMQ

二、MQ 處理任務的流程


① ② ③ ④ ⑤ :從前端發來 HTTP 請求,被 Producer(express) 處理,經過 Route -> Controller -> Function ,使用 amqplib 的 sendToQueue(),傳送需要處理的任務的 uuid 入 MQ 佇列。這時候,還要修改資料庫,把該任務的狀態從 "new" -> "queue"。

⑥ ⑦ ⑧ ⑨ ⑩ ⑪ : Consumer 消化 MQ 佇列吐出來的 message,即任務的 uuid。先修改資料庫該任務的狀態為"runnning", 然後呼叫"處理"模組去執行復雜運算,執行完成後,修改資料庫該任務的狀態是 "success" 還是 ”fail”,然後返回 ack 訊號給 MQ 。

注:這次的需求比較簡單,所以沒有用到 MQ 的交換機功能。

三、問與答


Q:如何做好 MQ 的錯誤處理?

MQ 的 connection 和 channel 物件都有 "error" 和 "close" 事件,需做好相關的日誌記錄。尤其是 "error",要加上 reconnect 機制,防止因為某個任務導致的錯誤或者 MQ 自身的原因,影響到後續任務的處理。

connection.on("error", function(err) {
        // reconnect 
});

channel.on("error", function(err) {
        // reconnect 
});

最後可以根據實際需要,在全域性加上 try……catch。

Q:如何保證 MQ 自身訊息的資料安全?

為了防止 MQ server 的崩潰導致的訊息損失,需要對資料做持久化。大致分兩塊:

佇列持久化 + 訊息持久化

channel.assertQueue(queue_name, {
        durable: true
        // 佇列持久化
}); 
channel.sendToQueue(
          queue_name,
          Buffer.from(uuid),
          {
            persistent: true
            // 訊息持久化
          },
          function(err, ok) { 
          
          }
);

Q:如何保證 DB 跟 MQ 資料的一致性?

1、傳送 message 時

④ 中的 sendToQueue() ,需要在 createConfirmChannel() 的基礎下使用,這樣 sendToQueue() 的第三個引數才有 MQ 收到 message 成功與否的回撥,根據這個,去結合 ② 的 DB 操作, 繫結為事務,來保證資料的一致性。

2、接受 message 時

channel.consume() 需開啟 ack 模式,等 Consumer 端一切確認完成後,再通知 MQ 。

channel.consume(
        queue_name,
        function(msg) {
          const uuid = msg.content.toString();
                // use uuid todo…… 
        },
        {
          noAck: false
        }
);

Q:如何避免 MQ 多發、少發的問題

從上面的 如何保證 DB 跟 MQ 資料的一致性? 其實就避免了該問題的發生。

但是額外要做的是:

1、重試機制,例如 傳送 message 失敗,規定重試的次數。

2、善用 MQ 的 Web 控制檯,地址形如 http://localhost:15672。除了關注基本的伺服器負載狀態,還要關注任務佇列是否正常吞吐,是否有卡殼。

3、構建運維一體的後臺管控系統,比上面的 2 自定義程度更高。

4、提供使用者類似"提交工單"/"問題反饋"/"錯誤上傳"的功能,查缺補漏