1. 程式人生 > >RocketMQ事務訊息學習及刨坑過程

RocketMQ事務訊息學習及刨坑過程

一、背景

MQ元件是系統架構裡必不可少的一門利器,設計層面可以降低系統耦合度,高併發場景又可以起到削峰填谷的作用,從單體應用到叢集部署方案,再到現在的微服務架構,MQ憑藉其優秀的效能和高可靠性,得到了廣泛的認可。
隨著資料量增多,系統壓力變大,開始出現這種現象:資料庫已經更新了,但訊息沒發出來,或者訊息先發了,但後來資料庫更新失敗了,結果研發童鞋各種資料修復,這種生產問題出現的概率不大,但讓人很鬱悶。這個其實就是資料庫事務與MQ訊息的一致性問題,簡單來講,資料庫的事務跟普通MQ訊息傳送無法直接繫結與資料庫事務繫結在一起,例如上面提及的兩種問題場景:

  1. 資料庫事務提交後傳送MQ訊息;
  2. MQ訊息先發,然後再提交資料庫事務。

場景1的問題是資料庫事務可能剛剛提交,伺服器就宕機了,MQ訊息沒發出去,場景2的問題就是MQ訊息傳送出去了,但資料庫事務提交失敗,又沒辦法追加已經發出去的MQ訊息,結果導致資料沒更新,下游已經收到訊息,最終事務出現不一致的情況。

二、事務訊息的引出

我們以微服務架構的購物場景為例,參照一下RocketMQ官方的例子,使用者A發起訂單,支付100塊錢操作完成後,能得到100積分,賬戶服務和會員服務是兩個獨立的微服務模組,有各自的資料庫,按照上文提及的問題可能性,將會出現這些情況:

  • 如果先扣款,再發訊息,可能錢剛扣完,宕機了,訊息沒發出去,結果積分沒增加。
  • 如果先發訊息,再扣款,可能積分增加了,但錢沒扣掉,白送了100積分給人家。
  • 錢正常扣了,訊息也傳送成功了,但會員服務例項消費訊息出現問題,結果積分沒增加。

由此引出的是資料庫事務與MQ訊息的事務一致性問題,rocketmq事務訊息解決的問題:解決本地事務執行與訊息傳送的原子性問題。這裡界限一定要明白,是確保MQ生產端正確無誤地將訊息傳送出來,沒有多發,也不會漏發。但至於傳送後消費端有沒有正常的消費掉(如上面提及的第三種情況,錢正常扣了,訊息也發了,但下游消費出問題導致積分不對),這種異常場景將由MQ訊息消費失敗重試機制來保證,不在此次的討論範圍內。

常用的MQ元件針對此場景都有自己的實現方案,如ActiveMQ使用AMQP協議(二階提交方式)保證訊息正確傳送,這裡我們以RocketMQ為重點進行學習。

三、RocketMQ事務訊息設計思路

根據CAP理論,RocketMQ事務訊息通過非同步確保方式,保證事務的最終一致性。設計流程上借鑑兩階段提交理論,流程圖如下:

  1. 應用模組遇到要傳送事務訊息的場景時,先發送prepare訊息給MQ。
  2. prepare訊息傳送成功後,應用模組執行資料庫事務(本地事務)。
  3. 根據資料庫事務執行的結果,再返回Commit或Rollback給MQ。
  4. 如果是Commit,MQ把訊息下發給Consumer端,如果是Rollback,直接刪掉prepare訊息。
  5. 第3步的執行結果如果沒響應,或是超時的,啟動定時任務回查事務狀態(最多重試15次,超過了預設丟棄此訊息),處理結果同第4步。
  6. MQ消費的成功機制由MQ自己保證。

四、RocketMQ事務訊息實現流程

以RocketMQ 4.5.2版本為例,事務訊息有專門的一個佇列RMQ_SYS_TRANS_HALF_TOPIC,所有的prepare訊息都先往這裡放,當訊息收到Commit請求後,就把訊息再塞到真實的Topic佇列裡,供Consumer消費,同時向RMQ_SYS_TRANS_OP_HALF_TOPIC塞一條訊息。簡易流程圖如下:

