1. 程式人生 > 其它 >RocketMQ事務訊息解析

RocketMQ事務訊息解析

技術標籤:Javajava

RocketMQ事務訊息解析

之前在網上看到了一篇關於RocketMQ的事務訊息的文章,感覺講的並不好,甚至有錯誤的地方,所以就想自己來寫一篇文章,講一講我對與RocketMQ的事務訊息的理解,不一定會正確,各位在看的時候,可以結合自己的思考,看看是否有一定的參考性,這裡會單刀直入的講解重點,而不會講一些鋪墊的東西,例如什麼是RocketMQ,什麼是事務訊息,什麼是事務等等,如果你遇到了一些名詞,自己不是很理解,需要自己去其它地方學習下。

應用的場景

電商支付場景中,向DB中寫入了訂單資訊,並且傳送了1個MQ訊息,通知其它系統處理積分、優惠券等服務,DB中的事物還是比較容易的,只要不是分庫的,就好操作,相關的內容網上也很多,就不過多說了,現在假設資料庫的事物是T1,傳送MQ的事務訊息是T2,那麼虛擬碼就是如下:

執行T1(寫DB)
if 成功
	執行T2(發MQ)

如果你這麼寫,就會有很大的問題,因為T2有可能會失敗,這樣T1沒有回滾,就出現了問題;也有可能是T2超時了,導致T2實際到底有沒有成功,不知道,這個時候也會有問題。這裡是1個基本的場景,所以理論上來說,是不能這麼寫程式碼的,那麼要怎麼寫呢? RocketMQ給了一個事務訊息的選項。

事務訊息

Rocket MQ的事務訊息,可以保證MQ如果傳送成功,DB事務也一定成功,DB失敗了,MQ也一定不會成功,這樣就解決了上述的問題,怎麼做到的呢?接下來,我主要的目的,就是給大家講清楚,怎麼實現DB成功,MQ成功,DB失敗,MQ也一定失敗的。

首先要實現1個TransactionListener的介面,這個介面有2個方法

/**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
LocalTransactionState executeLocalTransaction
(final Message msg, final Object arg); /** * When no response to prepare(half) message. broker will send check message to check the transaction status, and this * method will be invoked to get local transaction status. * * @param msg Check message * @return Transaction state */ LocalTransactionState checkLocalTransaction(final MessageExt msg);

這裡要重點理解2個方法的註釋,不要覺的是英文的註釋,就不想看,從這個註釋上看,可以理解,Rocket MQ在假設一種未知的狀態,什麼狀態呢? 就是收不到prepare(half) message的迴應,這裡引申出來了1個概念,叫做預處理訊息,或者叫準備狀態的訊息,RocketMQ的事務訊息是基於兩階段提交實現的,也就是說訊息有兩個狀態,prepared和commited。當訊息執行完send方法後,進入的prepared狀態。這裡我們要講一下LocalTransactionState這個類,這個類有3種狀態:

  • COMMIT_MESSAGE:提交訊息,這個訊息由prepared狀態進入到commited狀態,消費者可以消費這個訊息
  • ROLLBACK_MESSAGE:回滾
  • UNKNOW:未知狀態

這裡的一個場景是,未知狀態是什麼意思? 先放下,然後看看我們該怎麼用這2個方法。這裡我主要寫虛擬碼。

這裡我們降低難度,假設DB的事務就是1個insert,向訂單表插入了1條記錄

public class TransactionListenerImpl implements TransactionListener {
	
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

        try {
            // 開啟DB事務
        	startDBTransaction();
        	//執行事務內容
        	long id = insert();
        	//提交DB事務
        	commit();
        	if id > 0 return COMMIT_MESSAGE;
            else return ROLLBACK_MESSAGE;
        } catch() {
            // 回滾DB事務
            rollback();
            return ROLLBACK_MESSAGE;
        }  
    }
    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        
        //執行SQL,檢查剛才的事務,是否成功了,一般msg裡,有剛才DB事務的ID,這裡是訂單ID
        //根據訂單ID去查訂單存在不,存在就說明DB事務成功了,不存在就說明DB事務失敗了
        long id = msg.getId();
        int count = getFromDbByOrderId(id);
        if(count > 0) {
            //說明訂單插入成功了
            return COMMIT_MESSAGE;
        } else {
            return ROLLBACK_MESSAGE;
        }
    }
}

