1. 程式人生 > >分散式訊息通訊-RabbitMQ可靠性投遞與生產實踐

分散式訊息通訊-RabbitMQ可靠性投遞與生產實踐

本章重點:

可靠性投遞
1.確保訊息傳送到RabbitMQ伺服器
2.確保訊息被正確的路由
3.確保訊息在佇列正確地儲存
4.確保訊息從佇列正確地投遞到消費者
5.消費者回調
6.補償機制
7.訊息冪等性
8.訊息的順序性

可靠性投遞

首先需要明確,效率和可靠性是無法兼得的,如果要保證每一個環節都成功,勢必會對訊息的收發效率造成影響,如過是一些業務實時性要求不是特別高的場合,可以犧牲可靠性來換取效率。

  1. ①代表訊息從生產者傳送到Exchange
  2. ②代表訊息從Exchange路由到Queue
  3. ③ 代表訊息在Queue中儲存;
  4. ④ 代表消費者訂閱Queue並消費訊息。 

1.確保訊息傳送到RabbitMQ伺服器

可能因為網路或者Broker的問題導致①失敗,而生產者是無法得知訊息是否正確傳送到Broker的。

有兩種解決方案:

第一種是Transaction事務模式

第二種是Confirm確認模式

1.在通過channel.txSelect方法開啟事務之後,我們便可以釋出訊息給RabbitMQ了,如果事務提交成功,則訊息一定 到達了RabbitMQ中,如果在事務提交執行之前由於RabbitMQ異常崩潰或者其他原因丟擲異常,這個時候我們便可以將其捕獲,進而通過執行channel.txRollback方法來實現事務回滾。使用事務機制的話會“吸乾”RabbitMQ的性 能,一般不建議使用。

2.生產者通過呼叫channel.confirmSelect方法(即Confirm.Select命令)將通道設定為confirm模式。一旦訊息被投遞到所有匹配的佇列之後,RabbitMQ就會發送一個確認(Basic.Ack)給生產者(包含訊息的唯一ID),這就使得生產者知曉訊息已經正確到達了目的地了。

2.確保訊息被正確的路由

可能因為路由關鍵字錯誤,或者佇列不存在,或者佇列名稱錯誤導致②失敗。

  1. 使用mandatory引數和ReturnListener,可以實現訊息無法路由的時候返回給生產者。
  2. 另一種方式就是使用備份交換機(alternate-exchange),無法路由的訊息會發送到這個交換機上。
Map<String,Object> arguments = new HashMap<String,Object>();
// 指定交換機的備份交換機
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE"); 
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);
 

3.確保訊息在佇列正確地儲存

可能因為系統宕機、重啟、關閉等等情況導致儲存在佇列的訊息丟失,即③出現問題。

解決方案:

1.佇列持久化

// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments 
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

2.交換機持久化

// String exchange, boolean durable 
channel.exchangeDeclare("MY_EXCHANGE","true");

3.訊息持久化

AMQP.BasicProperties properties = new AMQP.BasicProperties
.Builder() 
// 2代表持久化,其他代表瞬態              
.deliveryMode(2)    
.build();         
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());

4.確保訊息從佇列正確地投遞到消費者

如果消費者收到訊息後未來得及處理即發生異常,或者處理過程中發生異常,會導致④失敗。
為了保證訊息從佇列可靠性到達消費者,RabbitMQ提供了訊息確認機制(message acknowledgement),消費者在訂閱佇列時,可以指定autoAck引數,當autoAck等於false時,RabbitMQ會等待消費者顯示地回覆確認訊息才從佇列中刪除該訊息。

如果訊息消費失敗,也可以呼叫Basic.Reject或者BasicNack來拒絕當前訊息而不是確認,如果requere引數為true,可以把這條訊息重新存入佇列,以便傳送給下一個消費者。

5.消費者回調

消費者處理訊息之後,可以再發送一條訊息給生產者,或者呼叫生產者地API,告知訊息處理完畢。

6.補償機制

對於一定時間沒有響應地訊息,可以設定一個定時重發地機制,但是要控制次數,比如最多重複三次,否則會造成訊息堆積。

7.訊息冪等性

服務端是沒有這種控制的,只能在消費端控制。

如何避免訊息的重複消費?

訊息重複消費可能會有兩個原因:

  1. 生產者的問題。環節①重複傳送訊息,比如在開啟Confirm模式但未收到確認
  2. 環節④出了問題,由於消費者未傳送ACK或者其它原因,訊息重複投遞

對於重複傳送的訊息,可以對每一條訊息生成一個唯一的業務id,通過日誌或者建表來做重複控制。

8.訊息的順序性

訊息的順序性是指消費者消費訊息的順序跟生產者投遞訊息的順序是一致的。

在RabbitMQ中,一個佇列有多個消費者時,由於不同的消費者消費訊息的速度是不一樣的