第一階段 javaSE 面向物件 02
1. 概念
如何保障當 RabbitMQ 服務停掉以後訊息生產者傳送過來的訊息不丟失。預設情況下 RabbitMQ 退出或由於某種原因崩潰時,它忽視佇列和訊息,除非告知它不要這樣做。確保訊息不會丟失需要做兩件事: 我們需要將佇列和訊息都標記為持久化。
2. 佇列如何實現持久化
之前我們建立的佇列都是非持久化的, rabbitmq 如果重啟的化,該佇列就會被刪除掉,如果要佇列實現持久化需要在宣告佇列的時候把 durable 引數設定為持久化
但是需要注意的就是如果之前宣告的佇列不是持久化的,需要把原先佇列先刪除,或者重新建立一個持久化的佇列,不然就會出現錯誤。
以下為控制檯中持久化與非持久化佇列的 UI 顯示區:
這個時候即使重啟 rabbitmq 佇列也依然存在。
3. 訊息實現持久化
要想讓訊息實現持久化需要在訊息生產者修改程式碼, MessageProperties.PERSISTENT_TEXT_PLAIN 新增這個屬性。
將訊息標記為持久化並不能完全保證不會丟失訊息。儘管它告訴 RabbitMQ 將訊息儲存到磁碟,但是這裡依然存在當訊息剛準備儲存在磁碟的時候但是還沒有儲存完,訊息還在快取的一個間隔點。此時並沒有真正寫入磁碟。永續性保證並不強,但是對於我們的簡單任務佇列而言,這已經綽綽有餘了。如果需要更強有力的持久化策略,可通過釋出確認方式實現。
4. 不公平分發
在最開始的時候我們學習到 RabbitMQ 分發訊息採用的輪訓分發,但是在某種場景下這種策略並不是很好,比方說有兩個消費者在處理任務,其中有個消費者 1 處理任務的速度非常快,而另外一個消費者 2處理速度卻很慢,這個時候我們還是採用輪訓分發的化就會到這處理速度快的這個消費者很大一部分時間處於空閒狀態,而處理慢的那個消費者一直在幹活,這種分配方式在這種情況下其實就不太好,但是RabbitMQ 並不知道這種情況它依然很公平的進行分發。
為了避免這種情況,我們可以設定引數 channel.basicQos(1);
意思就是如果這個任務我還沒有處理完或者我還沒有應答你,你先別分配給我,我目前只能處理一個任務,然後 rabbitmq 就會把該任務分配給沒有那麼忙的那個空閒消費者,當然如果所有的消費者都沒有完成手上任務,佇列還在不停的新增新任務,佇列有可能就會遇到佇列被撐滿的情況,這個時候就只能新增新的 worker 或者改變其他儲存任務的策略。
5. 預取值
本身訊息的傳送就是非同步傳送的,所以在任何時候, channel 上肯定不止只有一個訊息另外來自消費者的,手動確認本質上也是非同步的。因此這裡就存在一個未確認的訊息緩衝區,因此希望開發人員能限制此緩衝區的大小,以避免緩衝區裡面無限制的未確認訊息問題。
這個時候就可以通過使用 basic.qos 方法設定“預取計數” 值來完成的。 該值定義通道上允許的未確認訊息的最大數量。一旦數量達到配置的數量,RabbitMQ 將停止在通道上傳遞更多訊息,除非至少有一個未處理的訊息被確認,例如,假設在通道上有未確認的訊息 5、 6、 7, 8,並且通道的預取計數設定為 4,此時RabbitMQ 將不會在該通道上再傳遞任何訊息,除非至少有一個未應答的訊息被 ack。比方說 tag=6 這個訊息剛剛被確認 ACK, RabbitMQ 將會感知到這個情況並再傳送一條訊息。
訊息應答和 QoS 預取值對使用者吞吐量有重大影響。通常,增加預取將提高向消費者傳遞訊息的速度。 雖然自動應答傳輸訊息速率是最佳的,但是,在這種情況下已傳遞但尚未處理的訊息的數量也會增加,從而增加了消費者的 RAM 消耗(隨機存取儲存器)應該小心使用具有無限預處理的自動確認模式或手動確認模式,消費者消費了大量的訊息如果沒有確認的話,會導致消費者連線節點的記憶體消耗變大,所以找到合適的預取值是一個反覆試驗的過程,不同的負載該值取值也不同 100 到 300 範圍內的值通常可提供最佳的吞吐量,並且不會給消費者帶來太大的風險。預取值為 1 是最保守的。當然這將使吞吐量變得很低,特別是消費者連線延遲很嚴重的情況下,特別是在消費者連線等待時間較長的環境中。對於大多數應用來說,稍微高一點的值將是最佳的。