RocketMq在閹割訊息回查checkTransactionState後實現分散式事務
利用rocketMQ解決分散式事務
在rocketMQ中生產者有三種角色 NormalProducer(普通)、OrderProducer(順序)、TransactionProducer(事務)
根據名字大概可以看出各個代表著什麼作用,我們這裡用 TransactionProducer(事務)來解決問題。
先舉個列子來說明下我們解決方案的設計方式吧:最經典的莫過於銀行轉賬了,網上到處都有,時序圖如下
下面貼一下測試程式碼:
(1) 執行業務邏輯的部分
/**
* @Date: Created in 2018/2/12 15:55
執行本地事務
*/
public class TransactionExecuterimpl implements LocalTransactionExecuter{
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message message, final Object o) {
try{
//DB操作 應該帶上事務 service -> dao
//如果資料操作失敗 需要回滾 同事返回RocketMQ一個失敗訊息 意味著 消費者無法消費到這條失敗的訊息
//如果成功 就要返回一個rocketMQ成功的訊息,意味著消費者將讀取到這條訊息
//o就是attachment
//測試程式碼
if(new Random().nextInt(3) == 2){
int a = 1 / 0;
}
System.out.println(new Date()+"===> 本地事務執行成功,傳送確認訊息");
}catch (Exception e){
System.out.println(new Date()+"===> 本地事務執行失敗!!!");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
(2) 處理事務回查的程式碼部分
/**
* @Date: Created in 2018/2/12 15:48
* 未決事務,伺服器端回查客戶端
*/
public class TransactionCheckListenerImpl implements TransactionCheckListener {
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
System.out.println("伺服器端回查事務訊息: "+messageExt.toString());
//由於RocketMQ遲遲沒有收到訊息的確認訊息,因此主動詢問這條prepare訊息,是否正常?
//可以查詢資料庫看這條資料是否已經處理
return LocalTransactionState.COMMIT_MESSAGE;
}
}
(3) 啟動生產者
/**
* @Date: Created in 2018/2/12 15:24
* 測試本地事務
*/
public class TestTransactionProducer {
public static void main(String[] args){
//事務回查監聽器
TransactionCheckListenerImpl checkListener = new TransactionCheckListenerImpl();
//事務訊息生產者
TransactionMQProducer producer = new TransactionMQProducer("transactionProducerGroup");
//MQ伺服器地址
producer.setNamesrvAddr("192.168.56.105:9876;192.168.106:9876");
//註冊事務回查監聽
producer.setTransactionCheckListener(checkListener);
//本地事務執行器
TransactionExecuterimpl executerimpl = null;
try {
//啟動生產者
producer.start();
executerimpl = new TransactionExecuterimpl();
Message msg1 = new Message("TransactionTopic", "tag", "KEY1", "hello RocketMQ 1".getBytes());
Message msg2 = new Message("TransactionTopic", "tag", "KEY2", "hello RocketMQ 2".getBytes());
SendResult sendResult = producer.sendMessageInTransaction(msg1, executerimpl, null);
System.out.println(new Date() + "msg1"+sendResult);
sendResult = producer.sendMessageInTransaction(msg1, executerimpl, null);
System.out.println(new Date() + "msg2"+sendResult);
} catch (MQClientException e) {
e.printStackTrace();
}
producer.shutdown();
}
}
(4) 消費之消費訊息
/**
* @Date: Created in 2018/2/11 15:37
*/
public class TestConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("192.168.56.105:9876;192.168.56.106:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//消費普通訊息
// consumer.subscribe("TopicTest","*");
//消費事務訊息
consumer.subscribe("TransactionTopic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt ext:msgs) {
try {
System.out.println(new Date() + new String(ext.getBody(),"UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Start............");
}
}
重點來了:3.2.6之前的版本這樣寫就可以了,但是之後的版本被關於事務回查這個藉口被閹割了,不會在進行事務回查操作。
那麼第五步向MQ傳送訊息如果失敗的話,會造成A銀行扣款成功而B銀行收款未成功的資料不一致的情況
解決辦法
這裡只需要考慮本地事務執行成功後的情況(因為本地事務失敗不管確認訊息傳送成功與失敗MQ叢集都不會再發送訊息到消費者):
- 本地事務成功後宕機,確認訊息沒有發出,分散式事務只執行一半。
- 確認訊息COMMIT_MESSGE發出,但因網路不可達RocketMQ叢集沒收到。
- 確認訊息COMMIT_MESSGE發出,RocketMQ叢集收到COMMIT_MESSGE訊息,但rocketmq取消了回查機制,
生產者還是不知道COMMIT_MESSGE發出是否成功。
上面三種情況的本質是一樣的,就是生產者本地事務成功後,COMMIT_MESSGE訊息是否送達rocketmq叢集;所以可以看做同一種情況.
下面是按key值查詢事務成功提交COMMIT_MESSGE
訊息後的返回資訊:
QueryResult
[indexLastUpdateTimestamp=1516002830440,
messageList=[
MessageExt [queueId=1, storeSize=291, queueOffset=0, sysFlag=4, bornTimestamp=1516002831147, bornHost=/192.168.88.1:6313, storeTimestamp=1516002830323, storeHost=/192.168.88.133:10911, msgId=C0A8588500002A9F0000000000000246, commitLogOffset=582, bodyCRC=1229495611, reconsumeTimes=0, preparedTransactionOffset=0, toString()=
Message
[topic=pay, flag=0, properties=
{KEYS=5cf5dd03-5811-4b7f-b97c-186598e6d08b, TRAN_MSG=true, UNIQ_KEY=C0A8080B2080085EDE7B4B824F2A0000, PGROUP=transaction-pay, TAGS=tag},
body=67]],
MessageExt [queueId=1, storeSize=291, queueOffset=0, sysFlag=8, bornTimestamp=1516002831147, bornHost=/192.168.88.1:6313, storeTimestamp=1516002830440, storeHost=/192.168.88.133:10911, msgId=C0A8588500002A9F0000000000000369, commitLogOffset=873, bodyCRC=1229495611, reconsumeTimes=0, preparedTransactionOffset=582, toString()=
Message
[topic=pay, flag=0, properties=
{KEYS=5cf5dd03-5811-4b7f-b97c-186598e6d08b, TRAN_MSG=true, UNIQ_KEY=C0A8080B2080085EDE7B4B824F2A0000, PGROUP=transaction-pay, TAGS=tag},
body=67]]
]
]
共返回兩條訊息:兩條訊息中大部分資料是一樣的,但sysFlag
、storeTimestamp
、msgId
、commitLogOffset
、preparedTransactionOffset
欄位是不一樣的:其中第1條為prepared傳送的訊息,第2條只有在提交COMMIT_MESSGE訊息成功後產生。
注意sysFlag
、preparedTransactionOffset
欄位與prepared
訊息的區別,當提交COMMIT_MESSGE訊息成功後,推測MQ叢集做了如下動作:1. 讀取prepared訊息,修改sysFlag
、preparedTransactionOffset
值,2. 在存入commitlog日誌檔案,設定consumerqueue序列;因為當作一條新的訊息處理,所以toreTimestamp
、msgId
、commitLogOffset
欄位自然也就變了。所以按照發送的prepared訊息的返回結果顯示的msgId
檢視sysFlag
狀態只是prepared訊息的sysFlag
狀態,RocketMQ4.2版本的話要用key值去查詢,才能檢視事務提交成功的訊息標誌sysFlag=8
。
下面是按key值查詢事務失敗提交ROLLBACK_MESSAGE
訊息後的返回資訊:
QueryResult
[indexLastUpdateTimestamp=1516002830440,
messageList=[
MessageExt [queueId=1, storeSize=291, queueOffset=0, sysFlag=4, bornTimestamp=1516002831147, bornHost=/192.168.88.1:6313, storeTimestamp=1516002830323, storeHost=/192.168.88.133:10911, msgId=C0A8588500002A9F0000000000000246, commitLogOffset=582, bodyCRC=1229495611, reconsumeTimes=0, preparedTransactionOffset=0, toString()=
Message
[topic=pay, flag=0, properties=
{KEYS=5cf5dd03-5811-4b7f-b97c-186598e6d08b, TRAN_MSG=true, UNIQ_KEY=C0A8080B2080085EDE7B4B824F2A0000, PGROUP=transaction-pay, TAGS=tag},
body=67]]
]
]
由此我們可以根據syyFlag判斷我們在提交事務以後, 訊息傳送是否成功
下面是邏輯分析圖
一、在執行本地事務commit前向回查表插入訊息的KEY值。
二、在生產者叢集上設定一個定時任務(根據自身分散式事務流程執行的時間設定)。
- 從回查表獲取CONFIRM為0的記錄列表,從記錄列表中獲取COUNT為3的記錄,當count列達到指定閥值(假定是3)時:
此時記錄的COUNT為3,如果CONFIRM還是為0,那麼說明對此事務的回查次數為3,但RocketMQ叢集還未收到COMMIT_MESSAGE訊息,說明發送COMMIT_MESSAGE訊息失敗,但本地事務已經執行成功,那麼必須要重發與此條記錄中KEY值相對應的Perpared訊息的確認訊息。根據KEY值向MQ叢集查詢訊息,根據獲取的訊息重新用同步的方式傳送此條訊息到MQ叢集,並更新此記錄的CONFIRM為1,COUNT+1 - 根據第1步獲取的記錄列表,取出CONFIRM為0且COUNT小於3的記錄,根據KEY值向MQ叢集查詢訊息。
- 根據第2步獲取的訊息判斷是否是sysFlag為8的訊息;如果是,更新回查表對應KEY記錄的CONFIRM為1,COUNT為count+1,如果不是,更新回查表對應KEY記錄的COUNT為count+1。
這裡是用COUNT來避免剛剛傳送訊息就開始判斷sysFlag的情況,相當於預留了一點rabbitmq的訊息傳送延遲時間,這個也可以使用CREATE_TIME來代替,比如判斷記錄列表中CREATE_TIME在當前時間前3s以前,未成功傳送的訊息
這隻能算其中一種,還有一些手動從生產者和消費者兩端進行資料庫查詢的方法,有興趣的自己可以去了解