上述流程中,請允許我這樣劃分模組職責:

  1. RocketMQ Client即我們工程中匯入的依賴jar包,RocketMQ Broker端即部署的服務端,NameServer暫未體現。
  2. 應用模組成對出現,上游為事務訊息生產端,下游為事務訊息消費端(事務訊息對消費端是透明的,與普通訊息一致)。

應用模組的事務因為中斷,或是其他的網路原因,導致無法立即響應的,RocketMQ當做UNKNOW處理,RocketMQ事務訊息還提供了一個補救方案:定時查詢事務訊息的資料庫事務狀態
簡易流程圖如下:

五、原始碼剖析

講解的思路基本上按照如下流程圖,根據模組職責和流程逐一分析。

  1. 環境準備
    閱讀原始碼前需要在IDE上獲取和除錯RocketMQ的原始碼,這部分請自行查閱方法。

  2. 應用模組(事務訊息生產端)核心原始碼
    建立一個監聽類,實現TransactionListener介面,在實現的資料庫事務提交方法和回查事務狀態方法模擬結果。
/**
 * @program: rocket
 * @description: 除錯事務訊息示例程式碼
 * @author: Huang
 * @create: 2019-10-16
 **/
public class SelfTransactionListener implements TransactionListener {
   private AtomicInteger transactionIndex = new AtomicInteger(0);
   private AtomicInteger checkTimes = new AtomicInteger(0);

   private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
   /**
    * 執行本地事務
    *
    * @param message
    * @param o
    * @return
    */
   @Override
   public LocalTransactionState executeLocalTransaction(Message message, Object o) {
      String msgKey = message.getKeys();
      System.out.println("start execute local transaction " + msgKey);
      LocalTransactionState state;
      if (msgKey.contains("1")) {
         // 第一條訊息讓他通過
         state = LocalTransactionState.COMMIT_MESSAGE;
      } else if (msgKey.contains("2")) {
         // 第二條訊息模擬異常,明確回覆回滾操作
         state = LocalTransactionState.ROLLBACK_MESSAGE;
      } else {
         // 第三條訊息無響應,讓它呼叫回查事務方法
         state = LocalTransactionState.UNKNOW;
         // 給剩下3條訊息,放1,2,3三種狀態
         localTrans.put(msgKey, transactionIndex.incrementAndGet());
      }
      System.out.println("executeLocalTransaction:" + message.getKeys() + ",execute state:" + state + ",current time:" + System.currentTimeMillis());
      return state;
   }

   /**
    * 回查本地事務結果
    *
    * @param messageExt
    * @return
    */
   @Override
   public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
      String msgKey = messageExt.getKeys();
      System.out.println("start check local transaction " + msgKey);
      Integer state = localTrans.get(msgKey);
      switch (state) {
         case 1:
            System.out.println("check result unknown 回查次數" + checkTimes.incrementAndGet());
            return LocalTransactionState.UNKNOW;
         case 2:
            System.out.println("check result commit message, 回查次數" + checkTimes.incrementAndGet());
            return LocalTransactionState.COMMIT_MESSAGE;
         case 3:
            System.out.println("check result rollback message, 回查次數" + checkTimes.incrementAndGet());
            return LocalTransactionState.ROLLBACK_MESSAGE;

         default:
            return LocalTransactionState.COMMIT_MESSAGE;
      }
   }
}

事務訊息生產者程式碼示例,共傳送5條訊息,基本上包含全部的場景,休眠時間設定足夠的時間,保證回查事務時例項還在執行中,程式碼如下:

/**
 * @program: rocket
 * @description: Rocketmq事務訊息
 * @author: Huang
 * @create: 2019-10-16
 **/
public class TransactionProducer {

