RabbitMQ學習(五)——訊息確認機制(AMQP事務)
在前面的文章中,我們對RabbitMQ的訊息分發機制做了探究,知道RabbitMQ訊息的分發機制,包括公平分發和輪詢分發,如果忘記了可以去複寫一下RabbitMQ學習(四)——訊息分發機制。
我們知道可以通過持久化(交換機、佇列和訊息持久化)來保障我們在伺服器崩潰時,重啟伺服器訊息資料不會丟失。但是我們無法確認當訊息的釋出者在將訊息傳送出去之後,訊息到底有沒有正確到達Broker代理伺服器呢?如果不進行特殊配置的話,預設情況下發布操作是不會返回任何資訊給生產者的,也就是預設情況下我們的生產者是不知道訊息有沒有正確到達Broker的。如果在訊息到達Broker之前已經丟失的話,持久化操作也解決不了這個問題,因為訊息根本就沒到達代理伺服器,這個是沒有辦法進行持久化的,那麼當我們遇到這個問題又該如何去解決呢?
這裡就是我們講解到的RabbitMQ中的訊息確認機制,通過訊息確認機制我們可以確保我們的訊息可靠送達到我們的使用者手中,即使訊息丟失掉,我們也可以通過進行重複分發確保使用者可靠收到訊息。
今天我們講解的RabbitMQ訊息確認機制,主要包括兩個方面,因為RabbitMQ為我們提供了兩種方式:
- 通過AMQP事務機制實現,這也是AMQP協議層面提供的解決方案;
- 通過將channel設定成confirm模式來實現;
一、AMQP事務
1、使用java原生事務
我們知道事務可以保證訊息的傳遞,使得可靠訊息最終一致性。接下來我們先來探究一下RabbitMQ的事務機制。
RabbitMQ中與事務有關的主要有三個方法:
- txSelect()
- txCommit()
- txRollback()
txSelect主要用於將當前channel設定成transaction模式,txCommit用於提交事務,txRollback用於回滾事務。
當我們使用txSelect提交開始事務之後,我們就可以釋出訊息給Broke代理伺服器,如果txCommit提交成功了,則訊息一定到達了Broke了,如果在txCommit執行之前Broker出現異常崩潰或者由於其他原因丟擲異常,這個時候我們便可以捕獲異常通過txRollback方法進行回滾事務了。
所以RabbitMQ事務中的主要程式碼為:
channel.txSelect(); channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); channel.txCommit();
先進行事務提交,然後開始傳送訊息,最後提交事務。
還是在原來的demo程式碼基礎下,在sender和receiver包下分別新建TransactionSender1.java和TransactionReceiver1.java。分別如下所示:
TransactionSender1.java
package net.anumbrella.rabbitmq.sender;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TransactionSender1 {
private final static String QUEUE_NAME = "transition";
public static void main(String[] args) throws IOException, TimeoutException {
/**
* 建立連線連線到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
// 設定MabbitMQ所在主機ip或者主機名
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("127.0.0.1");
factory.setVirtualHost("/");
factory.setPort(5672);
// 建立一個連線
Connection connection = factory.newConnection();
// 建立一個頻道
Channel channel = connection.createChannel();
// 指定一個佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 傳送的訊息
String message = "This is a transaction message!";
try {
// 開啟事務
channel.txSelect();
// 往佇列中發出一條訊息,使用rabbitmq預設交換機
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 提交事務
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
// 事務回滾
channel.txRollback();
}
System.out.println(" TransactionSender1 Sent '" + message + "'");
// 關閉頻道和連線
channel.close();
connection.close();
}
}
在上面中我們使用try-catch來捕獲異常,如果傳送失敗,就會進行事務回滾。
try {
// 開啟事務
channel.txSelect();
// 往佇列中發出一條訊息,使用rabbitmq預設交換機
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 提交事務
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
// 事務回滾
channel.txRollback();
}
TransactionReceiver1.java
package net.anumbrella.rabbitmq.receiver;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class TransactionReceiver1 {
private final static String QUEUE_NAME = "transition";
public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("127.0.0.1");
factory.setVirtualHost("/");
factory.setPort(5672);
// 開啟連線和建立頻道,與傳送端一樣
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Receiver1 waiting for messages. To exit press CTRL+C");
// 建立佇列消費者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSSS");
String message = new String(body, "UTF-8");
System.out.println(" TransactionReceiver1 : " + message);
System.out.println(" TransactionReceiver1 Done! at " + time.format(new Date()));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
訊息的接收者跟原來是一樣的,因為事務主要是保證訊息要傳送到Broker當中。
接著我們使用wireshark來監聽網路,這裡也可以使用Fiddler。由於筆者使用的是MAC系統,沒有Fiddler版本。如果讀者要使用Fiddler,同時使用windows可以看看這篇文章,後面有對Fiddler的介紹,JMeter搭配Fiddler的簡單使用(一)。
啟動wireshark,選擇好網路,輸入amqp過濾我們需要的資訊。
然後我們分別啟動TransactionReceiver1.java 和 TransactionSender1.java。
從上面我們可以清晰的看見訊息的分發過程,與我們前面分析的一致。主要執行了四個步驟:
- Client傳送Tx.Select
- Broker傳送Tx.Select-Ok(在它之後,傳送訊息)
- Client傳送Tx.Commit
- Broker傳送Tx.Commit-Ok
接下來我們通過丟擲異常來模擬傳送訊息錯誤,進行事務回滾。更改傳送資訊程式碼為:
try {
// 開啟事務
channel.txSelect();
// 往佇列中發出一條訊息,使用rabbitmq預設交換機
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 除以0,模擬異常,使用rabbitmq預設交換機
int t = 1/0;
// 提交事務
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
// 事務回滾
channel.txRollback();
}
這裡我們通過除以0來模擬丟擲異常,接著按同樣的順序執行程式碼。
可以看見事務進行了回滾,同時我們在接收端也沒有收到訊息。
通過上面我們可以知道事務確實能夠解決訊息的傳送者和Broker之間訊息的確認,只有當訊息成功被服務端Broker接收,並且接受時,事務才能提交成功,不然我們便可以在捕獲異常進行事務回滾操作同時進行訊息重發。
在上面的情況中,我們使用java原生程式碼來模擬事務進行傳送,而在實際開發中,我們可能需要結合框架來完成。
2、結合Spring Boot來使用事務
我們一般在Spring Boot使用RabbitMQ,主要是通過封裝的RabbitTemplate模板來實現訊息的傳送,這裡主要也是分為兩種情況,使用RabbitTemplate同步傳送,或者非同步傳送。
注意:釋出確認和事務。(兩者不可同時使用)在channel為事務時,不可引入確認模式;同樣channel為確認模式下,不可使用事務。
所以在使用事務時,在application.properties中,需要將確認模式更改為false。
# 支援釋出確認
spring.rabbitmq.publisher-confirms=false
A、同步
通過設定RabbitTemplate的channelTransacted為true,來設定事務環境,使得可以使用RabbitMQ事務。如下:
template.setChannelTransacted(true);
在demo程式碼裡面,主要是在config包下的RabbitConfig.java裡的rabbitTemplateNew方法裡面配置,如下:
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplateNew() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setChannelTransacted(true);
return template;
}
接著在在sender和receiver包,分別建立TransactionSender2.java和TransactionReceiver2.java。分別如下所示:
TransactionSender2.java
package net.anumbrella.rabbitmq.sender;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Component
public class TransactionSender2 {
@Autowired
private AmqpTemplate rabbitTemplate;
@Transactional(rollbackFor = Exception.class)
public void send(String msg) {
SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String sendMsg = msg + time.format(new Date()) + " This is a transaction message! ";
/**
* 這裡可以執行資料庫操作
*
**/
System.out.println("TransactionSender2 : " + sendMsg);
this.rabbitTemplate.convertAndSend("transition", sendMsg);
}
}
在上面程式碼中,我們通過呼叫者提供外部事務@Transactional(rollbackFor = Exception.class),來現實事務方法。一旦方法中丟擲異常,比如執行資料庫操作時,就會被捕獲到,同時事務將進行回滾,並且向外傳送的訊息將不會發送出去。
TransactionReceiver2.java
package net.anumbrella.rabbitmq.receiver;
import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
public class TransactionReceiver2 {
@RabbitListener(queues = "transition")
public void process(Message message, Channel channel) throws IOException {
System.out.println("TransactionReceiver2 : " + new String(message.getBody()));
}
}
新增完訊息的傳送者和接收者後,還需要在controller包下的RabbitTest.java中新增模擬訊息傳送的Restful介面方法,新增如下程式碼:
@Autowired
private TransactionSender2 transactionSender;
/**
* 事務訊息傳送測試
*/
@GetMapping("/transition")
public void transition() {
transactionSender.send("Transition: ");
}
啟動wireshark,選擇好網路,輸入amqp過濾我們需要的資訊。
然後啟動Spring Boot專案,訪問介面http://localhost:8080/rabbit/transition。
在控制檯我們可以得到訊息已經發送和收到,
TransactionSender2 : Transition: 2018-06-18 23:00:16 This is a transaction message!
TransactionReceiver2 : Transition: 2018-06-18 23:00:16 This is a transaction message!
檢視wireshark如下:
可以看到這裡與前面我們講解的原生事務是一致的,而當傳送訊息出現異常時,就會響應執行事務回滾。
B、非同步
剛才我們講解的是同步的情況,現在我們講解一下非同步的形式。在非同步當中,主要使用MessageListener 介面,它是 Spring AMQP 非同步訊息投遞的監聽器介面。而MessageListener的實現類SimpleMessageListenerContainer則是作為了整個非同步訊息投遞的核心類存在。
接下來我們開始介紹使用非同步的方法,同樣表示需要的外部事務,使用者需要在容器配置的時候指定PlatformTransactionManager的實現。程式碼如下:
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setTransactionManager(rabbitTransactionManager());
container.setChannelTransacted(true);
// 開啟手動確認
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueues(transitionQueue());
container.setMessageListener(new TransitionConsumer());
return container;
}
這段程式碼我們是新增在config下的RabbitConfig.java下,通過配置事務管理器,將channelTransacted屬性被設定為true。
在容器中配置事務時,如果提供了transactionManager,channelTransaction必須為true,使得如果監聽器處理失敗,並且丟擲異常,那麼事務將進行回滾,那麼訊息將返回給訊息代理;如果為false,外部的事務仍然可以提供給監聽容器,造成的影響是在回滾的業務操作中也會提交訊息傳輸的操作。
通過使用RabbitTransactionManager,這個事務管理器是PlatformTransactionManager介面的實現,它只能在一個Rabbit ConnectionFactory中使用。
注意:這種策略不能夠提供XA事務,例如在訊息和資料庫之間共享事務。
除了上面的程式碼外,還有RabbitTransactionManager和TransitionConsumer需要新增,程式碼如下:
/**
* 宣告transition2佇列
*
* @return
*/
@Bean
public Queue transitionQueue() {
return new Queue("transition2");
}
/**
* 事務管理
*
* @return
*/
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory());
}
/**
* 自定義消費者
*/
public class TransitionConsumer implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("TransitionConsumer: " + new String(body));
// 確認訊息成功消費
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 除以0,模擬異常,進行事務回滾
// int t = 1 / 0;
}
}
因為我們在container中設定佇列為“transition2”,所以我們在TransactionSender2中更改傳送的佇列為“transition2”,如下:
this.rabbitTemplate.convertAndSend("transition2", sendMsg);
接著我們啟動wireshark,選擇好網路,輸入amqp過濾我們需要的資訊。
然後啟動Spring Boot專案,訪問介面http://localhost:8080/rabbit/transition。
我們可以在wireshark中看到有事務的提交,如下:
然後我們在TransitionConsumer中把除以0的模擬異常情況開啟,然後再執行上面的操作,可得:
可以看到先進行了事務提交,後面事務又回滾了。意味著訊息沒有接收成功,我們在RabbitMQ管理介面也可以檢視到訊息,如果將consumer關掉,則unacked的msg則會又回到了ready狀態。(注意:這裡我們模擬的是消費者接收事務,前面是訊息生成者傳送到Broker的事務)
關閉訊息者監聽後,訊息又恢復了ready狀態。當重啟應用會重新發過它。
好了,RabbitMQ訊息的事務就到這裡,可能有些細節的地方講的不是很清楚,有些地方筆者也不是也清楚,歡迎一起討論。
還剩RabbitMQ訊息的確認模式沒有講解,這個就留到下一篇。
最後上面演示的demo,還是放在github,歡迎start,rabbitmq-demo。