1. 程式人生 > >RabbitMQ訊息可靠性分析

RabbitMQ訊息可靠性分析

Introduction

有很多人問過我這麼一類問題:RabbitMQ如何確保訊息可靠?很多時候,筆者的回答都是:說來話長的事情何來長話短說。的確,要確保訊息可靠不只是單單幾句就能夠敘述明白的,包括Kafka也是如此。可靠並不是一個絕對的概念,曾經有人也留言說過類似全部磁碟損毀也會導致訊息丟失,筆者戲答:還有機房被炸了也會導致訊息丟失。可靠性是一個相對的概念,在條件合理的範圍內系統所能確保的多少個9的可靠性。一切儘可能的趨於完美而無法企及於完美。
我們可以儘可能的確保RabbitMQ的訊息可靠。在詳細論述RabbitMQ的訊息可靠性之前,我們先來回顧下訊息在RabbitMQ中的經由之路。
這裡寫圖片描述

如圖所示,從AMQP協議層面上來說:
1. 訊息先從生產者Producer出發到達交換器Exchange;
2. 交換器Exchange根據路由規則將訊息轉發對應的佇列Queue之上;
3. 訊息在佇列Queue上進行儲存;
4. 消費者Consumer訂閱佇列Queue並進行消費。
我們對於訊息可靠性的分析也從這四個階段來一一探討。

Phase 1

訊息從生產者發出到達交換器Exchange,在這個過程中可以發生各種情況,生產者客戶端傳送出去之後可以發生網路丟包、網路故障等造成訊息丟失。一般情況下如果不採取措施,生產者無法感知訊息是否已經正確無誤的傳送到交換器中。如果訊息在傳輸到Exchange的過程中發生失敗而可以讓生產者感知的話,生產者可以進行進一步的處理動作,比如重新投遞相關訊息以確保訊息的可靠性。

為此AMQP協議在建立之初就考慮到這種情況而提供了事務機制。RabbitMQ客戶端中與事務機制相關的方法有三個:channel.txSelect、channel.txCommit以及channel.txRollback。channel.txSelect用於將當前的通道設定成事務模式,channel.txCommit用於提交事務,而channel.txRollback用於事務回滾。在通過channel.txSelect方法開啟事務之後,我們便可以釋出訊息給RabbitMQ了,如果事務提交成功,則訊息一定到達了RabbitMQ中,如果在事務提交執行之前由於RabbitMQ異常崩潰或者其他原因丟擲異常,這個時候我們便可以將其捕獲,進而通過執行channel.txRollback方法來實現事務回滾。注意這裡的RabbitMQ中的事務機制與大多數資料庫中的事務概念並不相同,需要注意區分。

事務確實能夠解決訊息傳送方和RabbitMQ之間訊息確認的問題,只有訊息成功被RabbitMQ接收,事務才能提交成功,否則我們便可在捕獲異常之後進行事務回滾,與此同時可以進行訊息重發。但是使用事務機制的話會“吸乾”RabbitMQ的效能,那麼有沒有更好的方法既能保證訊息傳送方確認訊息已經正確送達,又能基本上不帶來效能上的損失呢?從AMQP協議層面來看並沒有更好的辦法,但是RabbitMQ提供了一個改進方案,即傳送方確認機制(publisher confirm)。

生產者將通道設定成confirm(確認)模式,一旦通道進入confirm模式,所有在該通道上面釋出的訊息都會被指派一個唯一的ID(從1開始),一旦訊息被投遞到所有匹配的佇列之後,RabbitMQ就會發送一個確認(Basic.Ack)給生產者(包含訊息的唯一ID),這就使得生產者知曉訊息已經正確到達了目的地了。RabbitMQ回傳給生產者的確認訊息中的deliveryTag包含了確認訊息的序號,此外RabbitMQ也可以設定channel.basicAck方法中的multiple引數,表示到這個序號之前的所有訊息都已經得到了處理。
這裡寫圖片描述
事務機制在一條訊息傳送之後會使傳送端阻塞,以等待RabbitMQ的迴應,之後才能繼續傳送下一條訊息。相比之下,傳送方確認機制最大的好處在於它是非同步的,一旦釋出一條訊息,生產者應用程式就可以在等通道返回確認的同時繼續傳送下一條訊息,當訊息最終得到確認之後,生產者應用便可以通過回撥方法來處理該確認訊息,如果RabbitMQ因為自身內部錯誤導致訊息丟失,就會發送一條nack(Basic.Nack)命令,生產者應用程式同樣可以在回撥方法中處理該nack命令。

生產者通過呼叫channel.confirmSelect方法(即Confirm.Select命令)將通道設定為confirm模式,之後RabbitMQ會返回 Confirm.Select-Ok命令表示同意生產者將當前通道設定為confirm模式。所有被髮送的後續訊息都被ack或者nack一次,不會出現一條訊息即被ack又被nack的情況。並且RabbitMQ也並沒有對訊息被confirm的快慢做任何保證。