   public static void main(String[] args) {
      try {
         TransactionMQProducer producer = new TransactionMQProducer("transactionMQProducer");
         producer.setNamesrvAddr("10.0.133.29:9876");
         producer.setTransactionListener(new SelfTransactionListener());
         producer.start();
         for (int i = 1; i < 6; i++) {
            Message message = new Message("TransactionTopic", "transactionTest","msg-" + i, ("Hello" + ":" +  i).getBytes());
            try {
               SendResult result = producer.sendMessageInTransaction(message, "Hello" + ":" +  i);
               System.out.printf("Topic:%s send success, misId is:%s%n", message.getTopic(), result.getMsgId());
            } catch (Exception e) {
               e.printStackTrace();
            }
         }
         Thread.sleep(Integer.MAX_VALUE);
         producer.shutdown();
      } catch (MQClientException e) {
         e.printStackTrace();
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }
}
  1. RocketMQ Client端程式碼,程式碼主要邏輯可以分成三段:第一段為設定訊息為prepare訊息,併發送給RocketMQ服務端
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
    sendResult = this.send(msg);
} catch (Exception e) {
    throw new MQClientException("send message Exception", e);
}

第二段:訊息傳送成功後,呼叫應用模組資料庫事務方法,獲取事務結果(為節省篇幅,程式碼有刪節)

switch (sendResult.getSendStatus()) {
    case SEND_OK: {
        try {
            if (null != localTransactionExecuter) {
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                log.debug("Used new transaction API");
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            }
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }
        } catch (Throwable e) {
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}

第三段:傳送事務結果到RocketMQ端,結束事務,並響應結果給應用模組

try {
    this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
    log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
  1. RocketMQ Broker端事務提交/回滾操作(這裡取endTransaction部分)
    程式碼入口:org.apache.rocketmq.broker.processor.EndTransactionProcessor
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 修改訊息的Topic為由RMQ_SYS_TRANS_HALF_TOPIC改為真實Topic
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            // 將訊息儲存到真實Topic中,供Consumer消費
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // 將訊息儲存到RMQ_SYS_TRANS_OP_HALF_TOPIC,標記為刪除狀態,事務訊息回查的定時任務中會做處理
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}
  1. RocketMQ Broker端定時任務回查資料庫事務部分
    方法入口:org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService
@Override
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // 超過15次的回查事務狀態失敗後,預設是丟棄此訊息
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

回查事務呼叫入口:

// 此段程式碼為TransactionalMessageServiceImpl類中的check方法
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
    || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
    || (valueOfCurrentMinusBorn <= -1
);

if (isNeedCheck) {
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    // 呼叫AbstractTransactionalMessageCheckListener的
    listener.resolveHalfMsg(msgExt);
} else {
    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
    log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
        messageQueue, pullResult);
    continue;
}

// 此方法在AbstractTransactionalMessageCheckListener類中
public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}

// 此方法在AbstractTransactionalMessageCheckListener類中
public void sendCheckMessage(MessageExt msgExt) throws Exception {
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
    if (channel != null) {
        // 通過Netty傳送請求到RocketMQ Client端,執行checkTransactionState方法
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}

RocketMQ Client接收到服務端的請求後,重新呼叫回查資料庫事務方法,並將事務結果再次提交到RocketMQ Broker端
方法入口:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl類的方法

try {
    if (transactionCheckListener != null) {
        localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
    } else if (transactionListener != null) {
        log.debug("Used new check API in transaction message");
        localTransactionState = transactionListener.checkLocalTransaction(message);
    } else {
        log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
    }
} catch (Throwable e) {
    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
    exception = e;
}

this.processTransactionState(
    localTransactionState,
    group,
    exception);

六、補充一個問題

官網有提及,事務訊息是不支援延遲訊息和批量訊息,我手賤試了一下延遲訊息,事務訊息設定一個DelayTimeLevel,結果這條訊息就一直無法從RMQ_SYS_TRANS_HALF_TOPIC移除掉了,應用模組的日誌發現在反覆地嘗試回查事務,Console介面上RMQ_SYS_TRANS_HALF_TOPIC的訊息查詢列表很快就超過2000條記錄了,為什麼?

我們回到程式碼層面進行分析,過程如下:

1.設定了DelayTimeLevel後,資料事務提交後(或是回查資料庫事務完成後),將訊息寫入目標Topic時,由於DelayTimeLevel的干擾,目標Topic將變成SCHEDULE_TOPIC_XXXX,同時REAL_TOPIC變成RMQ_SYS_TRANS_HALF_TOPIC,真實的Topic在這個環節已經丟失。

// RocketMQ Broker端接受事務提交後的處理
org.apache.rocketmq.broker.processor.EndTransactionProcessor類
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // 這裡呼叫CommitLog的putMessage方法
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 修改訊息的Topic為由RMQ_SYS_TRANS_HALF_TOPIC改為真實Topic
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            // 將訊息儲存到真實Topic中,此時Topic已經變成SCHEDULE_TOPIC_XXXX
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // 將訊息儲存到RMQ_SYS_TRANS_OP_HALF_TOPIC,標記為刪除狀態,事務訊息回查的定時任務中會做處理
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
}

