RocketMQ原始碼分析之從官方示例窺探:RocketMQ事務訊息實現基本思想
RocketMQ4.3.0版本開始支援事務訊息,後續分享將開始將剖析事務訊息的實現原理。首先從官方給出的Demo例項入手,以此通往RocketMQ事務訊息的世界中。
官方版本未釋出之前,從apache rocketmq第一個版本上線後,程式碼中存在與事務訊息相關的程式碼,例如COMMIT、ROLLBACK、PREPARED,在事務訊息未開源之前網上對於事務訊息的“聲音”基本上是使用類似二階段提交,主要是根據訊息系統標誌MessageSysFlag中定義來推測的:
TRANSACTION_PREPARED_TYPE
TRANSACTION_COMMIT_TYPE
TRANSACTION_ROLLBACK_TYPE
訊息傳送者首先發送TRANSACTION_PREPARED_TYPE型別的訊息,然後根據事務狀態來決定是提交或回滾事務傳送commit請求或rollback請求,如果commit/rollback請求丟失後,rocketmq會在指定超時時間後回查事務狀態來決定提交或回滾事務。
讓我們各自帶著自己的理解和猜測,從閱讀RocketMQ官方提供的Demo程式入手,試圖窺探一些大體的資訊。
Demo示例程式位於:/rocketmq-example/src/main/java/org/apache/rocketmq/example/transaction包中。該包中未放置訊息消費者,為了驗證事務的訊息消費情況,我們可以從其他包copy一個消費者,從而先執行生產者,然後執行消費者,判斷事務訊息的預發放、提交、回滾等效果,二話不說,先執行一下,看下效果再說:
訊息傳送端執行結果:
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5767EC0000, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57680F0001, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57681E0002, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57682B0003, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768380004, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768490005, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768560006, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=3], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768640007, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768730008, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768800009, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=9]
訊息消費端效果:
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715812, bornHost=/192.168.1.5:55482, storeTimestamp=1532745749010, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F0000000000001DE8, commitLogOffset=7656, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=5477, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY7, TRAN_MSG=true, CONSUME_START_TIME=1532746024360, UNIQ_KEY=C0A8010518DC6D06D69C8D5768640007, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagC, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='C0A8010518DC6D06D69C8D5768640007'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=1, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715768, bornHost=/192.168.1.5:55482, storeTimestamp=1532745749008, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F0000000000001B91, commitLogOffset=7057, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=4496, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY4, TRAN_MSG=true, CONSUME_START_TIME=1532746024361, UNIQ_KEY=C0A8010518DC6D06D69C8D5768380004, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagE, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='C0A8010518DC6D06D69C8D5768380004'}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715727, bornHost=/192.168.1.5:55482, storeTimestamp=1532745748834, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F000000000000193A, commitLogOffset=6458, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=3515, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY1, TRAN_MSG=true, CONSUME_START_TIME=1532746024368, UNIQ_KEY=C0A8010518DC6D06D69C8D57680F0001, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagB, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='C0A8010518DC6D06D69C8D57680F0001'}]]
綜上所述,服務端傳送了10條訊息,而消費端只收到3條訊息,應該是由於事務回滾,造成只提交了3條訊息,為了更加嚴謹,可以安裝一個rocketmq-consonse,更加直觀的觀察shangshagn's上述結果:
接下來對示例程式碼進行解讀:
1、生產者端程式碼解讀:
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl(); // @1
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {br/>@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
}); // @2
producer.setExecutorService(executorService); // @3
producer.setTransactionListener(transactionListener); // @4
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) { // @5
try {
Message msg =
new Message("transaction_topic_test", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) { //這裡只是阻止生產者過早退出,導致事務訊息的相關機制無法執行
Thread.sleep(1000);
}
producer.shutdown();
}
}
程式碼@1:建立TransactionListener 例項,字面理解為事務訊息事件監聽器,下文詳細對其進行展開。
程式碼@2:ExecutorService executorService,建立一個執行緒池,其執行緒的名稱字首”client-transaction-msg-check-thread“,從字面理解為客戶端事務訊息狀態檢測執行緒,我們可以大膽的猜測一下是不是這個執行緒池呼叫TransactionListener方法,完成對事務訊息的檢測呢?【這裡只是作者的猜測,大家不能當真,在作者後續文章釋出後,如果該觀點錯誤,會加以修復,這裡寫出來,主要是想分享一下我讀原始碼的方法】。br/>程式碼@3:為事務訊息傳送者設定執行緒池。
程式碼@4:為事務訊息傳送者設定事務監聽器。
程式碼@5:傳送10條訊息。
2、TransactionListener程式碼解讀
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
executeLocalTransaction方法:記錄本地事務的事務狀態,這裡其實現就是迴圈設定事務訊息的狀態為0,1,2,demo中是把訊息的狀態資料存放在一個Map中。實際應用時通常會持久化訊息的事務狀態,例如資料庫或快取。
checkLocalTransaction方法,事務回查業務實現,查本地事務表,判斷事務的狀態如為0:UNKNOW,1:COMMIT_MESSAGE;ROLLBACK_MESSAGE。這裡就能解釋,生產者連續發10條訊息,因為只有3條訊息的事務狀態為COMMIT_MESSAGE,故訊息消費者只能消費3條。
到這裡,基本上還是可以得知事務訊息的實現方式,基本與文章開頭所示的“網上聲音”實現類似,下一節將詳細分析TransactionMQProducer事務訊息傳送的實現細節。
鄭重宣告:本文主要是展示事務訊息的基本使用,本文所下的結論還僅僅是作者的猜測,下一篇文章,將重點分析事務訊息的實現細節,本文一個非常重要的目的,是向讀者朋友們展示作者學習原始碼的一個方法,總結為:先做全面瞭解(網上,官方文件)、然後加以自己的思考,從Demo例項入手學習,將學習任務分解之,邊寫邊看。
這算不算文末有彩蛋呢?呵呵,下一篇見:詳細分析RocketMQ事務訊息的實現細節。
本文節選自書籍《RocketMQ技術內幕:RocketMQ架構設計與實現原理》