1. 程式人生 > >MQ傳送事務訊息

MQ傳送事務訊息

MQ事務訊息互動流程如下:

MQ事務訊息互動流程

傳送事務訊息包含以下兩個步驟:

  1. 傳送半訊息及執行本地事務

    package com.alibaba.webx.TryHsf.app1;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
    import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
    import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    public class TransactionProducerClient {
     private final static Logger log = ClientLogger.getLog(); // 使用者需要設定自己的log, 記錄日誌便於排查問題
     public static void main(String[] args) throws InterruptedException {
         final BusinessService businessService = new BusinessService(); // 本地業務Service
         Properties properties = new Properties();
         properties.put(PropertyKeyConst.ProducerId, ""); // 您在控制檯建立的Producer ID
         properties.put(PropertyKeyConst.AccessKey, ""); // 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
         properties.put(PropertyKeyConst.SecretKey, ""); // 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
         //PropertyKeyConst.ONSAddr地址請根據實際情況對應以下幾類進行輸入:
         //公共雲生產環境:http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
         //公共雲公測環境:http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
         //杭州金融雲環境:http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
         //杭州深圳雲環境:http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
         //亞太東南1公共雲環境(只適用於新加坡ECS):http://ap-southeastaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
         TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
                 new LocalTransactionCheckerImpl());
         producer.start();
         Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());
         // 輸入您在控制檯建立的Topic
         SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {
             @Override
             public TransactionStatus execute(Message msg, Object arg) {
                 // 訊息ID(有可能訊息體一樣,但訊息ID不一樣, 當前訊息ID在控制檯無法查詢)
                 String msgId = msg.getMsgID();
                 // 訊息體內容進行crc32, 也可以使用其它的如MD5
                 long crc32Id = HashUtil.crc32Code(msg.getBody());
                 // 訊息ID和crc32id主要是用來防止訊息重複
                 // 如果業務本身是冪等的, 可以忽略, 否則需要利用msgId或crc32Id來做冪等
                 // 如果要求訊息絕對不重複, 推薦做法是對訊息體body使用crc32或md5來防止重複訊息 
                 Object businessServiceArgs = new Object();
                 TransactionStatus transactionStatus = TransactionStatus.Unknow;
                 try {
                     boolean isCommit =
                             businessService.execbusinessService(businessServiceArgs);
                     if (isCommit) {
                         // 本地事務成功、提交訊息
                         transactionStatus = TransactionStatus.CommitTransaction;
                     } else {
                         // 本地事務失敗、回滾訊息
                         transactionStatus = TransactionStatus.RollbackTransaction;
                     }
                 } catch (Exception e) {
                     log.error("Message Id:{}", msgId, e);
                 }
                 System.out.println(msg.getMsgID());
                 log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
                 return transactionStatus;
             }
         }, null);
         // demo example 防止程序退出(實際使用不需要這樣)
         TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
     }
    }

     

  2. 提交事務訊息狀態

    當本地事務執行完成(執行成功或執行失敗),需要通知伺服器當前訊息的事務狀態。通知方式有以下兩種:

    • 執行本地事務完成後提交
    • 執行本地事務一直沒提交狀態,等待伺服器回查訊息的事務狀態

    事務狀態有以下三種:

    • TransactionStatus.CommitTransaction 提交事務,允許訂閱方消費該訊息。
    • TransactionStatus.RollbackTransaction 回滾事務,訊息將被丟棄不允許消費。
    • TransactionStatus.Unknow 無法判斷狀態,期待 MQ Broker 向傳送方再次詢問該訊息對應的本地事務的狀態。
      import com.alibaba.rocketmq.client.producer.LocalTransactionState;
       public class LocalTransactionCheckerImpl implements LocalTransactionChecker {
          private final static Logger log = ClientLogger.getLog();
          final  BusinessService businessService = new BusinessService();
          @Override
          public TransactionStatus check(Message msg) {
              //訊息ID(有可能訊息體一樣,但訊息ID不一樣, 當前訊息屬於Half 訊息,所以訊息ID在控制檯無法查詢)
              String msgId = msg.getMsgID();
              //訊息體內容進行crc32, 也可以使用其它的方法如MD5
              long crc32Id = HashUtil.crc32Code(msg.getBody());
              //訊息ID、訊息本 crc32Id主要是用來防止訊息重複
              //如果業務本身是冪等的, 可以忽略, 否則需要利用msgId或crc32Id來做冪等
              //如果要求訊息絕對不重複, 推薦做法是對訊息體使用crc32或md5來防止重複訊息.
              //業務自己的引數物件, 這裡只是一個示例, 實際需要使用者根據情況來處理
              Object businessServiceArgs = new Object();
              TransactionStatus transactionStatus = TransactionStatus.Unknow;
              try {
                  boolean isCommit = businessService.checkbusinessService(businessServiceArgs);
                  if (isCommit) {
                      //本地事務已成功、提交訊息
                      transactionStatus = TransactionStatus.CommitTransaction;
                  } else {
                      //本地事務已失敗、回滾訊息
                      transactionStatus = TransactionStatus.RollbackTransaction;
                  }
              } catch (Exception e) {
                  log.error("Message Id:{}", msgId, e);
              }
              log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
              return transactionStatus;
          }
       }

       

  3. 工具類
    import java.util.zip.CRC32;
    public class HashUtil {
        public static long crc32Code(byte[] bytes) {
            CRC32 crc32 = new CRC32();
            crc32.update(bytes);
            return crc32.getValue();
        }
    }

     

事務回查機制說明

  1. 傳送事務訊息為什麼必須要實現回查 Check 機制?

    當步驟(1)中 Half 訊息傳送完成,但本地事務返回狀態為 TransactionStatus.Unknow,或者應用退出導致本地事務未提交任何狀態時,從 MQ Broker 的角度看,這條 Half 狀態的訊息的狀態是未知的。因此 MQ Broker 會定期要求傳送方能 Check 該 Half 狀態訊息,並上報其最終狀態。

  2. Check 被回撥時,業務邏輯都需要做些什麼?

    MQ 事務訊息的 check 方法裡面,應該寫一些檢查事務一致性的邏輯。MQ 傳送事務訊息時需要實現 LocalTransactionChecker 介面,用來處理 MQ Broker 主動發起的本地事務狀態回查請求;因此在事務訊息的 Check 方法中,需要完成兩件事情:

    (1) 檢查該 Half 訊息對應的本地事務的狀態(commited or rollback);

    (2) 向 MQ Broker 提交該 Half 訊息本地事務的狀態。