1. 程式人生 > >RocketMQ系列(七)事務訊息(資料庫|最終一致性)

RocketMQ系列(七)事務訊息(資料庫|最終一致性)

終於到了今天了,終於要講RocketMQ最牛X的功能了,那就是**事務訊息**。為什麼事務訊息被吹的比較熱呢?近幾年微服務大行其道,整個系統被切成了多個服務,每個服務掌管著一個數據庫。那麼多個數據庫之間的資料一致性就成了問題,雖然有像XA這種強一致性事務的支援,但是這種強一致性在網際網路的應用中並不適合,人們還是更傾向於使用最終一致性的解決方案,在最終一致性的解決方案中,使用MQ保證各個系統之間的資料一致性又是首選。 RocketMQ為我們提供了事務訊息的功能,它使得我們投放訊息和其他的一些操作保持一個整體的原子性。比如:向資料庫中插入資料,再向MQ中投放訊息,把這兩個動作作為一個原子性的操作。貌似其他的MQ是沒有這種功能的。 但是,縱觀全網,講RocketMQ事務訊息的博文中,幾乎沒有結合資料庫的,都是直接投放訊息,然後講解事務訊息的幾個狀態,雖然講的也沒毛病,但是和專案中事務最終一致性的落地方案還相距甚遠。包括我自己在內,在專案中,服務化以後,用MQ保證事務的最終一致性,在網上一搜,根本沒有落地的方案,都是侃侃而談。於是,我寫下這篇博文,結合資料庫,來談一談RocketMQ的事務訊息到底怎麼用。 ## 基礎概念 要使用RocketMQ的事務訊息,要實現一個TransactionListener的介面,這個介面中有兩個方法,如下: ```java /** * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction. * * @param msg Half(prepare) message * @param arg Custom business parameter * @return Transaction state */ LocalTransactionState executeLocalTransaction(final Message msg, final Object arg); /** * When no response to prepare(half) message. broker will send check message to check the transaction status, and this * method will be invoked to get local transaction status. * * @param msg Check message * @return Transaction state */ LocalTransactionState checkLocalTransaction(final MessageExt msg); ``` RocketMQ的事務訊息是基於兩階段提交實現的,也就是說訊息有兩個狀態,prepared和commited。當訊息執行完send方法後,進入的prepared狀態,進入prepared狀態以後,就要執行executeLocalTransaction方法,這個方法的返回值有3個,也決定著這個訊息的命運, * COMMIT_MESSAGE:提交訊息,這個訊息由prepared狀態進入到commited狀態,消費者可以消費這個訊息; * ROLLBACK_MESSAGE:回滾,這個訊息將被刪除,消費者不能消費這個訊息; * UNKNOW:未知,這個狀態有點意思,如果返回這個狀態,這個訊息既不提交,也不回滾,還是保持prepared狀態,而最終決定這個訊息命運的,是checkLocalTransaction這個方法。 當executeLocalTransaction方法返回UNKNOW以後,RocketMQ會每隔一段時間呼叫一次checkLocalTransaction,這個方法的返回值決定著這個訊息的最終歸宿。那麼checkLocalTransaction這個方法多長時間呼叫一次呢?我們在BrokerConfig類中可以找到, ```java /** * Transaction message check interval. */ @ImportantField private long transactionCheckInterval = 60 * 1000; ``` 這個值是在brokder.conf中配置的,預設值是60*1000,也就是1分鐘。那麼會檢查多少次呢?如果每次都返回UNKNOW,也不能無休止的檢查吧, ```java /** * The maximum number of times the message was checked, if exceed this value, this message will be discarded. */ @ImportantField private int transactionCheckMax = 5; ``` 這個是檢查的最大次數,超過這個次數,如果還返回UNKNOW,這個訊息將被刪除。 事務訊息中,TransactionListener這個最核心的概念介紹完後,我們看看程式碼如何寫吧。 ## 落地案例 我們在資料庫中有一張表,具體如下: ```sql CREATE TABLE `s_term` ( `id` int(11) NOT NULL AUTO_INCREMENT, `term_year` year(4) NOT NULL , `type` int(1) NOT NULL DEFAULT '1' , PRIMARY KEY (`id`) ) ``` 欄位的具體含義大家不用管,一會我們將向這張表中插入一條資料,並且向MQ中投放訊息,這兩個動作是一個原子性的操作,要麼全成功,要麼全失敗。 我們先來看看事務訊息的客戶端的配置,如下: ```java @Bean(name = "transactionProducer",initMethod = "start",destroyMethod = "shutdown") public TransactionMQProducer transactionProducer() { TransactionMQProducer producer = new TransactionMQProducer("TransactionMQProducer"); producer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;"); producer.setTransactionListener(transactionListener()); return producer; } @Bean public TransactionListener transactionListener() { return new TransactionListenerImpl(); } ``` 我們使用TransactionMQProducer生命生產者的客戶端,並且生產者組的名字叫做TransactionMQProducer,後面NameServer的地址沒有變化。最後就是設定了一個TransactionListener監聽器,這個監聽器的實現我們也定義了一個Bean,返回的是我們自定義的TransactionListenerImpl,我們看看裡邊怎麼寫的吧。 ```java public class TransactionListenerImpl implements TransactionListener { @Autowired private TermMapper termMapper; @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { Integer termId = (Integer)arg; Term term = termMapper.selectById(termId); System.out.println("executeLocalTransaction termId="+termId+" term:"+term); if (term != null) return COMMIT_MESSAGE; return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String termId = msg.getKeys(); Term term = termMapper.selectById(Integer.parseInt(termId)); System.out.println("checkLocalTransaction termId="+termId+" term:"+term); if (term != null) { System.out.println("checkLocalTransaction:COMMIT_MESSAGE"); return COMMIT_MESSAGE; } System.out.println("checkLocalTransaction:ROLLBACK_MESSAGE"); return ROLLBACK_MESSAGE; } } ``` 在這個類中,我們要實現executeLocalTransaction和checkLocalTransaction兩個方法,其中executeLocalTransaction是在執行完send方法後立刻執行的,裡邊我們根據term表的id去查詢,如果能夠查詢出結果,就commit,消費端可以消費這個訊息,如果查詢不到,就返回一個UNKNOW,說明過一會會呼叫checkLocalTransaction再次檢查。在checkLocalTransaction方法中,我們同樣用termId去查詢,這次如果再查詢不到就直接回滾了。 好了,事務訊息中最重要的兩個方法都已經實現了,我們再來看看service怎麼寫吧, ```java @Autowired private TermMapper termMapper; @Autowired @Qualifier("transactionProducer") private TransactionMQProducer producer; @Transactional(rollbackFor = Exception.class) public void sendTransactionMQ() throws Exception { Term term = new Term(); term.setTermYear(2020); term.setType(1); int insert = termMapper.insert(term); Message message = new Message(); message.setTopic("cluster-topic"); message.setKeys(term.getId()+""); message.setBody(new String("this is transaction mq "+new Date()).getBytes()); TransactionSendResult sendResult = producer .sendMessageInTransaction(message, term.getId()); System.out.println("sendResult:"+sendResult.getLocalTransactionState() +" 時間:"+new Date()); } ``` * 在sendTransactionMQ方法上,我們使用了@Transactional註解,那麼在這個方法中,發生任何的異常,資料庫事務都會回滾; * 然後,我們建立Term物件,向資料庫中插入Term; * 構建Mesaage的資訊,將termId作為message的key; * 使用sendMessageInTransaction傳送訊息,傳入message和termId,**這兩個引數和executeLocalTransaction方法的入參是對應的。** 最後,我們在test方法中,呼叫sendTransactionMQ方法,如下: ```java @Test public void sendTransactionMQ() throws InterruptedException { try { transactionService.sendTransactionMQ(); } catch (Exception e) { e.printStackTrace(); } Thread.sleep(600000); } ``` 整個生產端的程式碼就是這些了,消費端的程式碼沒有什麼變化,就不給大家貼出來了。接下來,我們把消費端的應用啟動起來,**消費端的應用最好不要包含生產端的程式碼,因為TransactionListener例項化以後,就會進行監聽,而我們在消費者端是不希望看到TransactionListener中的日誌的。** 我們執行一下生產端的程式碼,看看是什麼情況,日誌如下: ```shell executeLocalTransaction termId=15 term:com.example.rocketmqdemo.entity.Term@4a3509b0 sendResult:COMMIT_MESSAGE 時間:Wed Jun 17 08:56:49 CST 2020 ``` * 我們看到,先執行的是executeLocalTransaction這個方法,termId打印出來了,傳送的結果也出來了,是COMMIT_MESSAGE,那麼消費端是可以消費這個訊息的; * 注意一下兩個日誌的順序,先執行的executeLocalTransaction,說明在執行sendMessageInTransaction時,就會呼叫監聽器中的executeLocalTransaction,它的返回值決定著這個訊息是否真正的投放到佇列中; 再看看消費端的日誌, ```shell msgs.size():1 this is transaction mq Wed Jun 17 08:56:49 CST 2020 ``` 訊息被正常消費,沒有問題。那麼資料庫中有沒有termId=15的資料呢?我們看看吧, ![](https://img2020.cnblogs.com/blog/1191201/202006/1191201-20200617101629056-1481171140.png) 資料是有的,插入資料也是成功的。 **這樣使用就真的正確的嗎?我們改一下程式碼看看,在service方法中拋個異常,讓資料庫的事務回滾,看看是什麼效果。**改動程式碼如下: ```java @Transactional(rollbackFor = Exception.class) public void sendTransactionMQ() throws Exception { …… throw new Exception("資料庫事務異常"); } ``` 丟擲異常後,資料庫的事務會回滾,那麼MQ呢?我們再發送一個訊息看看, 生產端的日誌如下: ```shell executeLocalTransaction termId=16 term:com.example.rocketmqdemo.entity.Term@5d6b5d3d sendResult:COMMIT_MESSAGE 時間:Wed Jun 17 09:07:15 CST 2020 java.lang.Exception: 資料庫事務異常 ``` * 從日誌中,我們可以看到,訊息是投放成功的,termId=16,事務的返回狀態是COMMIT_MESSAGE; * 最後丟擲了我們定義的異常,那麼資料庫中應該是不存在這條訊息的啊; 我們先看看資料庫吧, ![](https://img2020.cnblogs.com/blog/1191201/202006/1191201-20200617101640895-1105192970.png) 資料庫中並沒有termId=16的資料,**那麼資料庫的事務是回滾了,而訊息是投放成功的,並沒有保持原子性啊**。那麼為什麼在執行executeLocalTransaction方法時,能夠查詢到termId=16的資料呢?**還記得MySQL的事務隔離級別嗎?忘了的趕快複習一下吧。**在事務提交前,我們是可以查詢到termId=16的資料的,所以訊息提交了,看看消費端的情況, ```shell msgs.size():1 this is transaction mq Wed Jun 17 09:07:15 CST 2020 ``` 訊息也正常消費了,這明顯不符合我們的要求,我們如果在微服務之間使用這種方式保證資料的最終一致性,肯定會有大麻煩的。那我們該怎麼使用s呢?我們可以在executeLocalTransaction方法中,固定返回UNKNOW,資料插入資料庫成功也好,失敗也罷,我們都返回UNKNOW。那麼這個訊息是否投放到佇列中,就由checkLocalTransaction決定了。checkLocalTransaction肯定在sendTransactionMQ後執行,而且和sendTransactionMQ不在同一事務中。我們改一下程式吧, ```java @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { return LocalTransactionState.UNKNOW; } ``` 其他的地方不用改,我們再發送一下訊息, ```shell sendResult:UNKNOW 時間:Wed Jun 17 09:56:59 CST 2020 java.lang.Exception: 資料庫事務異常 checkLocalTransaction termId=18 term:null checkLocalTransaction:ROLLBACK_MESSAGE ``` * 事務訊息傳送的結果是UNKNOW,然後丟擲異常,事務回滾; * checkLocalTransaction方法,查詢termId=18的資料,為null,訊息再回滾; 又看了一下消費端,沒有日誌。資料庫中也沒有termId=18的資料,這才符合我們的預期,資料庫插入不成功,訊息投放不成功。我們再把丟擲異常的程式碼註釋掉,看看能不能都成功。 ```java @Transactional(rollbackFor = Exception.class) public void sendTransactionMQ() throws Exception { …… //throw new Exception("資料庫事務異常"); } ``` 再執行一下發送端程式,日誌如下: ```shell sendResult:UNKNOW 時間:Wed Jun 17 10:02:57 CST 2020 checkLocalTransaction termId=19 term:com.example.rocketmqdemo.entity.Term@3b643475 checkLocalTransaction:COMMIT_MESSAGE ``` * 傳送結果返回UNKNOW; * checkLocalTransaction方法查詢termId=19的資料,能夠查到; * 返回COMMIT_MESSAGE,訊息提交到佇列中; 先看看資料庫中的資料吧, ![](https://img2020.cnblogs.com/blog/1191201/202006/1191201-20200617101653384-1952630753.png) termId=19的資料入庫成功了,再看看消費端的日誌, ```shell msgs.size():1 this is transaction mq Wed Jun 17 10:02:56 CST 2020 ``` 消費成功,這才符合我們的預期。資料插入資料庫成功,訊息投放佇列成功,消費訊息成功。 ## 總結 事務訊息最重要的就是TransactionListener介面的實現,我們要理解executeLocalTransaction和checkLocalTransaction這兩個方法是幹什麼用的,以及它們的執行時間。再一個就是和資料庫事務的結合,資料庫事務的隔離級別大家要知道。把上面這幾點掌握了,就可以靈活的使用RocketMQ的事務消