RabbitMQ之訊息確認機制AMQP事務
阿新 • • 發佈:2019-01-04
概述
我們在RabbitMQ中可以通過持久化來解決伺服器掛掉而丟失資料問題,但是大家有沒有想過,我的訊息到達了RabbitMQ伺服器了嗎??? 我們是不知道的,導致的問題就是 如果訊息在到達伺服器之前就丟失了,持久化也是不能解決問題的!
那怎麼辦?
我們有兩種方式:
1.通過AMQP協議的事務機制來實現訊息的確認
2.confirm模式;
事務機制
RabbitMQ中提供了3個方法:txSelect(), txCommit()以及txRollback(),其類似於jdbc中的事務開啟 提交 回滾;
txSelect用於將當前channel設定成transaction模式,
txCommit用於提交事務,
txRollback用於回滾事務,
在通過txSelect開啟事務之後,我們便可以釋出訊息給RabbitMQ伺服器了,如果txCommit提交成功了,則訊息一定到達了broker了,如果在txCommit執行之前broker異常崩潰或者由於其他原因丟擲異常,這個時候我們便可以捕獲異常通過txRollback回滾事務了。
程式碼:
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.txCommit();
下面來舉個例子,因為訊息確認機制是對於生產者 ,我們這裡只討論生產者的程式碼
生產者
public class SendMQ {
private static final String QUEUE_NAME = "QUEUE_simple";
@Test
public void sendMsg() throws IOException, TimeoutException {
/* 獲取一個連線 */
Connection connection = ConnectionUtils.getConnection();
/* 從連線中建立通道 */
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false , false, false, null);
String msg = "Hello Simple QUEUE !";
try {
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
int result = 1 / 0;
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
System.out.println("----msg rollabck ");
}finally{
System.out.println("---------send msg over:" + msg);
}
channel.close();
connection.close();
}
}
消費者
public class Consumer {
private static final String QUEUE_NAME = "QUEUE_simple";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
//獲取到達的訊息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//監聽佇列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
通過測試:此種模式還是很耗時的,因為內部走了多次通訊,所以採用這種方式 降低了Rabbitmq的訊息吞吐量