RabbitMQ系列—Java操作之事務模式和Confirm模式
在之前介紹到了RabbitMQ的訊息持久化和消費者端手動確認,解決了消費者異常導致的資料丟失問題,那麼我們如何確定生產者生產的訊息已經被髮送到rabbitmq伺服器了呢?通俗點說,如果訊息經過交換器進入佇列就可以完成訊息的持久化,但如果訊息在沒有到達broker之前出現意外,那就造成訊息丟失,有沒有辦法可以解決這個問題?有兩種方式:
- 通過AMQP協議,AMQP協議實現了事務機制。
- 通過Confirm模式
AMQP的事務機制
事務的實現主要是對通道(Channel)的設定,主要的方法有三個:
- channel.txSelect():宣告啟動事務模式
- channel.txComment():提交事務
- channel.txRollback():回滾事務
生產者Sender
public class Sender { private static final String QUEUE = "test_tx_queue"; public static void main(String[] args) { Connection con = null; Channel channel = null; try { // 獲取連線 con = ConnectionUtils.getConnection(); // 從連線中建立通道 channel = con.createChannel(); // 宣告一個佇列 channel.queueDeclare(QUEUE, false, false, false, null); // 訊息內容 String msg = "tx queue hello!"; // 開啟事務 channel.txSelect(); // 傳送訊息 channel.basicPublish("", QUEUE, null, msg.getBytes()); // 模擬異常 int num = 1/0; // 提交事務 channel.txCommit(); System.out.println("send success"); } catch (Exception e) { // 事務回滾 try { channel.txRollback(); } catch (IOException e1) { e1.printStackTrace(); } e.printStackTrace(); } finally { // 關閉連線 ConnectionUtils.close(channel, con); } } }
消費者Recver
public class Recver { private static final String QUEUE = "test_tx_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取連線 Connection con = ConnectionUtils.getConnection(); // 從連線中建立通道 Channel channel = con.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE, false, false, false, null); // 建立消費者 Consumer consumer = new DefaultConsumer(channel) { // 獲取訊息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("接收到訊息——" + msg); } }; // 監聽佇列 channel.basicConsume(QUEUE, true, consumer); } }
執行消費者和生產者,生產者報錯而且消費者也沒有收到訊息,說明訊息已經被回滾了。
消費者模式使用事務
我們知道,消費者可以使用訊息自動或手動傳送來確認消費訊息,那如果我們在消費者模式中使用事務(當然如果使用了手動確認訊息,完全用不到事務的),會發生什麼呢?
結果分為兩種情況:
- autoAck=false手動應對的時候是支援事務的,也就是說即使你已經手動確認了訊息已經收到了,但在確認訊息會等事務的返回解決之後,在做決定是確認訊息還是重新放回佇列,如果你手動確認現在之後,又回滾了事務,那麼已事務回滾為主,此條訊息會重新放回佇列。
- autoAck=true如果自定確認為true的情況是不支援事務的,也就是說你即使在收到訊息之後在回滾事務也是於事無補的,佇列已經把訊息移除了。
這種事務模式有個缺陷:效能差,降低了rabbitmq的訊息吞吐量,使用了事務模式比非事務模式效能差很多,那麼有沒有既能保證訊息的可靠性又能兼顧效能的解決方案呢?那就是下面的Confirm模式。
Confirm模式
Confirm傳送方確認模式使用和事務類似,也是通過設定Channel進行傳送方確認的。
實現原理
將Channel設定為Confirm模式後,此Channel傳送的每條訊息都會有標識這條訊息的ID(從1開始),當r訊息投放到匹配的佇列後,broker會返回一個確認資訊(包含訊息的唯一ID)給生產者通知生產者已經成功傳送到佇列。如果訊息和佇列是可持久化的,在佇列將訊息寫人到磁碟後再返回給生產者確認資訊。broker回傳給生產者的確認訊息中deliver-tag域包含了確認訊息的序列號,此外broker也可以設定basic.ack的multiple域,表示這個序列號之前的所有訊息都已經得到了處理。
三種程式設計方式
- 序列confirm模式:peoducer每傳送一條訊息後,呼叫waitForConfirms()方法,等待broker端confirm。
- 批量confirm模式:producer每傳送一批訊息後,呼叫waitForConfirms()方法,等待broker端confirm。
- 非同步confirm模式:提供一個回撥方法,broker confirm了一條或者多條訊息後producer端會回撥這個方法。
Confirm模式最大的優點就是它是非同步的。
序列confirm模式
生產者SingleSender
public class SingleSender {
private static final String QUEUE = "test_confirm_queue";
public static void main(String[] args) {
Connection con = null;
Channel channel = null;
try {
// 獲取連線
con = ConnectionUtils.getConnection();
// 從連線中建立通道
channel = con.createChannel();
// 宣告一個佇列
channel.queueDeclare(QUEUE, false, false, false, null);
// 訊息內容
String msg = "confirm queue hello!";
// 將Channel設定為Confirm模式
channel.confirmSelect();
// 傳送訊息
channel.basicPublish("", QUEUE, null, msg.getBytes());
// 訊息確認
if(channel.waitForConfirms()){
System.out.println("send success");
}else{
System.out.println("send fail");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 關閉連線
ConnectionUtils.close(channel, con);
}
}
}
消費者Recver
public class Recver{
private static final String QUEUE = "test_confirm_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連線
Connection con = ConnectionUtils.getConnection();
// 從連線中建立通道
Channel channel = con.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE, false, false, false, null);
// 建立消費者
Consumer consumer = new DefaultConsumer(channel) {
// 獲取訊息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "utf-8");
System.out.println("接收到訊息——" + msg);
}
};
// 監聽佇列
channel.basicConsume(QUEUE, true, consumer);
}
}
普通模式需要一條一條確認,效能慢,可以選擇批量模式。
批量confirm模式
生產者BatchSender
public class BatchSender {
private static final String QUEUE = "test_confirm_queue";
public static void main(String[] args) {
Connection con = null;
Channel channel = null;
try {
// 獲取連線
con = ConnectionUtils.getConnection();
// 從連線中建立通道
channel = con.createChannel();
// 宣告一個佇列
channel.queueDeclare(QUEUE, false, false, false, null);
// 將Channel設定為Confirm模式
channel.confirmSelect();
for (int i = 0; i < 20; i++) {
// 訊息內容
String msg = "confirm queue hello!";
// 傳送訊息
channel.basicPublish("", QUEUE, null, msg.getBytes());
}
// 訊息確認
if (channel.waitForConfirms()) {
System.out.println("send success");
} else {
System.out.println("send fail");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 關閉連線
ConnectionUtils.close(channel, con);
}
}
}
通過迴圈批量傳送20條訊息,但只在控制檯輸出了一行“發send success”,該方法會等到最後一條訊息得到ack或者得到nack才會結束,也就是說在waitForConfirms處會造成當前程式的阻塞,這點我們看出broker端預設情況下是進行批量回復的,並不是針對每條訊息都發送一條ack訊息。
缺陷
一批資料中有一條訊息傳送失敗會都回滾。
非同步模式
普通模式和批量模式都是序列的、同步執行的,如果訊息傳送出去沒有返回確認訊息會一直等待,而非同步模式執行效率高,不需要等待訊息執行完,只需要監聽訊息即可。
生產者AsyncSender
public class AsyncSender {
private static final String QUEUE = "test_confirm_queue";
public static void main(String[] args) {
Connection con = null;
Channel channel = null;
try {
// 獲取連線
con = ConnectionUtils.getConnection();
// 從連線中建立通道
channel = con.createChannel();
// 宣告一個佇列
channel.queueDeclare(QUEUE, false, false, false, null);
// 將Channel設定為Confirm模式
channel.confirmSelect();
// 非同步監聽確認和未確認的訊息
channel.addConfirmListener(new ConfirmListener() {
/**
* 處理返回確認成功
*
* @param deliveryTag
* 如果是多條,這個就是最後一條訊息的tag
* @param multiple
* 是否多條 true是false否
* @throws IOException
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("ack:deliveryTag:" + deliveryTag + ",multiple:" + multiple);
}
/**
* 處理返回確認失敗
*
* @param deliveryTag
* @param multiple
* @throws IOException
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("nack:deliveryTag:" + deliveryTag + ",multiple:" + multiple);
}
});
for (int i = 0; i < 50; i++) {
// 訊息內容
String msg = "confirm queue hello!" + i;
// long tag = channel.getNextPublishSeqNo();
// 傳送訊息
channel.basicPublish("", QUEUE, null, msg.getBytes());
// System.out.println("訊息tag" + tag);
}
System.out.println("執行結束");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 關閉連線
ConnectionUtils.close(channel, con);
}
}
}
執行後控制檯列印
ack:deliveryTag:2,multiple:true
ack:deliveryTag:13,multiple:true
ack:deliveryTag:14,multiple:false
ack:deliveryTag:15,multiple:false
ack:deliveryTag:16,multiple:false
ack:deliveryTag:18,multiple:true
ack:deliveryTag:20,multiple:true
ack:deliveryTag:22,multiple:true
ack:deliveryTag:23,multiple:false
ack:deliveryTag:24,multiple:false
ack:deliveryTag:25,multiple:false
ack:deliveryTag:26,multiple:false
ack:deliveryTag:27,multiple:false
ack:deliveryTag:28,multiple:false
執行結束
ack:deliveryTag:29,multiple:false
ack:deliveryTag:34,multiple:true
可以看到,傳送50條訊息,收到的ack個數不一樣多次執行程式會發現每次傳送回來的ack訊息中的deliveryTag域的值並不是一樣的,說明broker端批量回傳給傳送者的ack訊息並不是以固定的批量大小回傳的。
效能比較
事務模式效能是最差的,普通confirm模式效能比事務模式稍微好點,但是和批量confirm模式還有非同步confirm模式相比,還是小巫見大巫。批量confirm模式的問題在於confirm之後返回false之後進行重發這樣會使效能降低,非同步confirm模式(async)程式設計模型較為複雜,至於採用哪種方式具體看實際情況。
注意:AMQP的事務模式和Confirm模式不能一起使用。