RabbitMQ 實踐之在處理非同步任務中的流程
一、背景:
我司的系統,使用者可以建立任務,啟動任務,但任務的執行需要很長的時間,所以採用訊息佇列的方式,後臺非同步處理。
這裡所用到的是 RabbitMQ
。
二、MQ 處理任務的流程
① ② ③ ④ ⑤ :從前端發來 HTTP 請求,被 Producer(express) 處理,經過 Route -> Controller -> Function ,使用 amqplib 的 sendToQueue(),傳送需要處理的任務的 uuid 入 MQ 佇列。這時候,還要修改資料庫,把該任務的狀態從 "new" -> "queue"。
⑥ ⑦ ⑧ ⑨ ⑩ ⑪ : Consumer 消化 MQ 佇列吐出來的 message,即任務的 uuid。先修改資料庫該任務的狀態為"runnning", 然後呼叫"處理"模組去執行復雜運算,執行完成後,修改資料庫該任務的狀態是 "success" 還是 ”fail”,然後返回 ack 訊號給 MQ 。
注:這次的需求比較簡單,所以沒有用到 MQ 的交換機功能。
三、問與答
Q:如何做好 MQ 的錯誤處理?
MQ 的 connection 和 channel 物件都有 "error" 和 "close" 事件,需做好相關的日誌記錄。尤其是 "error",要加上 reconnect 機制,防止因為某個任務導致的錯誤或者 MQ 自身的原因,影響到後續任務的處理。
connection.on("error", function(err) { // reconnect }); channel.on("error", function(err) { // reconnect });
最後可以根據實際需要,在全域性加上 try……catch。
Q:如何保證 MQ 自身訊息的資料安全?
為了防止 MQ server 的崩潰導致的訊息損失,需要對資料做持久化。大致分兩塊:
佇列持久化 + 訊息持久化
channel.assertQueue(queue_name, {
durable: true
// 佇列持久化
});
channel.sendToQueue( queue_name, Buffer.from(uuid), { persistent: true // 訊息持久化 }, function(err, ok) { } );
Q:如何保證 DB 跟 MQ 資料的一致性?
1、傳送 message 時
④ 中的 sendToQueue() ,需要在 createConfirmChannel() 的基礎下使用,這樣 sendToQueue() 的第三個引數才有 MQ 收到 message 成功與否的回撥,根據這個,去結合 ② 的 DB 操作, 繫結為事務,來保證資料的一致性。
2、接受 message 時
channel.consume() 需開啟 ack 模式,等 Consumer 端一切確認完成後,再通知 MQ 。
channel.consume(
queue_name,
function(msg) {
const uuid = msg.content.toString();
// use uuid todo……
},
{
noAck: false
}
);
Q:如何避免 MQ 多發、少發的問題
從上面的 如何保證 DB 跟 MQ 資料的一致性? 其實就避免了該問題的發生。
但是額外要做的是:
1、重試機制,例如 傳送 message 失敗,規定重試的次數。
2、善用 MQ 的 Web 控制檯,地址形如 http://localhost:15672。除了關注基本的伺服器負載狀態,還要關注任務佇列是否正常吞吐,是否有卡殼。
3、構建運維一體的後臺管控系統,比上面的 2 自定義程度更高。
4、提供使用者類似"提交工單"/"問題反饋"/"錯誤上傳"的功能,查缺補漏