RocketMQ事務訊息解析
RocketMQ事務訊息解析
之前在網上看到了一篇關於RocketMQ的事務訊息的文章,感覺講的並不好,甚至有錯誤的地方,所以就想自己來寫一篇文章,講一講我對與RocketMQ的事務訊息的理解,不一定會正確,各位在看的時候,可以結合自己的思考,看看是否有一定的參考性,這裡會單刀直入的講解重點,而不會講一些鋪墊的東西,例如什麼是RocketMQ,什麼是事務訊息,什麼是事務等等,如果你遇到了一些名詞,自己不是很理解,需要自己去其它地方學習下。
應用的場景
電商支付場景中,向DB中寫入了訂單資訊,並且傳送了1個MQ訊息,通知其它系統處理積分、優惠券等服務,DB中的事物還是比較容易的,只要不是分庫的,就好操作,相關的內容網上也很多,就不過多說了,現在假設資料庫的事物是T1,傳送MQ的事務訊息是T2,那麼虛擬碼就是如下:
執行T1(寫DB)
if 成功
執行T2(發MQ)
如果你這麼寫,就會有很大的問題,因為T2有可能會失敗,這樣T1沒有回滾,就出現了問題;也有可能是T2超時了,導致T2實際到底有沒有成功,不知道,這個時候也會有問題。這裡是1個基本的場景,所以理論上來說,是不能這麼寫程式碼的,那麼要怎麼寫呢? RocketMQ給了一個事務訊息的選項。
事務訊息
Rocket MQ的事務訊息,可以保證MQ如果傳送成功,DB事務也一定成功,DB失敗了,MQ也一定不會成功,這樣就解決了上述的問題,怎麼做到的呢?接下來,我主要的目的,就是給大家講清楚,怎麼實現DB成功,MQ成功,DB失敗,MQ也一定失敗的。
首先要實現1個TransactionListener的介面,這個介面有2個方法
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction (final Message msg, final Object arg);
/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
這裡要重點理解2個方法的註釋,不要覺的是英文的註釋,就不想看,從這個註釋上看,可以理解,Rocket MQ在假設一種未知的狀態,什麼狀態呢? 就是收不到prepare(half) message的迴應,這裡引申出來了1個概念,叫做預處理訊息,或者叫準備狀態的訊息,RocketMQ的事務訊息是基於兩階段提交實現的,也就是說訊息有兩個狀態,prepared和commited。當訊息執行完send方法後,進入的prepared狀態。這裡我們要講一下LocalTransactionState這個類,這個類有3種狀態:
- COMMIT_MESSAGE:提交訊息,這個訊息由prepared狀態進入到commited狀態,消費者可以消費這個訊息
- ROLLBACK_MESSAGE:回滾
- UNKNOW:未知狀態
這裡的一個場景是,未知狀態是什麼意思? 先放下,然後看看我們該怎麼用這2個方法。這裡我主要寫虛擬碼。
這裡我們降低難度,假設DB的事務就是1個insert,向訂單表插入了1條記錄
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 開啟DB事務
startDBTransaction();
//執行事務內容
long id = insert();
//提交DB事務
commit();
if id > 0 return COMMIT_MESSAGE;
else return ROLLBACK_MESSAGE;
} catch() {
// 回滾DB事務
rollback();
return ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//執行SQL,檢查剛才的事務,是否成功了,一般msg裡,有剛才DB事務的ID,這裡是訂單ID
//根據訂單ID去查訂單存在不,存在就說明DB事務成功了,不存在就說明DB事務失敗了
long id = msg.getId();
int count = getFromDbByOrderId(id);
if(count > 0) {
//說明訂單插入成功了
return COMMIT_MESSAGE;
} else {
return ROLLBACK_MESSAGE;
}
}
}
這麼做,是怎麼實現事務訊息的呢?RocketMQ的內部應該是這麼做的(我沒看原始碼,不確定具體的程式碼)
boolean succ = sendMq() //準備狀態,消費者看不見該條訊息
if(succ) {
boolean succ1 = executeLocalTransaction()
// A情況 如果執行到這裡掛了,或者異常了,觸發checkLocalTransaction
if(succ1) {
commitMQ() //把MQ的狀態,從準備改為可消費,消費者可以消費該條訊息
// B情況 如果執行到這裡掛了,或者異常了,觸發checkLocalTransaction
} else {
rollbackMQ() //把MQ的狀態,從準備改為取消,消費者不消費這條MQ訊息
// C情況 如果執行到這裡掛了,或者異常了,觸發checkLocalTransaction
}
} else {
return 失敗
}
//如果在執行期間掛了,恢復之後還是會checkLocalTransaction,看看DB事務是否成功
//RocketMQ有個後臺執行緒,一直在輪詢prepare message,這東西肯定在1個佇列裡
//重試5次,是我隨意寫的,具體看原始碼吧,這個不重要,重要的是思想。
MessageExt prepareMsg = getPrepareMessageFromQueue();
while(prepareMsg && 重試次數 < 5) {
LocalTransactionState state = checkLocalTransaction(prepareMsg)
if(state == COMMIT_MESSAGE) {
//提交MQ事務
//從佇列裡移除這條訊息
//下一條
prepareMsg = getPrepareMessageFromQueue();
} else if(state == ROLLBACK_MESSAGE) {
//回滾MQ事務
//從佇列裡移除這條訊息
//下一條
prepareMsg = getPrepareMessageFromQueue();
} else {
//這裡是UNKNOW狀態,這裡應該什麼都不做,直接獲取下一條prepare message
//DB裡的事務不應該是UNKNOW狀態
//除非DB超時了,沒有告訴MQ資料庫事務到底是成功還是失敗
prepareMsg = getPrepareMessageFromQueue();
//重試次數+1
}
}
核心思想:
- 先發MQ,再執行DB的事務,再根據DB事務的狀態,決定MQ訊息是否要給消費者消費,如果DB成功了,MQ通過重試機制,保證prepare(half) message可以變更為commited狀態。
- DB的事務,必須在 executeLocalTransaction() 這個方法裡寫。
- MQ自己內部保證 commitMQ() 或者 rollbackMQ() 一定能成功。
- 如果MQ掛了,再啟動的時候,可以再去DB裡查資料庫事務是否成功,保證訊息的最終一致。
這裡可以看到,RocketMQ通過先發MQ訊息,再執行DB事務,保證了在傳送MQ這個環境一旦出現錯誤,可以通過再次回溯DB,檢視DB的事務狀態,來判斷是提交MQ的事務訊息,還是回滾MQ的事務訊息,這裡其實也有問題,就是一旦MQ執行commitMQ失敗了,去回溯DB的時候,恰好DB也掛了,而且MQ重試了幾次,DB都沒有恢復,這個時候該怎麼辦呢?大家可以思考下。
看到這裡,我們再回過頭來看,未知狀態是什麼?其實就是各種異常情況導致的未知狀態,就是上述程式碼中的A情況、B情況、C情況,可能還會有網路超時、MQ自己掛了、MQ訊息超時等異常情況,這裡RocketMQ是通過回溯DB檢視DB的事務狀態來決定MQ的事務狀態的,本質上屬於2PC,並不是真正的分散式事務。這裡分散式事務的難點在於通訊通道的不可靠,具體可以看看2軍問題,2軍問題和拜占庭將軍問題是完全不同的2個問題,大家一定要分清楚。
全文完。
關注我的部落格,獲取更多Java程式設計知識: https://www.epoooll.com/