事務機制和publisher confirm機制兩者是互斥的,不能共存。如果企圖將已開啟事務模式的通道再設定為publisher confirm模式,RabbitMQ會報錯:{amqp_error, precondition_failed, “cannot switch from tx to confirm mode”, ‘confirm.select’},或者如果企圖將已開啟publisher confirm模式的通道在設定為事務模式的話,RabbitMQ也會報錯:{amqp_error, precondition_failed, “cannot switch from confirm to tx mode”, ‘tx.select’ }。

事務機制和publisher confirm機制確保的是訊息能夠正確的傳送至RabbitMQ,這裡的“傳送至RabbitMQ”的含義是指訊息被正確的發往至RabbitMQ的交換器,如果此交換器沒有匹配的佇列的話,那麼訊息也將會丟失。所以在使用這兩種機制的時候要確保所涉及的交換器能夠有匹配的佇列。更進一步的講,傳送方要配合mandatory引數或者備份交換器一起使用來提高訊息傳輸的可靠性。

Phase 2

mandatory和immediate是channel.basicPublish方法中的兩個引數,它們都有當訊息傳遞過程中不可達目的地時將訊息返回給生產者的功能。而RabbitMQ提供的備份交換器(Alternate Exchange)可以將未能被交換器路由的訊息(沒有繫結佇列或者沒有匹配的繫結)儲存起來,而不用返回給客戶端。
RabbitMQ 3.0版本開始去掉了對於immediate引數的支援,對此RabbitMQ官方解釋是:immediate引數會影響映象佇列的效能,增加程式碼複雜性,建議採用TTL和DLX的方法替代。所以本文只簡單介紹mandatory和備份交換器。
當mandatory引數設為true時,交換器無法根據自身的型別和路由鍵找到一個符合條件的佇列的話,那麼RabbitMQ會呼叫Basic.Return命令將訊息返回給生產者。當mandatory引數設定為false時,出現上述情形的話,訊息直接被丟棄。 那麼生產者如何獲取到沒有被正確路由到合適佇列的訊息呢?這時候可以通過呼叫channel.addReturnListener來新增ReturnListener監聽器實現。使用mandatory引數的關鍵程式碼如下所示:

channel.basicPublish(EXCHANGE_NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes());
channel.addReturnListener(new ReturnListener() {
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP
            .BasicProperties basicProperties, byte[] body) throws IOException {
        String message = new String(body);
        System.out.println("Basic.Return返回的結果是:" + message);
    }
});

上面程式碼中生產者沒有成功的將訊息路由到佇列,此時RabbitMQ會通過Basic.Return返回“mandatory test”這條訊息,之後生產者客戶端通過ReturnListener監聽到了這個事件,上面程式碼的最後輸出應該是“Basic.Return返回的結果是:mandatory test”。

生產者可以通過ReturnListener中返回的訊息來重新投遞或者其它方案來提高訊息的可靠性。
備份交換器,英文名稱Alternate Exchange,簡稱AE,或者更直白的可以稱之為“備胎交換器”。生產者在傳送訊息的時候如果不設定mandatory引數,那麼訊息在未被路由的情況下將會丟失,如果設定了mandatory引數,那麼需要新增ReturnListener的程式設計邏輯,生產者的程式碼將變得複雜化。如果你不想複雜化生產者的程式設計邏輯,又不想訊息丟失,那麼可以使用備份交換器,這樣可以將未被路由的訊息儲存在RabbitMQ中,再在需要的時候去處理這些訊息。 可以通過在宣告交換器(呼叫channel.exchangeDeclare方法)的時候新增alternate-exchange引數來實現,也可以通過策略的方式實現。如果兩者同時使用的話,前者的優先順序更高,會覆蓋掉Policy的設定。

參考下圖,如果此時我們傳送一條訊息到normalExchange上,當路由鍵等於“normalKey”的時候,訊息能正確路由到normalQueue這個佇列中。如果路由鍵設為其他值,比如“errorKey”,即訊息不能被正確的路由到與normalExchange繫結的任何佇列上,此時就會發送給myAe,進而傳送到unroutedQueue這個佇列。
這裡寫圖片描述
備份交換器其實和普通的交換器沒有太大的區別,為了方便使用,建議設定為fanout型別,如若讀者想設定為direct或者topic的型別也沒有什麼不妥。需要注意的是訊息被重新發送到備份交換器時的路由鍵和從生產者發出的路由鍵是一樣的。備份交換器的實質就是原有交換器的一個“備胎”,所有無法正確路由的訊息都發往這個備份交換器中,可以為所有的交換器設定同一個AE,不過這裡需要提前確保的是AE已經正確的綁定了佇列,最好型別也是fanout的。如果備份交換器和mandatory引數一起使用,那麼mandatory引數無效。

Phase 3

mandatory或者AE可以讓訊息在路由到佇列之前得到極大的可靠性保障,但是訊息存入佇列之後的可靠性又如何保證?

