1. 程式人生 > 實用技巧 >RabbitMQ 基礎概念進階

RabbitMQ 基礎概念進階

上一篇 RabbitMQ 入門之基礎概念 介紹了 RabbitMQ 的一些基礎概念,本文再來介紹其中的一些細節和其它的進階的概念。

一、訊息生產者傳送的訊息不可達時如何處理

RabbitMQ 提供了訊息在傳遞過程中無法傳送到一個佇列(比如根據自己的型別和路由鍵沒有找到匹配的佇列)時將訊息回傳給訊息傳送方的功能,使用 RabbitMQ 的客戶端提供 channel.basicPublish 方法的兩個引數 mandatory 和 immediate (RabbitMQ 3.0 以下版本),除此之外還提供了一個備份交換器可以將無法傳送的訊息儲存起來處理,不用重新傳回給傳送方。

1.1 mandatory 引數

mandatory 被定義在 RabbitMQ 提供的客戶端的 channel.basicPublish 方法中,如下所示:

當把方法的 mandatory 引數設定為 true 時,那麼會在交換器無法根據自身的型別和路由鍵找到一個符合要求的佇列時,RabbitMQ 會自動呼叫 Basic.Return 把該訊息回傳給傳送方也就是我們的訊息生產者。反之,如果設定為 false 的話,訊息就會被直接丟棄掉。那麼問題來了,我們要如何去獲取這些沒有被髮送出去的訊息呢?RabbitMQ 給我們提供了事件監聽機制來獲取這種訊息,可以通過 addReturnListener 方法新增一個 ReturnListener

 來獲取這種未傳送到佇列的訊息,如下所示:

通過檢視 ReturnListener 介面的原始碼可以看到,該介面只有一個方法,如果是 JDK8+ 的版本的話可以使用 Lambda 表示式來簡化一些程式碼。

可以看出,當設定了 mandatory 引數時,還必須為生產者同時新增 ReturnListener 監聽器的程式設計邏輯,這樣就會使得生產者的程式碼變得更加複雜了,為了處理這種情況,RabbitMQ 提供了 `備份交換器` 來將沒有成功路由出去的訊息儲存起來,當我們需要的時候再去處理即可。

1.2 immediate 引數

該的引數同樣也是在channel.basicPublish 方法中定義的,其官方描述如下:

This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed.

當把 immediate 引數設定為 true 時,如果交換器根據其型別和路由鍵找到符合要求的佇列時,發現所有佇列上沒有任何消費者,則該訊息並不會存入到佇列中,會通過 Basic.Return 命令把訊息回傳給生產者。簡而言之也就是說,當設定了 immediate 引數時,該訊息關聯的佇列上存在消費者時,會立即傳送訊息到該佇列中,反之如果匹配的佇列上不存在任何消費者,則直接把訊息回傳給生產者。這裡有一點需要注意的是:從 RabbitMQ 3.0 + 已經去除了該引數。

二、如何對訊息和佇列設定過期時間 (TTL)

TTL 是 time to live 首字母的簡稱,RabbitMQ 中可以設定訊息和佇列的過期時間,我們先來看看要如何設定訊息的過期時間。

1.1 訊息 TTL 設定

RabbitMQ 提供了兩種設定訊息的過期時間,第一種是通過佇列的屬性設定,該方式的特點就是佇列中所有訊息的過期時間都一致。還有一種是更小粒度的設定,就是對每條訊息單獨設定過期時間,這種方式更加靈活,每條訊息的過期時間都可以不一樣。這是你可能會問,如果同時設定了佇列的過期屬性和訊息本身的過期屬性,最終以哪個為準呢?結果是 RabbitMQ 會比較這兩個 TTL 的值大小,以較小的那個為準。很容易想到,通過佇列的屬性的方式設定過期時間的話是在宣告佇列的時候指定,對應到客戶端就是其提供的 channel.queueDeclare 方法的引數 arguments 指定,示例程式碼如下:

需要注意的是 x-message-ttl 引數的單位是毫秒。如果不設定 TLL,則表示該訊息不會過期,如果將 TTL 設定為 0,表示除非此時可以把訊息直接傳送投遞到消費者端去,否則就會直接丟棄該訊息。

準對每條訊息設定 TTL 的方法是在傳送訊息的時候設定的,對應到客戶端方法是 channel.basicPublish 的 expiration 屬性引數,具體設定程式碼如下:

這種設定方式,即使佇列過期也不會立即從佇列中移除,因為每條訊息是否過期的判定是在傳送到消費者是才進行的,如果此時發現已經過期才會刪除訊息。而對於第一種方式則會把已經過期的訊息移到佇列頭部,然後 RabbitMQ 只要定期的從頭開始掃描是否存在過期的訊息即可。

1.2 佇列 TTL 設定

設定佇列的過期時間使用的是客戶端的 channel.queueDeclare 方法引數中的 x-expires 引數,其單位同樣也是毫秒,不過需要注意的是它不能設定為 0。設定佇列過期的程式碼如下所示:

上面程式碼建立了一個過期時間為 15 分鐘的佇列。

三、死信佇列介紹

