RocketMQ系列(七)事務訊息(資料庫|最終一致性)
阿新 • • 發佈:2020-06-17
終於到了今天了,終於要講RocketMQ最牛X的功能了,那就是**事務訊息**。為什麼事務訊息被吹的比較熱呢?近幾年微服務大行其道,整個系統被切成了多個服務,每個服務掌管著一個數據庫。那麼多個數據庫之間的資料一致性就成了問題,雖然有像XA這種強一致性事務的支援,但是這種強一致性在網際網路的應用中並不適合,人們還是更傾向於使用最終一致性的解決方案,在最終一致性的解決方案中,使用MQ保證各個系統之間的資料一致性又是首選。
RocketMQ為我們提供了事務訊息的功能,它使得我們投放訊息和其他的一些操作保持一個整體的原子性。比如:向資料庫中插入資料,再向MQ中投放訊息,把這兩個動作作為一個原子性的操作。貌似其他的MQ是沒有這種功能的。
但是,縱觀全網,講RocketMQ事務訊息的博文中,幾乎沒有結合資料庫的,都是直接投放訊息,然後講解事務訊息的幾個狀態,雖然講的也沒毛病,但是和專案中事務最終一致性的落地方案還相距甚遠。包括我自己在內,在專案中,服務化以後,用MQ保證事務的最終一致性,在網上一搜,根本沒有落地的方案,都是侃侃而談。於是,我寫下這篇博文,結合資料庫,來談一談RocketMQ的事務訊息到底怎麼用。
## 基礎概念
要使用RocketMQ的事務訊息,要實現一個TransactionListener的介面,這個介面中有兩個方法,如下:
```java
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
```
RocketMQ的事務訊息是基於兩階段提交實現的,也就是說訊息有兩個狀態,prepared和commited。當訊息執行完send方法後,進入的prepared狀態,進入prepared狀態以後,就要執行executeLocalTransaction方法,這個方法的返回值有3個,也決定著這個訊息的命運,
* COMMIT_MESSAGE:提交訊息,這個訊息由prepared狀態進入到commited狀態,消費者可以消費這個訊息;
* ROLLBACK_MESSAGE:回滾,這個訊息將被刪除,消費者不能消費這個訊息;
* UNKNOW:未知,這個狀態有點意思,如果返回這個狀態,這個訊息既不提交,也不回滾,還是保持prepared狀態,而最終決定這個訊息命運的,是checkLocalTransaction這個方法。
當executeLocalTransaction方法返回UNKNOW以後,RocketMQ會每隔一段時間呼叫一次checkLocalTransaction,這個方法的返回值決定著這個訊息的最終歸宿。那麼checkLocalTransaction這個方法多長時間呼叫一次呢?我們在BrokerConfig類中可以找到,
```java
/**
* Transaction message check interval.
*/
@ImportantField
private long transactionCheckInterval = 60 * 1000;
```
這個值是在brokder.conf中配置的,預設值是60*1000,也就是1分鐘。那麼會檢查多少次呢?如果每次都返回UNKNOW,也不能無休止的檢查吧,
```java
/**
* The maximum number of times the message was checked, if exceed this value, this message will be discarded.
*/
@ImportantField
private int transactionCheckMax = 5;
```
這個是檢查的最大次數,超過這個次數,如果還返回UNKNOW,這個訊息將被刪除。
事務訊息中,TransactionListener這個最核心的概念介紹完後,我們看看程式碼如何寫吧。
## 落地案例
我們在資料庫中有一張表,具體如下:
```sql
CREATE TABLE `s_term` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`term_year` year(4) NOT NULL ,
`type` int(1) NOT NULL DEFAULT '1' ,
PRIMARY KEY (`id`)
)
```
欄位的具體含義大家不用管,一會我們將向這張表中插入一條資料,並且向MQ中投放訊息,這兩個動作是一個原子性的操作,要麼全成功,要麼全失敗。
我們先來看看事務訊息的客戶端的配置,如下:
```java
@Bean(name = "transactionProducer",initMethod = "start",destroyMethod = "shutdown")
public TransactionMQProducer transactionProducer() {
TransactionMQProducer producer = new
TransactionMQProducer("TransactionMQProducer");
producer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
producer.setTransactionListener(transactionListener());
return producer;
}
@Bean
public TransactionListener transactionListener() {
return new TransactionListenerImpl();
}
```
我們使用TransactionMQProducer生命生產者的客戶端,並且生產者組的名字叫做TransactionMQProducer,後面NameServer的地址沒有變化。最後就是設定了一個TransactionListener監聽器,這個監聽器的實現我們也定義了一個Bean,返回的是我們自定義的TransactionListenerImpl,我們看看裡邊怎麼寫的吧。
```java
public class TransactionListenerImpl implements TransactionListener {
@Autowired
private TermMapper termMapper;
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Integer termId = (Integer)arg;
Term term = termMapper.selectById(termId);
System.out.println("executeLocalTransaction termId="+termId+" term:"+term);
if (term != null) return COMMIT_MESSAGE;
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String termId = msg.getKeys();
Term term = termMapper.selectById(Integer.parseInt(termId));
System.out.println("checkLocalTransaction termId="+termId+" term:"+term);
if (term != null) {
System.out.println("checkLocalTransaction:COMMIT_MESSAGE");
return COMMIT_MESSAGE;
}
System.out.println("checkLocalTransaction:ROLLBACK_MESSAGE");
return ROLLBACK_MESSAGE;
}
}
```
在這個類中,我們要實現executeLocalTransaction和checkLocalTransaction兩個方法,其中executeLocalTransaction是在執行完send方法後立刻執行的,裡邊我們根據term表的id去查詢,如果能夠查詢出結果,就commit,消費端可以消費這個訊息,如果查詢不到,就返回一個UNKNOW,說明過一會會呼叫checkLocalTransaction再次檢查。在checkLocalTransaction方法中,我們同樣用termId去查詢,這次如果再查詢不到就直接回滾了。
好了,事務訊息中最重要的兩個方法都已經實現了,我們再來看看service怎麼寫吧,
```java
@Autowired
private TermMapper termMapper;
@Autowired
@Qualifier("transactionProducer")
private TransactionMQProducer producer;
@Transactional(rollbackFor = Exception.class)
public void sendTransactionMQ() throws Exception {
Term term = new Term();
term.setTermYear(2020);
term.setType(1);
int insert = termMapper.insert(term);
Message message = new Message();
message.setTopic("cluster-topic");
message.setKeys(term.getId()+"");
message.setBody(new String("this is transaction mq "+new Date()).getBytes());
TransactionSendResult sendResult = producer
.sendMessageInTransaction(message, term.getId());
System.out.println("sendResult:"+sendResult.getLocalTransactionState()
+" 時間:"+new Date());
}
```
* 在sendTransactionMQ方法上,我們使用了@Transactional註解,那麼在這個方法中,發生任何的異常,資料庫事務都會回滾;
* 然後,我們建立Term物件,向資料庫中插入Term;
* 構建Mesaage的資訊,將termId作為message的key;
* 使用sendMessageInTransaction傳送訊息,傳入message和termId,**這兩個引數和executeLocalTransaction方法的入參是對應的。**
最後,我們在test方法中,呼叫sendTransactionMQ方法,如下:
```java
@Test
public void sendTransactionMQ() throws InterruptedException {
try {
transactionService.sendTransactionMQ();
} catch (Exception e) {
e.printStackTrace();
}
Thread.sleep(600000);
}
```
整個生產端的程式碼就是這些了,消費端的程式碼沒有什麼變化,就不給大家貼出來了。接下來,我們把消費端的應用啟動起來,**消費端的應用最好不要包含生產端的程式碼,因為TransactionListener例項化以後,就會進行監聽,而我們在消費者端是不希望看到TransactionListener中的日誌的。**
我們執行一下生產端的程式碼,看看是什麼情況,日誌如下:
```shell
executeLocalTransaction termId=15 term:com.example.rocketmqdemo.entity.Term@4a3509b0
sendResult:COMMIT_MESSAGE 時間:Wed Jun 17 08:56:49 CST 2020
```
* 我們看到,先執行的是executeLocalTransaction這個方法,termId打印出來了,傳送的結果也出來了,是COMMIT_MESSAGE,那麼消費端是可以消費這個訊息的;
* 注意一下兩個日誌的順序,先執行的executeLocalTransaction,說明在執行sendMessageInTransaction時,就會呼叫監聽器中的executeLocalTransaction,它的返回值決定著這個訊息是否真正的投放到佇列中;
再看看消費端的日誌,
```shell
msgs.size():1
this is transaction mq Wed Jun 17 08:56:49 CST 2020
```
訊息被正常消費,沒有問題。那麼資料庫中有沒有termId=15的資料呢?我們看看吧,
![](https://img2020.cnblogs.com/blog/1191201/202006/1191201-20200617101629056-1481171140.png)
資料是有的,插入資料也是成功的。
**這樣使用就真的正確的嗎?我們改一下程式碼看看,在service方法中拋個異常,讓資料庫的事務回滾,看看是什麼效果。**改動程式碼如下:
```java
@Transactional(rollbackFor = Exception.class)
public void sendTransactionMQ() throws Exception {
……
throw new Exception("資料庫事務異常");
}
```
丟擲異常後,資料庫的事務會回滾,那麼MQ呢?我們再發送一個訊息看看,
生產端的日誌如下:
```shell
executeLocalTransaction termId=16 term:com.example.rocketmqdemo.entity.Term@5d6b5d3d
sendResult:COMMIT_MESSAGE 時間:Wed Jun 17 09:07:15 CST 2020
java.lang.Exception: 資料庫事務異常
```
* 從日誌中,我們可以看到,訊息是投放成功的,termId=16,事務的返回狀態是COMMIT_MESSAGE;
* 最後丟擲了我們定義的異常,那麼資料庫中應該是不存在這條訊息的啊;
我們先看看資料庫吧,
![](https://img2020.cnblogs.com/blog/1191201/202006/1191201-20200617101640895-1105192970.png)
資料庫中並沒有termId=16的資料,**那麼資料庫的事務是回滾了,而訊息是投放成功的,並沒有保持原子性啊**。那麼為什麼在執行executeLocalTransaction方法時,能夠查詢到termId=16的資料呢?**還記得MySQL的事務隔離級別嗎?忘了的趕快複習一下吧。**在事務提交前,我們是可以查詢到termId=16的資料的,所以訊息提交了,看看消費端的情況,
```shell
msgs.size():1
this is transaction mq Wed Jun 17 09:07:15 CST 2020
```
訊息也正常消費了,這明顯不符合我們的要求,我們如果在微服務之間使用這種方式保證資料的最終一致性,肯定會有大麻煩的。那我們該怎麼使用s呢?我們可以在executeLocalTransaction方法中,固定返回UNKNOW,資料插入資料庫成功也好,失敗也罷,我們都返回UNKNOW。那麼這個訊息是否投放到佇列中,就由checkLocalTransaction決定了。checkLocalTransaction肯定在sendTransactionMQ後執行,而且和sendTransactionMQ不在同一事務中。我們改一下程式吧,
```java
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.UNKNOW;
}
```
其他的地方不用改,我們再發送一下訊息,
```shell
sendResult:UNKNOW 時間:Wed Jun 17 09:56:59 CST 2020
java.lang.Exception: 資料庫事務異常
checkLocalTransaction termId=18 term:null
checkLocalTransaction:ROLLBACK_MESSAGE
```
* 事務訊息傳送的結果是UNKNOW,然後丟擲異常,事務回滾;
* checkLocalTransaction方法,查詢termId=18的資料,為null,訊息再回滾;
又看了一下消費端,沒有日誌。資料庫中也沒有termId=18的資料,這才符合我們的預期,資料庫插入不成功,訊息投放不成功。我們再把丟擲異常的程式碼註釋掉,看看能不能都成功。
```java
@Transactional(rollbackFor = Exception.class)
public void sendTransactionMQ() throws Exception {
……
//throw new Exception("資料庫事務異常");
}
```
再執行一下發送端程式,日誌如下:
```shell
sendResult:UNKNOW 時間:Wed Jun 17 10:02:57 CST 2020
checkLocalTransaction termId=19 term:com.example.rocketmqdemo.entity.Term@3b643475
checkLocalTransaction:COMMIT_MESSAGE
```
* 傳送結果返回UNKNOW;
* checkLocalTransaction方法查詢termId=19的資料,能夠查到;
* 返回COMMIT_MESSAGE,訊息提交到佇列中;
先看看資料庫中的資料吧,
![](https://img2020.cnblogs.com/blog/1191201/202006/1191201-20200617101653384-1952630753.png)
termId=19的資料入庫成功了,再看看消費端的日誌,
```shell
msgs.size():1
this is transaction mq Wed Jun 17 10:02:56 CST 2020
```
消費成功,這才符合我們的預期。資料插入資料庫成功,訊息投放佇列成功,消費訊息成功。
## 總結
事務訊息最重要的就是TransactionListener介面的實現,我們要理解executeLocalTransaction和checkLocalTransaction這兩個方法是幹什麼用的,以及它們的執行時間。再一個就是和資料庫事務的結合,資料庫事務的隔離級別大家要知道。把上面這幾點掌握了,就可以靈活的使用RocketMQ的事務消