分散式訊息佇列 RocketMQ原始碼解析:事務訊息
本文主要基於 RocketMQ 4.0.x 正式版
- 1. 概述
- 2. 事務訊息傳送
- 2.1 Producer 傳送事務訊息
- 2.2 Broker 處理結束事務請求
- 2.3 Broker 生成 ConsumeQueue
- 3. 事務訊息回查
- 3.1 Broker 發起【事務訊息回查】
- 3.1.1 官方V3.1.4:基於檔案系統
- 3.1.1.1 儲存訊息到 CommitLog
- 3.1.1.2 寫【事務訊息】狀態儲存(TranStateTable)
- 3.1.1.3 【事務訊息】回查
- 3.1.1.4 初始化【事務訊息】狀態儲存(TranStateTable)
- 3.1.1.5 補充
- 3.1.2 官方V4.0.0:基於資料庫
- 3.1.1 官方V3.1.4:基於檔案系統
- 3.2 Producer 接收【事務訊息回查】
- 3.1 Broker 發起【事務訊息回查】
1. 概述
必須必須必須 前置閱讀內容:
- 《事務訊息(阿里雲)》
2. 事務訊息傳送
2.1 Producer 傳送事務訊息
- 活動圖如下(結合
核心程式碼
理解):
- 實現程式碼如下:
1: // ⬇️⬇️⬇️【DefaultMQProducerImpl.java】 2: /** 3: * 傳送事務訊息 4: * 5: * @param msg 訊息 6: * @param tranExecuter 【本地事務】執行器 7: * @param arg 【本地事務】執行器引數 8: * @return 事務傳送結果 9: * @throws MQClientException 當 Client 發生異常時 10: */ 11: public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) 12: throws MQClientException { 13: if (null == tranExecuter) { 14: throw new MQClientException("tranExecutor is null", null); 15: } 16: Validators.checkMessage(msg, this.defaultMQProducer); 17: 18: // 傳送【Half訊息】 19: SendResult sendResult; 20: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); 21: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); 22: try { 23: sendResult = this.send(msg); 24: } catch (Exception e) { 25: throw new MQClientException("send message Exception", e); 26: } 27: 28: // 處理髮送【Half訊息】結果 29: LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; 30: Throwable localException = null; 31: switch (sendResult.getSendStatus()) { 32: // 傳送【Half訊息】成功,執行【本地事務】邏輯 33: case SEND_OK: { 34: try { 35: if (sendResult.getTransactionId() != null) { // 事務編號。目前開源版本暫時沒用到,猜想ONS在使用。 36: msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); 37: } 38: 39: // 執行【本地事務】邏輯 40: localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); 41: if (null == localTransactionState) { 42: localTransactionState = LocalTransactionState.UNKNOW; 43: } 44: 45: if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { 46: log.info("executeLocalTransactionBranch return {}", localTransactionState); 47: log.info(msg.toString()); 48: } 49: } catch (Throwable e) { 50: log.info("executeLocalTransactionBranch exception", e); 51: log.info(msg.toString()); 52: localException = e; 53: } 54: } 55: break; 56: // 傳送【Half訊息】失敗,標記【本地事務】狀態為回滾 57: case FLUSH_DISK_TIMEOUT: 58: case FLUSH_SLAVE_TIMEOUT: 59: case SLAVE_NOT_AVAILABLE: 60: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; 61: break; 62: default: 63: break; 64: } 65: 66: // 結束事務:提交訊息 COMMIT / ROLLBACK 67: try { 68: this.endTransaction(sendResult, localTransactionState, localException); 69: } catch (Exception e) { 70: log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); 71: } 72: 73: // 返回【事務傳送結果】 74: TransactionSendResult transactionSendResult = new TransactionSendResult(); 75: transactionSendResult.setSendStatus(sendResult.getSendStatus()); 76: transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); 77: transactionSendResult.setMsgId(sendResult.getMsgId()); 78: transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); 79: transactionSendResult.setTransactionId(sendResult.getTransactionId()); 80: transactionSendResult.setLocalTransactionState(localTransactionState); 81: return transactionSendResult; 82: } 83: 84: /** 85: * 結束事務:提交訊息 COMMIT / ROLLBACK 86: * 87: * @param sendResult 傳送【Half訊息】結果 88: * @param localTransactionState 【本地事務】狀態 89: * @param localException 執行【本地事務】邏輯產生的異常 90: * @throws RemotingException 當遠端呼叫發生異常時 91: * @throws MQBrokerException 當 Broker 發生異常時 92: * @throws InterruptedException 當執行緒中斷時 93: * @throws UnknownHostException 當解碼訊息編號失敗是 94: */ 95: public void endTransaction(// 96: final SendResult sendResult, // 97: final LocalTransactionState localTransactionState, // 98: final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { 99: // 解碼訊息編號 100: final MessageId id; 101: if (sendResult.getOffsetMsgId() != null) { 102: id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); 103: } else { 104: id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); 105: } 106: 107: // 建立請求 108: String transactionId = sendResult.getTransactionId(); 109: final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); 110: EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); 111: requestHeader.setTransactionId(transactionId); 112: requestHeader.setCommitLogOffset(id.getOffset()); 113: switch (localTransactionState) { 114: case COMMIT_MESSAGE: 115: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); 116: break; 117: case ROLLBACK_MESSAGE: 118: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); 119: break; 120: case UNKNOW: 121: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); 122: break; 123: default: 124: break; 125: } 126: requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); 127: requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); 128: requestHeader.setMsgId(sendResult.getMsgId()); 129: String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; 130: 131: // 提交訊息 COMMIT / ROLLBACK。!!!通訊方式為:Oneway!!! 132: this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); 133: }
2.2 Broker 處理結束事務請求
- ? 查詢請求的訊息,進行提交 / 回滾。實現程式碼如下:
1: // ⬇️⬇️⬇️【EndTransactionProcessor.java】 2: public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { 3: final RemotingCommand response = RemotingCommand.createResponseCommand(null); 4: final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class); 5: 6: // 省略程式碼 =》列印日誌(只處理 COMMIT / ROLLBACK) 7: 8: // 查詢提交的訊息 9: final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset()); 10: if (msgExt != null) { 11: // 省略程式碼 =》校驗訊息 12: 13: // 生成訊息 14: MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt); 15: msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); 16: msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); 17: msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); 18: msgInner.setStoreTimestamp(msgExt.getStoreTimestamp()); 19: if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { 20: msgInner.setBody(null); 21: } 22: 23: // 儲存生成訊息 24: final MessageStore messageStore = this.brokerController.getMessageStore(); 25: final PutMessageResult putMessageResult = messageStore.putMessage(msgInner); 26: 27: // 處理儲存結果 28: if (putMessageResult != null) { 29: switch (putMessageResult.getPutMessageStatus()) { 30: // Success 31: case PUT_OK: 32: case FLUSH_DISK_TIMEOUT: 33: case FLUSH_SLAVE_TIMEOUT: 34: case SLAVE_NOT_AVAILABLE: 35: response.setCode(ResponseCode.SUCCESS); 36: response.setRemark(null); 37: break; 38: // Failed 39: case CREATE_MAPEDFILE_FAILED: 40: response.setCode(ResponseCode.SYSTEM_ERROR); 41: response.setRemark("create maped file failed."); 42: break; 43: case MESSAGE_ILLEGAL: 44: case PROPERTIES_SIZE_EXCEEDED: 45: response.setCode(ResponseCode.MESSAGE_ILLEGAL); 46: response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); 47: break; 48: case SERVICE_NOT_AVAILABLE: 49: response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); 50: response.setRemark("service not available now."); 51: break; 52: case OS_PAGECACHE_BUSY: 53: response.setCode(ResponseCode.SYSTEM_ERROR); 54: response.setRemark("OS page cache busy, please try another machine"); 55: break; 56: case UNKNOWN_ERROR: 57: response.setCode(ResponseCode.SYSTEM_ERROR); 58: response.setRemark("UNKNOWN_ERROR"); 59: break; 60: default: 61: response.setCode(ResponseCode.SYSTEM_ERROR); 62: response.setRemark("UNKNOWN_ERROR DEFAULT"); 63: break; 64: } 65: 66: return response; 67: } else { 68: response.setCode(ResponseCode.SYSTEM_ERROR); 69: response.setRemark("store putMessage return null"); 70: } 71: } else { 72: response.setCode(ResponseCode.SYSTEM_ERROR); 73: response.setRemark("find prepared transaction message failed"); 74: return response; 75: } 76: 77: return response; 78: }
2.3 Broker 生成 ConsumeQueue
- ? 事務訊息,提交(
COMMIT
)後才生成ConsumeQueue
。
1: // ⬇️⬇️⬇️【DefaultMessageStore.java】
2: public void doDispatch(DispatchRequest req) {
3: // 非事務訊息 或 事務提交訊息 建立 訊息位置資訊 到 ConsumeQueue
4: final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
5: switch (tranType) {
6: case MessageSysFlag.TRANSACTION_NOT_TYPE: // 非事務訊息
7: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // 事務訊息COMMIT
8: DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
9: req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
10: break;
11: case MessageSysFlag.TRANSACTION_PREPARED_TYPE: // 事務訊息PREPARED
12: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: // 事務訊息ROLLBACK
13: break;
14: }
15: // 省略程式碼 =》 建立 索引資訊 到 IndexFile
16: }
3. 事務訊息回查
- 【事務訊息回查】功能曾經開源過,目前(V4.0.0)暫未開源。如下是該功能的開源情況:
版本 |
【事務訊息回查】 |
|
---|---|---|
官方V3.0.4 ~ V3.1.4 |
基於 檔案系統 實現 |
已開源 |
官方V3.1.5 ~ V4.0.0 |
基於 資料庫 實現 |
未完全開源 |
我們來看看兩種情況下是怎麼實現的。
3.1 Broker 發起【事務訊息回查】
3.1.1 官方V3.1.4:基於檔案系統
倉庫地址:https://github.com/YunaiV/rocketmq-3.1.9/tree/release_3.1.4
相較於普通訊息,【事務訊息】多依賴如下三個元件:
- TransactionStateService :事務狀態服務,負責對【事務訊息】進行管理,包括儲存與更新事務訊息狀態、回查事務訊息狀態等等。
-
TranStateTable :【事務訊息】狀態儲存。基於
MappedFileQueue
實現,預設儲存路徑為~/store/transaction/statetable
,每條【事務訊息】狀態儲存結構如下:
第幾位 |
欄位 |
說明 |
資料型別 |
位元組數 |
---|---|---|---|---|
1 |
offset |
CommitLog 物理儲存位置 |
Long |
8 |
2 |
size |
訊息長度 |
Int |
4 |
3 |
timestamp |
訊息儲存時間,單位:秒 |
Int |
4 |
4 |
producerGroupHash |
producerGroup 求 HashCode |
Int |
4 |
5 |
state |
事務狀態 |
Int |
4 |
TranRedoLog :TranStateTable重放日誌,每次寫操作 TranStateTable記錄重放日誌。當 Broker 異常關閉時,使用 TranRedoLog 恢復 TranStateTable。基於 ConsumeQueue 實現,Topic 為 TRANSACTION_REDOLOG_TOPIC_XXXX,預設儲存路徑為 ~/store/transaction/redolog。 |
||||
-
TranRedoLog :
TranStateTable
重放日誌,每次寫操作TranStateTable
記錄重放日誌。當Broker
異常關閉時,使用TranRedoLog
恢復TranStateTable
。基於ConsumeQueue
實現,Topic
為TRANSACTION_REDOLOG_TOPIC_XXXX
,預設儲存路徑為~/store/transaction/redolog
。
簡單手繪邏輯圖如下?:
3.1.1.1 儲存訊息到 CommitLog
- ?儲存【half訊息】到
CommitLog
時,訊息佇列位置(queueOffset
)使用TranStateTable
最大物理位置(可寫入物理位置)。這樣,訊息可以索引到自己對應的TranStateTable
的位置和記錄。
核心程式碼如下:
1: // ⬇️⬇️⬇️【DefaultAppendMessageCallback.java】
2: class DefaultAppendMessageCallback implements AppendMessageCallback {
3: public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final Object msg) {
4: // ...省略程式碼
5:
6: // 事務訊息需要特殊處理
7: final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
8: switch (tranType) {
9: case MessageSysFlag.TransactionPreparedType: // 訊息佇列位置(queueOffset)使用 TranStateTable 最大物理位置(可寫入物理位置)
10: queueOffset = CommitLog.this.defaultMessageStore.getTransactionStateService().getTranStateTableOffset().get();
11: break;
12: case MessageSysFlag.TransactionRollbackType:
13: queueOffset = msgInner.getQueueOffset();
14: break;
15: case MessageSysFlag.TransactionNotType:
16: case MessageSysFlag.TransactionCommitType:
17: default:
18: break;
19: }
20:
21: // ...省略程式碼
22:
23: switch (tranType) {
24: case MessageSysFlag.TransactionPreparedType:
25: // 更新 TranStateTable 最大物理位置(可寫入物理位置)
26: CommitLog.this.defaultMessageStore.getTransactionStateService().getTranStateTableOffset().incrementAndGet();
27: break;
28: case MessageSysFlag.TransactionRollbackType:
29: break;
30: case MessageSysFlag.TransactionNotType:
31: case MessageSysFlag.TransactionCommitType:
32: // 更新下一次的ConsumeQueue資訊
33: CommitLog.this.topicQueueTable.put(key, ++queueOffset);
34: break;
35: default:
36: break;
37: }
38:
39: // 返回結果
40: return result;
41: }
42: }
3.1.1.2 寫【事務訊息】狀態儲存(TranStateTable)
- ?處理【Half訊息】時,新增【事務訊息】狀態儲存(
TranStateTable
)。 - ?處理【Commit / Rollback訊息】時,更新 【事務訊息】狀態儲存(
TranStateTable
) COMMIT / ROLLBACK。 - ?每次寫操作【事務訊息】狀態儲存(
TranStateTable
),記錄重放日誌(TranRedoLog
)。
核心程式碼如下:
1: // ⬇️⬇️⬇️【DispatchMessageService.java】
2: private void doDispatch() {
3: if (!this.requestsRead.isEmpty()) {
4: for (DispatchRequest req : this.requestsRead) {
5:
6: // ...省略程式碼
7:
8: // 2、寫【事務訊息】狀態儲存(TranStateTable)
9: if (req.getProducerGroup() != null) {
10: switch (tranType) {
11: case MessageSysFlag.TransactionNotType:
12: break;
13: case MessageSysFlag.TransactionPreparedType:
14: // 新增 【事務訊息】狀態儲存(TranStateTable)
15: DefaultMessageStore.this.getTransactionStateService().appendPreparedTransaction(
16: req.getCommitLogOffset(), req.getMsgSize(), (int) (req.getStoreTimestamp() / 1000), req.getProducerGroup().hashCode());
17: break;
18: case MessageSysFlag.TransactionCommitType:
19: case MessageSysFlag.TransactionRollbackType:
20: // 更新 【事務訊息】狀態儲存(TranStateTable) COMMIT / ROLLBACK
21: DefaultMessageStore.this.getTransactionStateService().updateTransactionState(
22: req.getTranStateTableOffset(), req.getPreparedTransactionOffset(), req.getProducerGroup().hashCode(), tranType);
23: break;
24: }
25: }
26: // 3、記錄 TranRedoLog
27: switch (tranType) {
28: case MessageSysFlag.TransactionNotType:
29: break;
30: case MessageSysFlag.TransactionPreparedType:
31: // 記錄 TranRedoLog
32: DefaultMessageStore.this.getTransactionStateService().getTranRedoLog().putMessagePostionInfoWrapper(
33: req.getCommitLogOffset(), req.getMsgSize(), TransactionStateService.PreparedMessageTagsCode,
34: req.getStoreTimestamp(), 0L);
35: break;
36: case MessageSysFlag.TransactionCommitType:
37: case MessageSysFlag.TransactionRollbackType:
38: // 記錄 TranRedoLog
39: DefaultMessageStore.this.getTransactionStateService().getTranRedoLog().putMessagePostionInfoWrapper(
40: req.getCommitLogOffset(), req.getMsgSize(), req.getPreparedTransactionOffset(),
41: req.getStoreTimestamp(), 0L);
42: break;
43: }
44: }
45:
46: // ...省略程式碼
47: }
48: }
49: // ⬇️⬇️⬇️【TransactionStateService.java】
50: /**
51: * 新增事務狀態
52: *
53: * @param clOffset commitLog 物理位置
54: * @param size 訊息長度
55: * @param timestamp 訊息儲存時間
56: * @param groupHashCode groupHashCode
57: * @return 是否成功
58: */
59: public boolean appendPreparedTransaction(//
60: final long clOffset,//
61: final int size,//
62: final int timestamp,//
63: final int groupHashCode//
64: ) {
65: MapedFile mapedFile = this.tranStateTable.getLastMapedFile();
66: if (null == mapedFile) {
67: log.error("appendPreparedTransaction: create mapedfile error.");
68: return false;
69: }
70:
71: // 首次建立,加入定時任務中
72: if (0 == mapedFile.getWrotePostion()) {
73: this.addTimerTask(mapedFile);
74: }
75:
76: this.byteBufferAppend.position(0);
77: this.byteBufferAppend.limit(TSStoreUnitSize);
78:
79: // Commit Log Offset
80: this.byteBufferAppend.putLong(clOffset);
81: // Message Size
82: this.byteBufferAppend.putInt(size);
83: // Timestamp
84: this.byteBufferAppend.putInt(timestamp);
85: // Producer Group Hashcode
86: this.byteBufferAppend.putInt(groupHashCode);
87: // Transaction State
88: this.byteBufferAppend.putInt(MessageSysFlag.TransactionPreparedType);
89:
90: return mapedFile.appendMessage(this.byteBufferAppend.array());
91: }
92:
93: /**
94: * 更新事務狀態
95: *
96: * @param tsOffset tranStateTable 物理位置
97: * @param clOffset commitLog 物理位置
98: * @param groupHashCode groupHashCode
99: * @param state 事務狀態
100: * @return 是否成功
101: */
102: public boolean updateTransactionState(
103: final long tsOffset,
104: final long clOffset,
105: final int groupHashCode,
106: final int state) {
107: SelectMapedBufferResult selectMapedBufferResult = this.findTransactionBuffer(tsOffset);
108: if (selectMapedBufferResult != null) {
109: try {
110:
111: // ....省略程式碼:校驗是否能夠更新
112:
113: // 更新事務狀態
114: selectMapedBufferResult.getByteBuffer().putInt(TS_STATE_POS, state);
115: }
116: catch (Exception e) {
117: log.error("updateTransactionState exception", e);
118: }
119: finally {
120: selectMapedBufferResult.release();
121: }
122: }
123:
124: return false;
125: }
3.1.1.3 【事務訊息】回查
- ?
TranStateTable
每個MappedFile
都對應一個Timer
。Timer
固定週期(預設:60s)遍歷MappedFile
,查詢【half訊息】,向Producer
發起【事務訊息】回查請求。【事務訊息】回查結果的邏輯不在此處進行,在 CommitLog dispatch時執行。
實現程式碼如下:
1: // ⬇️⬇️⬇️【TransactionStateService.java】
2: /**
3: * 初始化定時任務
4: */
5: private void initTimerTask() {
6: //
7: final List<MapedFile> mapedFiles = this.tranStateTable.getMapedFiles();
8: for (MapedFile mf : mapedFiles) {
9: this.addTimerTask(mf);
10: }
11: }
12:
13: /**
14: * 每個檔案初始化定時任務
15: * @param mf 檔案
16: */
17: private void addTimerTask(final MapedFile mf) {
18: this.timer.scheduleAtFixedRate(new TimerTask() {
19: private final MapedFile mapedFile = mf;
20: private final TransactionCheckExecuter transactionCheckExecuter = TransactionStateService.this.defaultMessageStore.getTransactionCheckExecuter();
21: private final long checkTransactionMessageAtleastInterval = TransactionStateService.this.defaultMessageStore.getMessageStoreConfig()
22: .getCheckTransactionMessageAtleastInterval();
23: private final boolean slave = TransactionStateService.this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE;
24:
25: @Override
26: public void run() {
27: // Slave不需要回查事務狀態
28: if (slave) {
29: return;
30: }
31: // Check功能是否開啟
32: if (!TransactionStateService.this.defaultMessageStore.getMessageStoreConfig()
33: .isCheckTransactionMessageEnable()) {
34: return;
35: }
36:
37: try {
38: SelectMapedBufferResult selectMapedBufferResult = mapedFile.selectMapedBuffer(0);
39: if (selectMapedBufferResult != null) {
40: long preparedMessageCountInThisMapedFile = 0; // 回查的【half訊息】數量
41: int i = 0;
42: try {
43: // 迴圈每條【事務訊息】狀態,對【half訊息】進行回查
44: for (; i < selectMapedBufferResult.getSize(); i += TSStoreUnitSize) {
45: selectMapedBufferResult.getByteBuffer().position(i);
46:
47: // Commit Log Offset
48: long clOffset = selectMapedBufferResult.getByteBuffer().getLong();
49: // Message Size
50: int msgSize = selectMapedBufferResult.getByteBuffer().getInt();
51: // Timestamp
52: int timestamp = selectMapedBufferResult.getByteBuffer().getInt();
53: // Producer Group Hashcode
54: int groupHashCode = selectMapedBufferResult.getByteBuffer().getInt();
55: // Transaction State
56: int tranType = selectMapedBufferResult.getByteBuffer().getInt();
57:
58: // 已經提交或者回滾的訊息跳過
59: if (tranType != MessageSysFlag.TransactionPreparedType) {
60: continue;
61: }
62:
63: // 遇到時間不符合最小輪詢間隔,終止
64: long timestampLong = timestamp * 1000;
65: long diff = System.currentTimeMillis() - timestampLong;
66: if (diff < checkTransactionMessageAtleastInterval) {
67: break;
68: }
69:
70: preparedMessageCountInThisMapedFile++;
71:
72: // 回查Producer
73: try {
74: this.transactionCheckExecuter.gotoCheck(groupHashCode, getTranStateOffset(i), clOffset, msgSize);
75: } catch (Exception e) {
76: tranlog.warn("gotoCheck Exception", e);
77: }
78: }
79:
80: // 無回查的【half訊息】數量,且遍歷完,則終止定時任務
81: if (0 == preparedMessageCountInThisMapedFile //
82: && i == mapedFile.getFileSize()) {
83: tranlog.info("remove the transaction timer task, because no prepared message in this mapedfile[{}]", mapedFile.getFileName());
84: this.cancel();
85: }
86: } finally {
87: selectMapedBufferResult.release();
88: }
89:
90: tranlog.info("the transaction timer task execute over in this period, {} Prepared Message: {} Check Progress: {}/{}", mapedFile.getFileName(),//
91: preparedMessageCountInThisMapedFile, i / TSStoreUnitSize, mapedFile.getFileSize() / TSStoreUnitSize);
92: } else if (mapedFile.isFull()) {
93: tranlog.info("the mapedfile[{}] maybe deleted, cancel check transaction timer task", mapedFile.getFileName());
94: this.cancel();
95: return;
96: }
97: } catch (Exception e) {
98: log.error("check transaction timer task Exception", e);
99: }
100: }
101:
102:
103: private long getTranStateOffset(final long currentIndex) {
104: long offset = (this.mapedFile.getFileFromOffset() + currentIndex) / TransactionStateService.TSStoreUnitSize;
105: return offset;
106: }
107: }, 1000 * 60, this.defaultMessageStore.getMessageStoreConfig().getCheckTransactionMessageTimerInterval());
108: }
109:
110: // 【DefaultTransactionCheckExecuter.java】
111: @Override
112: public void gotoCheck(int producerGroupHashCode, long tranStateTableOffset, long commitLogOffset,
113: int msgSize) {
114: // 第一步、查詢Producer
115: final ClientChannelInfo clientChannelInfo = this.brokerController.getProducerManager().pickProducerChannelRandomly(producerGroupHashCode);
116: if (null == clientChannelInfo) {
117: log.warn("check a producer transaction state, but not find any channel of this group[{}]", producerGroupHashCode);
118: return;
119: }
120:
121: // 第二步、查詢訊息
122: SelectMapedBufferResult selectMapedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(commitLogOffset, msgSize);
123: if (null == selectMapedBufferResult) {
124: log.warn("check a producer transaction state, but not find message by commitLogOffset: {}, msgSize: ", commitLogOffset, msgSize);
125: return;
126: }
127:
128: // 第三步、向Producer發起請求
129: final CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader();
130: requestHeader.setCommitLogOffset(commitLogOffset);
131: requestHeader.setTranStateTableOffset(tranStateTableOffset);
132: this.brokerController.getBroker2Client().checkProducerTransactionState(clientChannelInfo.getChannel(), requestHeader, selectMapedBufferResult);
133: }
3.1.1.4 初始化【事務訊息】狀態儲存(TranStateTable)
- ?根據最後 Broker 關閉是否正常,會有不同的初始化方式。
核心程式碼如下:
// 微信文章長度限制,請點選【閱讀原文】
3.1.1.5 補充
- 為什麼 V3.1.5 開始,使用 資料庫 實現【事務狀態】的儲存?如下是來自官方文件的說明,可能是一部分原因:
RocketMQ 這種實現事務方式,沒有通過 KV 儲存做,而是通過 Offset 方式,存在一個顯著缺陷,即通過 Offset 更改資料,會令系統的髒頁過多,需要特別關注。
3.1.2 官方V4.0.0:基於資料庫
倉庫地址:https://github.com/apache/incubator-rocketmq
官方V4.0.0 暫時未完全開源【事務訊息回查】功能,So 我們需要進行一些猜想,可能不一定正確?。
?我們來對比【官方V3.1.4:基於檔案】的實現。
- TransactionRecord :記錄每條【事務訊息】。類似
TranStateTable
。
TranStateTable |
TransactionRecord |
|
---|---|---|
offset |
offset |
|
producerGroupHash |
producerGroup |
|
size |
無 |
非必須欄位:【事務訊息】回查時,使用 offset 讀取 CommitLog 獲得。 |
timestamp |
無 |
非必須欄位:【事務訊息】回查時,使用 offset 讀取 CommitLog 獲得。 |
state |
無 |
非必須欄位: 事務開始,增加記錄;事務結束,刪除記錄。 |
另外,資料庫本身保證了資料儲存的可靠性,無需 TranRedoLog
。
簡單手繪邏輯圖如下?:
3.2 Producer 接收【事務訊息回查】
- 順序圖如下:
- 核心程式碼如下:
// 微信文章長度限制,請點選【閱讀原文】