死信交換器(DLX)的全稱是 Dead-Letter-Exchange ,也稱之為死信郵箱。簡單來說就是當一個訊息由於 訊息被拒絕 、 訊息過期 、 佇列達到最大長度 時,變成死信(dead message)之後,會被重新發送到一個交換器中,這個交換器就是死信交換器,繫結在這個交換器上的佇列就稱之為死信佇列。死信交換器實際上就是平常的交換器,可以在任何佇列上指定,當在一個佇列上設定死信交換器後,如果該隊列出現死信時就會被 RabbitMQ 把死信訊息重新發送到死信交換器上去,然後路由到死信佇列中,我們可以監聽這個佇列來處理那些死信訊息。為一個佇列設定死信交換器是在生產者的宣告佇列的方法中設定 x-dead-letter-message 引數來實現的,如下所示:

同時也可以通過 x-dead-letter-routing-key 引數設定死信互動器的路由鍵,不設定預設使用原始度列的路由鍵。可以到 RabbitMQ 的後臺管理介面,有 DLX 標誌的就是死信佇列。

RabbitMQ 提供的 DLX 是個比較實用的功能特性,它可以在我們訊息不能被消費者正確消費的情況下放入到死信佇列,後續我們可以通過這個死信佇列的內容來檢視異常情況來改造和優化系統。

四、延遲佇列介紹

顧名思義,延遲佇列儲存的是哪些需要等待指定時間後才能拿到的延遲訊息,一個比較典型的場景就是訂單 30 分鐘後未支付取消訂單。這裡需要注意的是,在 RabbitMQ 中並沒有直接提供延遲佇列的功能,而是需要通過上面介紹的過期時間(TTL)和死信佇列一起來實現,比如超時取消訂單這個場景,我們可以讓消費者訂閱死信佇列,設定正常的那個佇列的超時時間為 30 分鐘並繫結到該死信佇列上,當訊息超過 30 分鐘未被處理後訊息就會把傳送到死信佇列中,然後死信佇列的消費者就可以在 30 分鐘後成功的消費到該訊息了。

同時當我們有其它的超時配置需求時也很方便擴充套件,比如可以在生產者傳送訊息的時候通過設定不同的路由鍵,通過路由鍵來將訊息傳送到與交換器繫結的不同佇列中,然後這些佇列分別設定不同的過期時間和與之相對應的死信佇列,當訊息過期時就會被 RabbitMQ 轉發到相應的死信佇列中,這樣就可以去訂閱相應的死信佇列即可。

五、交換器、訊息和佇列持久化

持久化可以提高可靠性,可以防止宕機或者重啟等異常下資料的丟失,RabbitMQ 的持久化從組成結構上可以分為三個部分,即交換器持久化、訊息持久化和佇列持久化。

1.1 交換器持久化

交換器持久化是在宣告交換器時將 durable 引數設定為 true 來實現的。如果不設定持久化屬性的話,當 RabbitMQ 服務重啟後交換器的資料就會丟失,需要注意的是,是交換器的資料丟失,訊息不會丟失,只是不能將訊息傳送到這個交換器中了,一般生產環境使用都會把該屬性設定為持久化。

1.2 訊息持久化

交換器的持久化僅僅只是保證了交換器本身的元資料不會丟失,無法保證其儲存的訊息不會丟失,如果需要其內部儲存的訊息不丟失,則需要設定訊息的持久化,通過將訊息的投遞模式(deliveryMode)設定為 2 即可實現訊息的持久化,如下所示:

需要訊息持久化的前提是其所在的佇列也要設定持久化,假如僅僅只設定訊息的持久化的話,RabbitMQ 重啟之後佇列消失,然後訊息也會丟失。這裡有點需要注意一下,雖然持久化可以提高可靠性,但是持久化是將資料儲存到硬碟上,比直接操作記憶體要慢很多,所以對於哪些可靠性要求不高的業務不需要進行持久化。

1.3 佇列持久化

佇列的持久化的設定和交換器持久化類似,同樣也是在宣告的時候通過 durable 引數設定為 true 實現的,如果不設定,當 RabbitMQ 重啟後,相關的佇列元資料也會丟失,相應的其儲存的訊息也會隨之丟失掉。

將交換器、佇列、訊息都設定了持久化之後就能百分之百保證資料不丟失了嗎?其實無法保證百分之百資料不丟失。比如消費者在訂閱消費佇列時將自動應答(autoAck)引數設定為 true 的話,在接收到訊息後還沒來得及處理就掛了,這時需要把自動應答設定 false,進行手動 ack 應答即可。還有一個就是由於不是實時持久化存檔,當訊息存檔的過程中 RabbitMQ 宕機了,此時也會發生資料丟失,此時需要通過 RabbitMQ 的 映象佇列機制 來處理了。

五、總結

本文主要介紹了一些引數具體使用時的設定細節和死信佇列、延遲佇列以及持久化等,還有一些比較重要的點沒有涉及到,比如訊息確認機制。“紙上得來終覺淺,絕知此事要躬行”,在瞭解一些基礎的概念之後還是需要通過具體編碼實踐才能對其更加理解深刻。