1. 程式人生 > 實用技巧 >分散式事務訊息

分散式事務訊息

事務訊息_訊息型別_功能與特性_產品簡介_訊息佇列 RocketMQ 版-阿里雲 https://help.aliyun.com/document_detail/43348.html

更新時間:2020-04-02 22:30:07

訊息佇列 RocketMQ 版提供的分散式事務訊息適用於所有對資料最終一致性有強需求的場景。本文介紹訊息佇列 RocketMQ 版事務訊息的概念、優勢、典型場景、互動流程以及使用過程中的注意事項。

概念介紹

  • 事務訊息:訊息佇列 RocketMQ 版提供類似 X/Open XA 的分散式事務功能,通過訊息佇列 RocketMQ 版事務訊息能達到分散式事務的最終一致。
  • 半事務訊息:暫不能投遞的訊息,傳送方已經成功地將訊息傳送到了訊息佇列 RocketMQ 版服務端,但是服務端未收到生產者對該訊息的二次確認,此時該訊息被標記成“暫不能投遞”狀態,處於該種狀態下的訊息即半事務訊息。
  • 訊息回查:由於網路閃斷、生產者應用重啟等原因,導致某條事務訊息的二次確認丟失,訊息佇列 RocketMQ 版服務端通過掃描發現某條訊息長期處於“半事務訊息”時,需要主動向訊息生產者詢問該訊息的最終狀態(Commit 或是 Rollback),該詢問過程即訊息回查。

分散式事務訊息的優勢

訊息佇列 RocketMQ 版分散式事務訊息不僅可以實現應用之間的解耦,又能保證資料的最終一致性。同時,傳統的大事務可以被拆分為小事務,不僅能提升效率,還不會因為某一個關聯應用的不可用導致整體回滾,從而最大限度保證核心系統的可用性。在極端情況下,如果關聯的某一個應用始終無法處理成功,也只需對當前應用進行補償或資料訂正處理,而無需對整體業務進行回滾。

典型場景

在淘寶購物車下單時,涉及到購物車系統和交易系統,這兩個系統之間的資料最終一致性可以通過分散式事務訊息的非同步處理實現。在這種場景下,交易系統是最為核心的系統,需要最大限度地保證下單成功。而購物車系統只需要訂閱訊息佇列 RocketMQ 版的交易訂單訊息,做相應的業務處理,即可保證最終的資料一致性。

互動流程

事務訊息互動流程如下圖所示。

事務訊息傳送步驟如下:

  1. 傳送方將半事務訊息傳送至訊息佇列 RocketMQ 版服務端。
  2. 訊息佇列 RocketMQ 版服務端將訊息持久化成功之後,向傳送方返回 Ack 確認訊息已經發送成功,此時訊息為半事務訊息。
  3. 傳送方開始執行本地事務邏輯。
  4. 傳送方根據本地事務執行結果向服務端提交二次確認(Commit 或是 Rollback),服務端收到 Commit 狀態則將半事務訊息標記為可投遞,訂閱方最終將收到該訊息;服務端收到 Rollback 狀態則刪除半事務訊息,訂閱方將不會接受該訊息。

事務訊息回查步驟如下:

  1. 在斷網或者是應用重啟的特殊情況下,上述步驟 4 提交的二次確認最終未到達服務端,經過固定時間後服務端將對該訊息發起訊息回查。
  2. 傳送方收到訊息回查後,需要檢查對應訊息的本地事務執行的最終結果。
  3. 傳送方根據檢查得到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟 4 對半事務訊息進行操作。

注意事項

  1. 事務訊息的 Group ID 不能與其他型別訊息的 Group ID 共用。與其他型別的訊息不同,事務訊息有回查機制,回查時訊息佇列 RocketMQ 版服務端會根據 Group ID 去查詢客戶端。
  2. 通過ONSFactory.createTransactionProducer建立事務訊息的 Producer 時必須指定LocalTransactionChecker的實現類,處理異常情況下事務訊息的回查。
  3. 事務訊息傳送完成本地事務後,可在execute方法中返回以下三種狀態:
    • TransactionStatus.CommitTransaction:提交事務,允許訂閱方消費該訊息。
    • TransactionStatus.RollbackTransaction:回滾事務,訊息將被丟棄不允許消費。
    • TransactionStatus.Unknow:暫時無法判斷狀態,等待固定時間以後訊息佇列 RocketMQ 版服務端向傳送方進行訊息回查。
  4. 可通過以下方式給每條訊息設定第一次訊息回查的最快時間:
    Message message = new Message();
    // 在訊息屬性中新增第一次訊息回查的最快時間,單位秒。例如,以下設定實際第一次回查時間為 120 秒 ~ 125 秒之間
    message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"120");
    // 以上方式只確定事務訊息的第一次回查的最快時間,實際回查時間向後浮動 0 秒 ~ 5 秒;如第一次回查後事務仍未提交,後續