這麼做,是怎麼實現事務訊息的呢?RocketMQ的內部應該是這麼做的(我沒看原始碼,不確定具體的程式碼)

boolean succ = sendMq() //準備狀態,消費者看不見該條訊息
if(succ) {
  boolean succ1 = executeLocalTransaction()
  // A情況 如果執行到這裡掛了,或者異常了,觸發checkLocalTransaction
  if(succ1) {
     commitMQ() //把MQ的狀態,從準備改為可消費,消費者可以消費該條訊息
     // B情況 如果執行到這裡掛了,或者異常了,觸發checkLocalTransaction
  } else {
     rollbackMQ() //把MQ的狀態,從準備改為取消,消費者不消費這條MQ訊息
     // C情況 如果執行到這裡掛了,或者異常了,觸發checkLocalTransaction
  }
} else {
  	return 失敗
}

//如果在執行期間掛了,恢復之後還是會checkLocalTransaction,看看DB事務是否成功
//RocketMQ有個後臺執行緒,一直在輪詢prepare message,這東西肯定在1個佇列裡
//重試5次,是我隨意寫的,具體看原始碼吧,這個不重要,重要的是思想。
MessageExt prepareMsg = getPrepareMessageFromQueue();
while(prepareMsg && 重試次數 < 5) {
    LocalTransactionState state = checkLocalTransaction(prepareMsg)
    if(state == COMMIT_MESSAGE) {
        //提交MQ事務
        //從佇列裡移除這條訊息
        //下一條
        prepareMsg = getPrepareMessageFromQueue();
    } else if(state == ROLLBACK_MESSAGE) {
        //回滾MQ事務
        //從佇列裡移除這條訊息
        //下一條
        prepareMsg = getPrepareMessageFromQueue();
    } else {
        //這裡是UNKNOW狀態,這裡應該什麼都不做,直接獲取下一條prepare message
        //DB裡的事務不應該是UNKNOW狀態
        //除非DB超時了,沒有告訴MQ資料庫事務到底是成功還是失敗
        prepareMsg = getPrepareMessageFromQueue();
        //重試次數+1
    } 
}

核心思想:

  • 先發MQ,再執行DB的事務,再根據DB事務的狀態,決定MQ訊息是否要給消費者消費,如果DB成功了,MQ通過重試機制,保證prepare(half) message可以變更為commited狀態。
  • DB的事務,必須在 executeLocalTransaction() 這個方法裡寫。
  • MQ自己內部保證 commitMQ() 或者 rollbackMQ() 一定能成功。
  • 如果MQ掛了,再啟動的時候,可以再去DB裡查資料庫事務是否成功,保證訊息的最終一致。

這裡可以看到,RocketMQ通過先發MQ訊息,再執行DB事務,保證了在傳送MQ這個環境一旦出現錯誤,可以通過再次回溯DB,檢視DB的事務狀態,來判斷是提交MQ的事務訊息,還是回滾MQ的事務訊息,這裡其實也有問題,就是一旦MQ執行commitMQ失敗了,去回溯DB的時候,恰好DB也掛了,而且MQ重試了幾次,DB都沒有恢復,這個時候該怎麼辦呢?大家可以思考下。

看到這裡,我們再回過頭來看,未知狀態是什麼?其實就是各種異常情況導致的未知狀態,就是上述程式碼中的A情況、B情況、C情況,可能還會有網路超時、MQ自己掛了、MQ訊息超時等異常情況,這裡RocketMQ是通過回溯DB檢視DB的事務狀態來決定MQ的事務狀態的,本質上屬於2PC,並不是真正的分散式事務。這裡分散式事務的難點在於通訊通道的不可靠,具體可以看看2軍問題,2軍問題和拜占庭將軍問題是完全不同的2個問題,大家一定要分清楚。

全文完。

關注我的部落格,獲取更多Java程式設計知識: https://www.epoooll.com/