// 此段程式碼在org.apache.rocketmq.store.CommitLog類的putMessage方法中
// 由於DelayTimeLevel的干擾,目標Topic將變成SCHEDULE_TOPIC_XXXX
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // Delay Delivery
    if (msg.getDelayTimeLevel() > 0) {
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }

        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

        // Backup real topic, queueId
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}

列印的日誌示例如下:

2019-10-17 14\:41\:05 INFO EndTransactionThread_4 - Transaction op message write successfully. messageId=0A00851D00002A9F0000000000000E09, queueId=0 
msgExt:MessageExt [queueId=0, storeSize=335, queueOffset=5, sysFlag=8, bornTimestamp=1571293959305, bornHost=/10.0.133.29:54634, storeTimestamp=1571294460555, 
storeHost=/10.0.133.29:10911, msgId=0A00851D00002A9F0000000000000E09, commitLogOffset=3593, bodyCRC=1849408413, reconsumeTimes=0, preparedTransactionOffset=0, 
toString()=Message{topic='SCHEDULE_TOPIC_XXXX', flag=0, properties={REAL_TOPIC=RMQ_SYS_TRANS_HALF_TOPIC, TRANSACTION_CHECK_TIMES=3, KEYS=msg-test-3, 
TRAN_MSG=true, UNIQ_KEY=0A00851D422C18B4AAC25584B0880000, WAIT=false, DELAY=1, PGROUP=transactionMQProducer, TAGS=transactionTest, REAL_QID=0}, 
body=[72, 101, 108, 108, 111, 84, 105, 109, 101, 58, 51], transactionId='null'}]

2.延遲訊息是定時任務觸發的,我剛剛設定的延遲是1秒,定時任務又把訊息重新放回RMQ_SYS_TRANS_HALF_TOPIC中,注意此時只有RMQ_SYS_TRANS_HALF_TOPIC有訊息,RMQ_SYS_TRANS_OP_HALF_TOPIC佇列是沒有這條訊息的,如下程式碼:

// 此段程式碼在org.apache.rocketmq.store.schedule.ScheduleMessageService類executeOnTimeup方法內
try {
    // 訊息重新回到RMQ_SYS_TRANS_HALF_TOPIC佇列中
    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
    PutMessageResult putMessageResult =
        ScheduleMessageService.this.writeMessageStore
            .putMessage(msgInner);

    if (putMessageResult != null
        && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
        continue;
    } else {
        log.error(
            "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
            msgExt.getTopic(), msgExt.getMsgId());
        ScheduleMessageService.this.timer.schedule(
            new DeliverDelayedMessageTimerTask(this.delayLevel,
                nextOffset), DELAY_FOR_A_PERIOD);
        ScheduleMessageService.this.updateOffset(this.delayLevel,
            nextOffset);
        return;
    }
} catch (Exception e) {
    log.error(
        "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
            + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
            + offsetPy + ",sizePy=" + sizePy, e);
}

3.事務訊息定時任務啟動,查RMQ_SYS_TRANS_HALF_TOPIC有訊息,但RMQ_SYS_TRANS_OP_HALF_TOPIC沒有訊息,為了保證訊息順序寫入,又將此訊息重新填入RMQ_SYS_TRANS_OP_HALF_TOPIC中,並且觸發一次回查事務操作。示例程式碼如上文回查事務呼叫入口相同:

