1. 程式人生 > >RabbitMQ 之訊息確認機制(事務+Confirm)

RabbitMQ 之訊息確認機制(事務+Confirm)

概述

在 Rabbitmq 中我們可以通過持久化來解決因為伺服器異常而導致丟失的問題,
除此之外我們還會遇到一個問題:生產者將訊息傳送出去之後,訊息到底有沒有正確到達 Rabbit 伺服器呢?如果不錯得數處理,我們是不知道的,(即 Rabbit 伺服器不會反饋任何訊息給生產者),也就是預設的情況下是不知道訊息有沒有正確到達;

導致的問題:訊息到達伺服器之前丟失,那麼持久化也不能解決此問題,因為訊息根本就沒有到達 Rabbit 伺服器!

RabbitMQ 為我們提供了兩種方式 :

1. 通過 AMQP 事務機制實現,這也是 AMQP 協議層面提供的解決方案;

2. 通過將 channel 設定成 confirm 模式來實

事務機制

RabbitMQ 中與事務機制有關的方法有三個:txSelect(), txCommit()以及 txRollback(), txSelect 用於將當前 channel 設定成 transaction 模式,txCommit 用於提交事務,txRollback 用於回滾事務,在通過 txSelect 開啟事務之後,我們便可以釋出訊息給 broker 代理伺服器了,如果 txCommit 提交成功了,則訊息一定到達了 broker 了,如果在 txCommit執行之前 broker 異常崩潰或者由於其他原因丟擲異常,這個時候我們便可以捕獲異常通過 txRollback 回滾事務。

關鍵程式碼

 1 channel.txSelect(); 2 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); 3 channel.txCommit(); 

生產者

 1 public class SendMQ {
 2 private static final String QUEUE_NAME = "QUEUE_simple";
 3 @Test
 4 public void sendMsg() throws IOException, TimeoutException {
 5
/* 獲取一個連線 */ 6 Connection connection = ConnectionUtils.getConnection(); 7 /* 從連線中建立通道 */ 8 Channel channel = connection.createChannel(); 9 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 10 String msg = "Hello Simple QUEUE !"; 11 try { 12 channel.txSelect(); 13 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); 14 int result = 1 / 0; 15 channel.txCommit(); 16 } catch (Exception e) { 17 channel.txRollback(); 18 System.out.println("----msg rollabck "); 19 }finally{ 20 System.out.println("---------send msg over:" + msg); 21 } 22 channel.close(); 23 connection.close(); 24 } 25 }

消費者

 1 public class Consumer {
 2 private static final String QUEUE_NAME = "QUEUE_simple";
 3 public static void main(String[] args) throws Exception {
 4 Connection connection = ConnectionUtils.getConnection();
 5 Channel channel = connection.createChannel();
 6 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 7 DefaultConsumer consumer = new DefaultConsumer(channel) {
 8 //獲取到達的訊息
 9 @Override
10 public void handleDelivery(String consumerTag, Envelope envelope, 
11 BasicProperties properties, byte[] body) throws IOException {
12 String message = new String(body, "UTF-8");
13 System.out.println(" [x] Received '" + message + "'");
14 }
15 };
16 //監聽佇列
17 channel.basicConsume(QUEUE_NAME, true, consumer);
18 }
19 }

此種模式還是很耗時的,採用這種方式 降低了 Rabbitmq 的訊息吞量

Confirm 模式

上面我們介紹了 RabbitMQ 可能會遇到的一個問題,即生成者不知道訊息是否真正到達 broker,隨後通過 AMQP 協議層面為我們提供了事務機制解決了這個問題,但是採用事務機制實現會降低RabbitMQ 的訊息吞吐量,那麼有沒有更加高效的解決方式呢?答案是採用 Confirm 模式。

producer 端 confirm 模式的實現原理

  生產者將通道設定成 confirm 模式,一旦通道進入 confirm 模式,所有在該通道上面釋出的訊息都會被指派一個唯一的 ID(從 1 開始),一旦訊息被投遞到所有匹配的佇列之後,broker 就會發送一個確認給生產者(包含訊息的唯一ID),這就使得生產者知道訊息已經正確到達目的隊列了,如果訊息和佇列是可持久化的,那麼確認訊息會將訊息寫入磁碟之後發出,broker 回傳給生產者的確認訊息中 deliver-tag 域包含了確認訊息的序列號,此外 broker 也可以設定 basic.ack 的 multiple 域,表示到這個序列號之前的所有訊息都已經得到了處理。
  confirm 模式最大的好處在於他是非同步的,一旦釋出一條訊息,生產者應用程式就可以在等通道返回確認的同時繼續傳送下一條訊息,當訊息最終得到確認之後,生產者應用便可以通過回撥方法來處理該確認訊息,如果RabbitMQ 因為自身內部錯誤導致訊息丟失,就會發送一條 nack 訊息,生產者應用程式同樣可以在回撥方法中處理該 nack 訊息

開啟 confirm 模式的方法

已經在 transaction 事務模式的 channel 是不能再設定成 confirm 模式的,即這兩種模式是不能共存的。

生產者通過呼叫 channel 的 confirmSelect 方法將 channel 設定為 confirm 模式

核心程式碼:

//生產者通過呼叫channel的confirmSelect方法將channel設定為confirm模式channel.confirmSelect();

 

