RabbitMQ 釋出訂閱-實現延時重試佇列(參考)
RabbitMQ訊息處理失敗,我們會讓失敗訊息進入重試佇列等待執行,因為在重試佇列距離真正執行還需要定義的時間間隔,因此,我們可以將重試佇列設定成延時處理。今天參考網上其他人的實現,簡單梳理下訊息延時重試執行的思路。
消費失敗後,自動延時將訊息重新投遞,當達到一定的重試次數後,將訊息投遞到失敗訊息佇列,等待人工介入處理。在這裡我們一步一步實現一個帶有失敗重試功能的釋出訂閱元件,使用該元件後可以非常簡單的實現訊息的釋出訂閱。
業務背景
- 結合RabbitMQ的Topic模式和Work Queue模式實現生產方產生訊息,消費方按需訂閱,訊息投遞到消費方的佇列之後,多個worker同時對訊息進行消費
- 結合RabbitMQ的 Message TTL 和 Dead Letter Exchange 實現訊息的延時重試功能
- 訊息達到最大重試次數之後,將其投遞到失敗佇列,等待人工介入處理bug後,重新將其加入佇列消費
執行流程圖
- 生產者釋出訊息到主Exchange
- 主Exchange根據Routing Key將訊息分發到對應的訊息佇列
- 多個消費者的worker程序同時對佇列中的訊息進行消費,因此它們之間採用“競爭”的方式來爭取訊息的消費
- 訊息消費後,不管成功失敗,都要返回ACK消費確認訊息給佇列,避免訊息消費確認機制導致重複投遞,同時,如果訊息處理成功,則結束流程,否則進入重試階段
- 如果重試次數小於設定的最大重試次數(預設為3次),則將訊息重新投遞到Retry Exchange的重試佇列
- 重試佇列不需要消費者直接訂閱,它會等待訊息的有效時間過期之後,重新將訊息投遞給Dead Letter Exchange,我們在這裡將其設定為主Exchange,實現延時後重新投遞訊息,這樣消費者就可以重新消費訊息
- 如果三次以上都是消費失敗,則認為訊息無法被處理,直接將訊息投遞給Failed Exchange的Failed Queue,這時候應用可以觸發報警機制,以通知相關責任人處理
- 等待人工介入處理(解決bug)之後,重新將訊息投遞到主Exchange,這樣就可以重新消費了
技術實現:
建立Exchange
為了實現訊息的延時重試和失敗儲存,我們需要建立三個Exchange來處理訊息。
- master 主Exchange,釋出訊息時釋出到該Exchange
- master.retry 重試Exchange,訊息處理失敗時(3次以內),將訊息重新投遞給該Exchange
- master.failed 失敗Exchange,超過三次重試失敗後,訊息投遞到該Exchange
所有的Exchange宣告(declare)必須使用以下引數
引數 | 值 | 說明 |
---|---|---|
exchange | - | Exchange名稱 |
type | topic | Exchange 型別 |
passive | false | 如果Exchange已經存在,則返回成功,不存在則建立 |
durable | true | 持久化儲存Exchange,這裡僅僅是Exchange本身持久化,訊息和佇列需要單獨指定其持久化 |
no-wait | false | 該方法需要應答確認 |
在RabbitMQ的管理介面中,我們可以看到建立的三個Exchange
訊息釋出
訊息釋出時,使用basic_publish
方法,引數如下
引數 | 值 | 說明 |
---|---|---|
message | - | 釋出的訊息物件 |
exchange | master | 訊息釋出到的Exchange |
routing-key | - | 路由KEY,用於標識訊息型別 |
mandatory | false | 是否強制路由,指定了該選項後,如果沒有訂閱該訊息,則會返回路由不可達錯誤 |
immediate | false | 指定了當訊息無法直接路由給消費者時如何處理 |
釋出訊息時,對於message
物件,其內容使用json編碼後的字串,同時訊息進行持久化
訊息訂閱
訊息訂閱的實現相對複雜一些,需要完成佇列的宣告以及佇列和Exchange的繫結
Declare Queue
對於每一個訂閱訊息的服務,都必須建立一個該服務對應的佇列,將該佇列繫結到關注的路由規則,這樣之後,訊息生產者將訊息投遞給Exchange之後,就會按照路由規則將訊息分發到對應的佇列供消費者消費了。
消費服務需要declare三個佇列
[queue_name]
佇列名稱,格式符合[服務名稱]@訂閱服務標識
[queue_name]@retry
重試佇列[queue_name]@failed
失敗佇列
Declare佇列時,引數規定規則如下
引數 | 值 | 說明 |
---|---|---|
queue | - | 佇列名稱 |
passive | false | 佇列不存在則建立,存在則直接成功 |
durable | true | 佇列持久化 |
exclusive | false | 排他,指定該選項為true則佇列只對當前連線有效,連線斷開後自動刪除 |
no-wait | false | 該方法需要應答確認 |
auto-delete | false | 當不再使用時,是否自動刪除 |
對於@retry
重試佇列,需要指定額外引數
'x-dead-letter-exchange' => 'master'
'x-dead-letter-routing-key' => [queue_name],
'x-message-ttl' => 30 * 1000 // 重試時間設定為30s
這裡的兩個header欄位的含義是,在佇列中延遲30s後,將該訊息重新投遞到x-dead-letter-exchange
對應的Exchange中,並且routing key指定為消費佇列的名稱,這樣就可以實現訊息只投遞給原始出錯時的佇列,避免訊息重新投遞給所有關注當前routing key的消費者了。
在RabbitMQ的管理介面中,Queues部分可以看到我們建立的三個佇列
檢視佇列的詳細資訊,我們可以看到 [email protected] 佇列與其它兩個佇列的不同
佇列和Exchange繫結
建立完佇列之後,需要將佇列與Exchange繫結(bind
),不同佇列需要繫結到之前建立的對應的Exchange上面
Queue | Exchange |
---|---|
[queue_name] | master |
[queue_name]@retry | master.retry |
[queue_name]@failed | master.failed |
繫結時,需要提供訂閱的路由KEY,該路由KEY與訊息釋出時的路由KEY對應,區別是這裡可以使用萬用字元同時訂閱多種型別的訊息。
引數 | 值 | 說明 |
---|---|---|
queue | - | 繫結的佇列 |
exchange | - | 繫結的Exchange |
routing-key | - | 訂閱的訊息路由規則 |
no-wait | false | 該方法需要應答確認 |
在RabbitMQ的管理介面中,我們可以看到該佇列與Exchange和routing-key的繫結關係
訊息消費實現
使用 basic_consume
對訊息進行消費的時候,需要注意下面引數
引數 | 值 | 說明 |
---|---|---|
queue | - | 消費的佇列名稱 |
consumer-tag | - | 消費者標識,留空即可 |
no_local | false | 如果設定了該欄位,伺服器將不會發布訊息到 釋出它的客戶端 |
no_ack | false | 需要消費確認應答 |
exclusive | false | 排他訪問,設定後只允許當前消費者訪問該佇列 |
nowait | false | 該方法需要應答確認 |
消費端在消費訊息時,需要從訊息中獲取訊息被消費的次數,以此判斷該訊息處理失敗時重試還是傳送到失敗佇列。
在訊息傳送到重試佇列和失敗佇列時,我們在訊息的headers中添加了一個名為x-orig-routing-key
的欄位,該欄位是實現訊息重試的關鍵欄位,由於我們的訊息需要在不同的Exchange,Queue之間流轉,為了避免訊息在重新投遞到主Exchange時,被所有的消費者佇列重新消費,在重試過程中,我們將訊息的routing-key修改為佇列名稱,直接投遞給原始消費訊息的佇列。x-orig-routing-key
用於在之後能夠重新獲取到最開始的routing-key。
這裡的重複消費是指 某個訊息被兩個消費方A和B消費了,其中A消費失敗,B成功,這時候,訊息由A消費者重新投遞到主Exchange後,B消費佇列也會獲取到該訊息,因此就會導致B消費者重複消費已經消費國的訊息
本文實現延時重試,使用了三個重試Exchange,Exchange如果訂閱特別多的話,Exchange的壓力會非常大,因此在非常極端的情況下,訊息大批量失敗,且訊息收發非常快,那麼Exchange的效能可能會有問題。
本文是使用釋出訂閱實現延時重試的訊息執行,也會有其他思路。