// 此段程式碼為TransactionalMessageServiceImpl類中的check方法
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
    || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
    || (valueOfCurrentMinusBorn <= -1
);

if (isNeedCheck) {
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    listener.resolveHalfMsg(msgExt);
} else {
    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
    log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
        messageQueue, pullResult);
    continue;
}

這樣構成了一個死迴圈,直到嘗試到15次才丟棄此訊息(預設最大嘗試次數是15次),這個代價有點大。針對此問題的優化,已經提交PR到RocketMQ社群,新版本釋出後,事務訊息將遮蔽DelayTimeLevel,這個問題就不會再出現了。

在新版本釋出之前,我們的解決辦法:

  1. 明確研發過程中事務訊息禁止設定DelayTimeLevel。
    感覺有風險,畢竟新來的童鞋,不是特別瞭解此部分功能的可能會手抖加上(像我最早那樣)。
  2. 對RocketMQ Client做一次簡單的封裝,比如提供一個rocketmq-spring-boot-starter,在提供傳送事務訊息的方法裡不提供設定的入口,如下示例:
/**
 * 事務訊息傳送
 * 不支援延遲傳送和批量傳送
 */
public void sendMessageInTransaction(String topic, String tag, Object message, String requestId) throws Exception {
   TransactionMQProducer producer = annotationScan.getProducer(topic + "_" + tag);
   producer.sendMessageInTransaction(MessageBuilder.of(topic, tag, message, requestId).build(), message);
}

應該靠譜一些,畢竟從源頭杜絕了DelayTimeLevel引數的設定。

七、結束語

本篇簡單介紹了事務訊息的解決的場景和職責的界限,基本的設計思路和流程,在此借鑑學習了RocketMQ作者的圖稿,然後挑了部分程式碼作簡要的講解,還是自己的刨坑過程,文章內有任何不正確或不詳盡之處請留言指導,謝謝。

專注Java高併發、分散式架構,更多技術乾貨分享與心得,請關注公眾號:Java架構社群

相關推薦

RocketMQ事務訊息學習過程

一、背景 MQ元件是系統架構裡必不可少的一門利器,設計層面可以降低系統耦合度,高併發場景又可以起到削峰填谷的作用,從單體應用到叢集部署方案,再到現在的微服務架構,MQ憑藉其優秀的效能和高可靠性,得到了廣泛的認可。 隨著資料量增多,系統壓力變大,開始出現這種現象:資料庫已經更新了,但訊息沒發出來,或者訊息先發了

java opc client 開源Utgard學習

最近的專案中需要運用到opc client。需要從opc server中獲取資料,或修改引數。主要運用的語言為java。 準備知識:opc server 協議 OPC DA: Data Access協議,是最基本的OPC協議。OPC DA伺服器本身不儲存資料,只負責顯示資

RocketMQ-事務訊息

1.RocketMQ有三種訊息     1)普通訊息     2)順序訊息     3)事務訊息 2.分散式事務-強調最終一致性而不是強一致性 單個應用,資料庫分庫分表了(跨資料庫) 多個應用,服務

搞懂分散式技術19:使用RocketMQ事務訊息解決分散式事務

說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性?一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。但這裡面有個問題:A是先update DB,

分散式訊息佇列RocketMQ--事務訊息--解決分散式事務

說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性? 一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。 但這裡面有個問題:A是先update D

RocketMQ原始碼分析之RocketMQ事務訊息實現原理上篇(二階段提交)

根據上文的描述,傳送事務訊息的入口為: TransactionMQProducer#sendMessageInTransaction: public TransactionSendResult sendMessageInTransaction(final Message msg, final Object

聯想拯救者(r720)安裝Ubuntu16.04過程+cuda+cudnn+opencv+tensorflow+caffe安裝pip,git網速慢問題