程式設計模式 

1. 普通 confirm 模式:每傳送一條訊息後,呼叫 waitForConfirms()方法,等待伺服器端confirm。實際上是一種序列 confirm 了。

2. 批量 confirm 模式:每傳送一批訊息後,呼叫 waitForConfirms()方法,等待伺服器端confirm。

3. 非同步 confirm 模式:提供一個回撥方法,服務端 confirm 了一條或者多條訊息後 Client 端會回撥這個方法;

 

通 confirm 模式

 1 public class SendConfirm {
 2 private static final String QUEUE_NAME = "QUEUE_simple_confirm";
 3 @Test
 4 public void sendMsg() throws IOException, TimeoutException, 
 5 InterruptedException {
 6 /* 獲取一個連線 */
 7 Connection connection = ConnectionUtils.getConnection();
 8 /* 從連線中建立通道 */
 9 Channel channel = connection.createChannel();
10 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
11 //生產者通過呼叫channel的confirmSelect方法將channel設定為confirm模式
12 channel.confirmSelect();
13 String msg = "Hello QUEUE !";
14 channel.basicPublish("", QUEUE_NAME, null,msg.getBytes());
15 if(!channel.waitForConfirms()){
16  System.out.println("send message failed.");
17 }else{
18 System.out.println(" send messgae ok ...");
19 }
20 channel.close();
21 connection.close();
22 }
23 }

批量 confirm模式

批量 confirm 模式稍微複雜一點,客戶端程式需要定期(每隔多少秒)或者定量(達到多少條)或者兩則結合起來publish 訊息,然後等待伺服器端 confirm, 相比普通 confirm 模式,批量極大提升 confirm 效率,但是問題在於一旦出現 confirm 返回 false 或者超時的情況時,客戶端需要將這一批次的訊息全部重發,這會帶來明顯的重複訊息數量,並且,當訊息經常丟失時,批量 confirm 效能應該是不升反降

 1 public class SendbatchConfirm {
 2 private static final String QUEUE_NAME = "QUEUE_simple_confirm";
 3 @Test
 4 public void sendMsg() throws IOException, TimeoutException, 
 5 InterruptedException {
 6 /* 獲取一個連線 */
 7 Connection connection = ConnectionUtils.getConnection();
 8 /* 從連線中建立通道 */
 9 Channel channel = connection.createChannel();
10 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
11 //生產者通過呼叫channel的confirmSelect方法將channel設定為confirm模式
12 channel.confirmSelect();
13 String msg = "Hello QUEUE !";
14 for (int i = 0; i < 10; i++) {
15 channel.basicPublish("", QUEUE_NAME, null,msg.getBytes());
16 }
17 if(!channel.waitForConfirms()){
18  System.out.println("send message failed.");
19 }else{
20 System.out.println(" send messgae ok ...");
21 }
22 channel.close();
23 connection.close();
24 }
25 }

非同步 confirm模式

Channel 物件提供的 ConfirmListener()回撥方法只包含 deliveryTag(當前 Chanel 發出的訊息序號),我們需要自己為每一個 Channel 維護一個 unconfirm 的訊息序號集合,每 publish 一條資料,集合中元素加 1,每回調一次 handleAck
方法,unconfirm 集合刪掉相應的一條(multiple=false)或多條(multiple=true)記錄。從程式執行效率上看,這個unconfirm 集合最好採用有序集合 SortedSet 儲存結構。實際上,SDK 中的 waitForConfirms()方法也是通過 SortedSet
維護訊息序號

 1 public class SendAync {
 2 private static final String QUEUE_NAME = "QUEUE_simple_confirm_aync";
 3 public static void main(String[] args) throws IOException, TimeoutException {
 4 /* 獲取一個連線 */
 5 Connection connection = ConnectionUtils.getConnection();
 6 /* 從連線中建立通道 */
 7 Channel channel = connection.createChannel();
 8 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 9 //生產者通過呼叫channel的confirmSelect方法將channel設定為confirm模式
10 channel.confirmSelect();
11 final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new
12 TreeSet<Long>());
13 channel.addConfirmListener(new ConfirmListener() {
14 //每回調一次handleAck方法,unconfirm集合刪掉相應的一條(multiple=false)
15 或多條(multiple=true)記錄。
16 @Override
17 public void handleAck(long deliveryTag, boolean multiple) throws
18 IOException {
19 if (multiple) {
20 System.out.println("--multiple--");
21 confirmSet.headSet(deliveryTag + 1).clear();//用一個
22 SortedSet, 返回此有序集合中小於end的所有元素。
23 } else {
24 System.out.println("--multiple false--");
25 confirmSet.remove(deliveryTag);
26 }
27 }
28 @Override
29 public void handleNack(long deliveryTag, boolean multiple) throws
30 IOException {
31 System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: 
32 " + multiple);
33 if (multiple) {
34 confirmSet.headSet(deliveryTag + 1).clear();
35 } else {
36 confirmSet.remove(deliveryTag);
37 }
38 }
39 });
40 String msg = "Hello QUEUE !";
41 while (true) {
42 long nextSeqNo = channel.getNextPublishSeqNo();
43 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
44 confirmSet.add(nextSeqNo);
45 }
46 }
47 }