首先是持久化。持久化可以提高佇列的可靠性,以防在異常情況(重啟、關閉、宕機等)下的資料丟失。佇列的持久化是通過在宣告佇列時將durable引數置為true實現的,如果佇列不設定持久化,那麼在RabbitMQ服務重啟之後,相關佇列的元資料將會丟失,此時資料也會丟失。正所謂“皮之不存,毛將焉附”,佇列都沒有了,訊息又能存在哪裡呢?佇列的持久化能保證其本身的元資料不會因異常情況而丟失,但是並不能保證內部所儲存的訊息不會丟失。要確保訊息不會丟失,需要將其設定為持久化。通過將訊息的投遞模式(BasicProperties中的deliveryMode屬性)設定為2即可實現訊息的持久化。

設定了佇列和訊息的持久化,當RabbitMQ服務重啟之後,訊息依舊存在。單單隻設定佇列持久化,重啟之後訊息會丟失;單單隻設定訊息的持久化,重啟之後佇列消失,既而訊息也丟失。單單設定訊息持久化而不設定佇列的持久化顯得毫無意義。

在持久化的訊息正確存入RabbitMQ之後,還需要有一段時間(雖然很短,但是不可忽視)才能存入磁碟之中。RabbitMQ並不會為每條訊息都做同步存檔(呼叫核心的fsync6方法)的處理,可能僅僅儲存到作業系統快取之中而不是物理磁碟之中。如果在這段時間內RabbitMQ服務節點發生了宕機、重啟等異常情況,訊息儲存還沒來得及落盤,那麼這些訊息將會丟失。

如果在Phase1中採用了事務機制或者publisher confirm機制的話,服務端的返回是在訊息落盤之後執行的,這樣可以進一步的提高了訊息的可靠性。但是即便如此也無法避免單機故障且無法修復(比如磁碟損毀)而引起的訊息丟失,這裡就需要引入映象佇列。映象佇列相當於配置了副本,絕大多數分散式的東西都有多副本的概念來確保HA。在映象佇列中,如果主節點(master)在此特殊時間內掛掉,可以自動切換到從節點(slave),這樣有效的保證了高可用性,除非整個叢集都掛掉。雖然這樣也不能完全的保證RabbitMQ訊息不丟失(比如機房被炸。。。),但是配置了映象佇列要比沒有配置映象佇列的可靠性要高很多,在實際生產環境中的關鍵業務佇列一般都會設定映象佇列。

Phase 4

進一步的從消費者的角度來說,如果在消費者接收到相關訊息之後,還沒來得及處理就宕機了,這樣也算資料丟失。

為了保證訊息從佇列可靠地達到消費者,RabbitMQ提供了訊息確認機制(message acknowledgement)。消費者在訂閱佇列時,可以指定autoAck引數,當autoAck等於false時,RabbitMQ會等待消費者顯式地回覆確認訊號後才從記憶體(或者磁碟)中移去訊息(實質上是先打上刪除標記,之後再刪除)。當autoAck等於true時,RabbitMQ會自動把傳送出去的訊息置為確認,然後從記憶體(或者磁碟)中刪除,而不管消費者是否真正的消費到了這些訊息。

採用訊息確認機制後,只要設定autoAck引數為false,消費者就有足夠的時間處理訊息(任務),不用擔心處理訊息過程中消費者程序掛掉後訊息丟失的問題,因為RabbitMQ會一直等待持有訊息直到消費者顯式呼叫Basic.Ack命令為止。

當autoAck引數置為false,對於RabbitMQ服務端而言,佇列中的訊息分成了兩個部分:一部分是等待投遞給消費者的訊息;一部分是已經投遞給消費者,但是還沒有收到消費者確認訊號的訊息。如果RabbitMQ一直沒有收到消費者的確認訊號,並且消費此訊息的消費者已經斷開連線,則RabbitMQ會安排該訊息重新進入佇列,等待投遞給下一個消費者,當然也有可能還是原來的那個消費者。

RabbitMQ不會為未確認的訊息設定過期時間,它判斷此訊息是否需要重新投遞給消費者的唯一依據是消費該訊息的消費者連線是否已經斷開,這麼設計的原因是RabbitMQ允許消費者消費一條訊息的時間可以很久很久。

如果訊息消費失敗,也可以呼叫Basic.Reject或者Basic.Nack來拒絕當前訊息而不是確認,如果只是簡單的拒絕那麼訊息會丟失,需要將相應的requeue引數設定為true,那麼RabbitMQ會重新將這條訊息存入佇列,以便可以傳送給下一個訂閱的消費者。如果requeue引數設定為false的話,RabbitMQ立即會把訊息從佇列中移除,而不會把它傳送給新的消費者。

還有一種情況需要考慮:requeue的訊息是存入佇列頭部的,即可以快速的又被髮送給消費,如果此時消費者又不能正確的消費而又requeue的話就會進入一個無盡的迴圈之中。對於這種情況,筆者的建議是在出現無法正確消費的訊息時不要採用requeue的方式來確保訊息可靠性,而是重新投遞到新的佇列中,比如設定的死信佇列中,以此可以避免前面所說的死迴圈而又可以確保相應的訊息不丟失。對於死信佇列中的訊息可以用另外的方式來消費分析,以便找出問題的根本。

歡迎支援筆者新書:《RabbitMQ實戰指南》以及關注微信公眾號:朱小廝的部落格。