U盤安裝Ubuntu16.04LTS: 2)在磁碟管理器中壓縮出給ubuntu的空閒空間,確保該空間為未分配。我的筆記本是128gSSD+1tHDD,win10作為主系統放在SSD中,ubuntu裝在HDD中,其中對HDD壓縮了100G空間。 4)重啟電腦,按F2

RocketMQ原始碼分析之從官方示例窺探:RocketMQ事務訊息實現基本思想

RocketMQ4.3.0版本開始支援事務訊息,後續分享將開始將剖析事務訊息的實現原理。首先從官方給出的Demo例項入手,以此通往RocketMQ事務訊息的世界中。 官方版本未釋出之前,從apache rocketmq第一個版本上線後,程式碼中存在與事務訊息相關的程式碼,例如COMMIT、ROLLBACK、

rocketmq 事務訊息

轉自  https://mp.weixin.qq.com/s?__biz=MzU4NzU0MDIzOQ==&mid=2247484003&idx=1&sn=8b9b084f463c6a0bd0ce04d9ca670079&scene=19 近日,

分散式訊息佇列RocketMQ--事務訊息--解決分散式事務的最佳實踐

說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性? 一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。

rocketmq事務訊息的理解

http://www.cnblogs.com/wxd0108/p/6038543.html RocketMQ第一階段傳送Prepared訊息時,會拿到訊息的地址,第二階段執行本地事物,第三階段通過第一階段拿到的地址去訪問訊息,並修改狀態。細心的你可能又發現問題了,如果確認訊

rocketmq事務訊息入門介紹

說明 週五的時候發了篇:Rocketmq4.3支援事務啦!!!,趁著週末的時候把相關內容看了下,下面的主要內容就是關於RocketMQ事務相關內容介紹了。 說明:今天這篇僅僅是入門介紹,並沒有涉及到很多細節,先把大概流程說明白,後續再具體細節進行開篇說

SpringBoot整合RocketMQ事務訊息

pom.xml<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId>

整合RocketMQ事務訊息

一、 選擇RocketMQ原因 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ選型 二、 整合思路

關於阿里訊息佇列RocketMQ(安裝、使用和),你需要知道的事情

為什麼選擇RocketMQ Apache RocketMQ作為阿里開源的一款高效能、高吞吐量的分散式訊息中介軟體。因為阿里有海量的資料量,無數業務場景的應用,是RocketMQ搶盡風頭風頭,成為不可多得中介軟體專案,加上已經正式加入Apach俱樂部,作為頂級的開源專案! 一、關於

深入理解分散式事務(XArocketmq事務

深入理解分散式事務(XA及rocketmq事務) 釋出時間:2018-04-16 來源:網路 上傳者:使用者 關鍵字: 事務 分散式 RocketMq 深入 理解 發表文章 摘要:本文由碼農網–吳極心原創,轉載請看清文末的轉載要求

MyBatis的學習(二)——MyBatis事務核心物件配置

一、獲取SqlSession物件 MyBatis框架中涉及到的幾個API SqlSessionFactoryBuilder:該物件負責根據MyBatis配置檔案mybatis-config.xml構建SqlSessionFactory例項  負責生產session SqlSes

RocketMq-延遲訊息 程式碼實現

支援延遲訊息 RocketMQ 支援定時訊息,但是不支援任意時間精度,僅支援特定的 level,例如定時 5s, 10s, 1m 等。其中,level=0 級表示不延時,level=1 表示 1 級延時,level=2 表示 2 級延時,以此類推。 配置 開啟安裝目錄的

訊息中介軟體kafka(0.9以及0.10版本)學習實踐

目錄 一、介紹 二、特點 三、何時需要訊息佇列 四、元件 五、訊息傳送的流程 六、原理 七、實踐 八、版本比較

Hadoop偽分散式叢集搭建過程指南

一個偶然的機會,讓我進了hadoop這個坑。我不得不說,Google真是個非常厲害的公司。為計算機行業貢獻了很多亮瞎雙眼額技術。初入Hadoop一般都要了解HDFS,YARN,Mapreduce。現在來總結一下Hadoop分散式叢集的搭建過程。1.首先準備好相應的安裝包,同時