解決RabbitMQ訊息丟失問題和保證訊息可靠性
工作中經常用到訊息中介軟體來解決系統間的解耦問題或者高併發消峰問題,但是訊息的可靠性如何保證一直是個很大的問題,萬一訊息丟了怎麼辦?什麼情況下訊息就不見了呢?下面通過這篇文章,我們就聊聊RabbitMQ 中訊息的可靠性如何解決的?
本文分三部分說明
- mq訊息丟失場景有哪些?
- 如何避免訊息丟失?
- 大廠如何解決這些問題的?
mq訊息丟失場景有哪些?
首先我們看下訊息週期投遞過程:
我們把該圖分三部分,左中右三部分,每部分都會導致訊息丟失情況:
1.生產者生產訊息到RabbitMQ-Server 訊息丟失場景
- 外界環境問題導致:發生網路丟包、網路故障等造成訊息丟失
- 程式碼層面,配置層面,考慮不全導致訊息丟失
傳送端使用Confirm模式,方案不夠嚴謹,比如MQ Server接收訊息失敗傳送 nack給傳送端後,傳送端監聽失敗或者沒做任何事情,訊息丟失的情況;
再比如傳送訊息到exchange後,發下路由和queue沒有繫結,訊息會存在丟失情況,下面會講到具體的例子。
2.RabbitMQ-Server中儲存的訊息丟失
- 訊息沒有持久化導致丟失
- 單節點或者叢集模式沒有映象模式訊息丟失
- 個別磁碟意外損害導致訊息同步失敗
- 機房被炸
3.RabbitMQ-Server到消費者訊息丟失
- 消費者接收到相關訊息之後,還沒來得及處理就宕機了,訊息丟失
如何避免訊息丟失?
下面也是從三個方面介紹:
- 生產者生產訊息到RabbitMQ-Server 可靠性保證
- RabbitMQ-Server中儲存的訊息如何保證
- RabbitMQ-Server到消費者訊息如何不丟
1. 生產者生產訊息到RabbitMQ-Server可靠性保證
這個過程,訊息可能會丟,比如發生網路丟包、網路故障等造成訊息丟失,一般情況下如果不採取措施,生產者無法感知訊息是否已經正確無誤的傳送到exchange中,如果生產者能感知到的話,它可以進行進一步的處理動作,比如重新投遞相關訊息以確保訊息的可靠性。
1.1 別擔心,有一種方案可以解決:就是 AMQP協議提供的一個事務機制
RabbitMQ客戶端中Channel 介面提供了幾個事務機制相關的方法: channel.txSelect channel.txCommit channel.txRollback 原始碼截圖如下:com.rabbitmq.client 包中public interface Channel extendsShutdownNotifier {}介面
在生產者傳送訊息之前,通過channel.txSelect開啟一個事務,接著傳送訊息, 如果訊息投遞server失敗,進行事務回滾channel.txRollback,然後重新發送, 如果server收到訊息,就提交事務channel.txCommit但是,很少有人這麼幹,因為這是同步操作,一條訊息傳送之後會使傳送端阻塞,以等待RabbitMQ-Server的迴應,之後才能繼續傳送下一條訊息,生產者生產訊息的吞吐量和效能都會大大降低。
1.2 不過幸運的是RabbitMQ提供了一個改進方案,即傳送方確認機制(publisher confirm)
首先生產者通過呼叫channel.confirmSelect方法將通道設定為confirm模式,一旦通道進入confirm模式,所有在該通道上面釋出的訊息都會被指派一個唯一的ID(從1開始),一旦訊息被投遞到所有匹配的佇列之後,RabbitMQ就會發送一個確認(Basic.Ack)給生產者(包含訊息的唯一deliveryTag和multiple引數),這就使得生產者知曉訊息已經正確到達了目的地了。
其實Confirm模式有三種方式實現:
- 序列confirm模式:producer每傳送一條訊息後,呼叫waitForConfirms()方法,等待broker端confirm,如果伺服器端返回false或者超時時間內未返回,客戶端進行訊息重傳。
- 批量confirm模式:producer每傳送一批訊息後,呼叫waitForConfirms()方法,等待broker端confirm。
- 非同步confirm模式:提供一個回撥方法,broker confirm了一條或者多條訊息後producer端會回撥這個方法。 我們分別來看看這三種confirm模式
序列confirm
for(int i = 0;i<50;i++){ channel.basicPublish( exchange, routingKey, mandatory, immediate, messageProperties, message.getContent() ); if (channel.waitForConfirms()) { System.out.println("傳送成功"); } else { //傳送失敗這裡可進行訊息重新投遞的邏輯 System.out.println("傳送失敗"); } }
批量confirm模式
for(int i = 0;i<50;i++){ channel.basicPublish( exchange, routingKey, mandatory, immediate, messageProperties, message.getContent() ); } if (channel.waitForConfirms()) { System.out.println("傳送成功"); } else { System.out.println("傳送失敗"); }
上面程式碼是簡單版本的,生產環境絕對不是迴圈傳送的,而是根據業務情況, 各個客戶端程式需要定期(每x秒)或定量(每x條)或者兩者結合來pubish訊息,然後等待伺服器端confirm。相比普通confirm模式,批量可以極大提升confirm效率。
但是有沒有發現什麼問題?
問題1: 批量傳送的邏輯複雜話了。
問題2: 一旦出現confirm返回false或者超時的情況時,客戶端需要將這一批次的訊息全部重發,這會帶來明顯的重複訊息數量,並且,當訊息經常丟失時,批量confirm效能應該是不升反降的。
非同步confirm模式
Channel channel = channelManager.getPublisherChannel(namespaceName); ProxiedConfirmListener confirmListener = new ProxiedConfirmListener();//監聽類 confirmListener.setChannelManager(channelManager); confirmListener.setChannel(channel); confirmListener.setNamespace(namespaceName); confirmListener.addSuccessCallbacks(successCallbacks); channel.addConfirmListener(confirmListener); channel.confirmSelect();//開啟confirm模式 AMQP.BasicProperties messageProperties = null; if (message.getProperty() instanceof AMQP.BasicProperties) { messageProperties = (AMQP.BasicProperties) message.getProperty(); } confirmListener.toConfirm(channel.getNextPublishSeqNo(), rawMsg); for(int i = 0;i<50;i++){ channel.basicPublish( exchange, routingKey, mandatory, immediate, messageProperties, message.getContent() ); }
非同步模式需要自己多寫一部分複雜的程式碼實現,非同步監聽類,監聽server端的通知訊息,非同步的好處效能會大幅度提升,傳送完畢之後,可以繼續傳送其他訊息。 MQServer通知生產端ConfirmListener監聽類:使用者可以繼承介面實現自己的實現類,處理訊息確認機制,此處繼承類程式碼省略,就是上面 ProxiedConfirmListener 類: 下面貼下要實現的介面:
package com.rabbitmq.client; import java.io.IOException; /** * Implement this interface in order to be notified of Confirm events. * Acks represent messages handled successfully; Nacks represent * messages lost by the broker. Note, the lost messages could still * have been delivered to consumers, but the broker cannot guarantee * this. */ public interface ConfirmListener { /** ** handleAck RabbitMQ訊息接收成功的方法,成功後業務可以做的事情 ** 傳送端投遞訊息前,需要把訊息先存起來,比如用KV儲存,接收到ack後刪除 **/ void handleAck(long deliveryTag, boolean multiple) throws IOException; //handleNack RabbitMQ訊息接收失敗的通知方法,使用者可以在這裡重新投遞訊息 void handleNack(long deliveryTag, boolean multiple) throws IOException; }
上面的介面很有意思,如果是你的話,怎麼實現? 訊息投遞前如何儲存訊息,ack 和 nack 如何處理訊息?
下面看下非同步confirm的訊息投遞流程:
解釋下這張圖片:
channerl1 連續發類1,2,3條訊息到RabbitMQ-Server,RabbitMQ-Server通知返回一條通知,裡面包含回傳給生產者的確認訊息中的deliveryTag包含了確認訊息的序號,此外還有一個引數multiple=true,表示到這個序號之前的所有訊息都已經得到了處理。這樣客戶端和服務端通知的次數就減少類,提升類效能。
加點訊息儲存和刪除邏輯
事務機制和publisher confirm機制確保的是訊息能夠正確的傳送至RabbitMQ,這裡的“傳送至RabbitMQ”的含義是指訊息被正確的發往至RabbitMQ的交換器,如果此交換器沒有匹配的佇列的話,那麼訊息也將會丟失,怎麼辦?
這裡有兩個解決方案,
1. 使用mandatory 設定true
2. 利用備份交換機(alternate-exchange):實現沒有路由到佇列的訊息
我們看下RabbitMQ客戶端程式碼方法
Channel 類中 釋出訊息方法
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
解釋下:basicPublish 方法中的,mandatory和immediate
/** * 當mandatory標誌位設定為true時,如果exchange根據自身型別和訊息routeKey無法找到一個符合條件的queue, 那麼會呼叫basic.return方法將訊息返回給生產者<br> * 當mandatory設定為false時,出現上述情形broker會直接將訊息扔掉。 */ @Setter(AccessLevel.PACKAGE) private boolean mandatory = false; /** * 當immediate標誌位設定為true時,如果exchange在將訊息路由到queue(s)時發現對於的queue上沒有消費者, 那麼這條訊息不會放入佇列中。 當immediate標誌位設定為false時,exchange路由的佇列沒有消費者時,該訊息會通過basic.return方法返還給生產者。 * RabbitMQ 3.0版本開始去掉了對於immediate引數的支援,對此RabbitMQ官方解釋是:這個關鍵字違背了生產者和消費者之間解耦的特性,因為生產者不關心訊息是否被消費者消費掉 */ @Setter(AccessLevel.PACKAGE) private boolean immediate;
所以為了保證訊息的可靠性,需要設定傳送訊息程式碼邏輯。如果不單獨形式設定mandatory=false
使用mandatory 設定true的時候有個關鍵點要調整,生產者如何獲取到沒有被正確路由到合適佇列的訊息呢?通過呼叫channel.addReturnListener來新增ReturnListener監聽器實現,只要傳送的訊息,沒有路由到具體的佇列,ReturnListener就會收到監聽訊息。
channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP .BasicProperties basicProperties, byte[] body) throws IOException { String message = new String(body); //進入該方法表示,沒路由到具體的佇列 //監聽到訊息,可以重新投遞或者其它方案來提高訊息的可靠性。 System.out.println("Basic.Return返回的結果是:" + message); } });
此時有人問了,不想複雜化生產者的程式設計邏輯,又不想訊息丟失,那麼怎麼辦? 還好RabbitMQ提供了一個叫做alternate-exchange東西,翻譯下就是備份交換器,這個幹什麼用呢?很簡單,它可以將未被路由的訊息儲存在另一個exchange佇列中,再在需要的時候去處理這些訊息。
那如何實現呢?
簡單一點可以通過webui管理後臺設定,當你新建一個exchange業務的時候,可以給它設定Arguments,這個引數就是 alternate-exchange,其實alternate-exchange就是一個普通的exchange,型別最好是fanout 方便管理
當你傳送訊息到你自己的exchange時候,對應key沒有路由到queue,就會自動轉移到alternate-exchange對應的queue,起碼訊息不會丟失。
下面一張圖看下投遞過程:
那麼有人有個疑問,上面介紹了,兩種方式處理,傳送的訊息無法路由到佇列的方案, 如果備份交換器和mandatory引數一起使用,會有什麼效果?
答案是:mandatory引數無效
總結下上面內容,主要如何保證訊息從生產者到RabbitMQ Server 端可靠性
1. Transaction: 訊息落盤,只能同步開啟、提交及回滾。
2. Confirm:訊息進入緩衝區,支援同步、非同步、批量確認。
3. Transaction和publisher confirm機制兩者是互斥的
4. 一般在生產者這塊避免資料丟失,都是用 Confirm 機制的。
2.RabbitMQ-Server中儲存的訊息如何保證
一般訊息都是存記憶體中的,如果訊息沒有持久化硬碟,一天機器需要重啟,獲取意外停電,重啟機器後,訊息全丟了,所以訊息