1. 程式人生 > 實用技巧 >微服務架構下分散式事務方案(三)之 RocketMQ之事務訊息

微服務架構下分散式事務方案(三)之 RocketMQ之事務訊息

由於工作流引擎專案中,工作流引擎服務和業務服務是分開的,所以就涉及到了分散式事務的問題。綜合考慮到併發量和分散式事務的保障,最終選擇了事務訊息的方式。

首先我們來介紹下本地訊息表這種方案,當訊息佇列不支援事務訊息的時候,我們可以考慮這種方案。

本地訊息表

基本流程

1、A 系統在自己本地一個事務裡操作同時,插入一條資料到訊息表;
2、接著 A 系統將這個訊息傳送到 MQ 中去;
3、B 系統接收到訊息之後,在一個事務裡,往自己本地訊息表裡插入一條資料,同時執行其他的業務操作,如果這個訊息已經被處理過了,那麼此時這個事務會回滾,這樣保證不會重複處理訊息;
4、B 系統執行成功之後,就會更新自己本地訊息表的狀態以及 A 系統訊息表的狀態;
5、如果 B 系統處理失敗了,那麼就不會更新訊息表狀態,那麼此時 A 系統會定時掃描自己的訊息表,如果有未處理的訊息,會再次傳送到 MQ 中去,讓 B 再次處理;
6、這個方案保證了最終一致性,哪怕 B 事務失敗了,但是 A 會不斷重發訊息,直到 B 那邊成功為止。

備註:A的訊息表用於保證B正確消費了A傳送的訊息,B的訊息表用於保證不重複消費同一條訊息。

這個方案說實話最大的問題就在於嚴重依賴於資料庫的訊息表來管理事務,高平發場景下不好擴充套件,所以應用好像也不太多。
本地訊息表是 BASE 理論,是最終一致模型,適用於對一致性要求不高的。實現這個模型時需要注意重試的冪等。

聊聊可靠訊息最終一致性方案

這個方案的思路其實跟上面講的本地訊息表基本相同,但是不基於資料庫,而是基於MQ來實現事務,RocketMQ提供了事務訊息來支援這種方式。

基本原理

訊息傳送:
(A)傳送方將半事務訊息傳送至訊息佇列 RocketMQ 版服務端。
(B)訊息佇列 RocketMQ 服務端將訊息持久化成功之後,向傳送方返回 Ack 確認訊息已經發送成功,此時訊息為半事務訊息。
(C)傳送方開始執行本地事務邏輯。
(D)傳送方根據本地事務執行結果向服務端提交二次確認(Commit 或是 Rollback),服務端收到 Commit 狀態則將半事務訊息標記為可投遞,訂閱方最終將收到該訊息;服務端收到 Rollback 狀態則刪除半事務訊息,訂閱方將不會接受該訊息。

訊息回查:
(A)在斷網或者是應用重啟的特殊情況下,上述步驟 D 提交的二次確認最終未到達服務端,經過固定時間後RocketMQ服務端將對該訊息發起訊息回查。(mq會自動定時輪詢所有 prepared 訊息回撥你的介面,問你,這個訊息是不是本地事務處理失敗了,所有沒傳送確認的訊息,是繼續重試還是回滾?一般來說這裡你就可以查下資料庫看之前本地事務是否執行,如果回滾了,那麼這裡也回滾吧。這個就是避免可能本地事務執行成功了,而確認訊息卻傳送失敗了。)
(B)傳送方收到訊息回查後,需要檢查對應訊息的本地事務執行的最終結果。
(C)傳送方根據檢查得到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟 D 對半事務訊息進行操作。

具體用法

在分散式訊息佇列中,目前唯一提供完整的事務訊息的,只有 RocketMQ 。

可能會有網友說,RabbitMQ 和 Kafka 也有事務訊息啊,也支援傳送事務訊息的傳送,以及後續的事務訊息的 commit提交或 rollbackc 回滾。但是要考慮一個極端的情況,在本地資料庫事務已經提交的時時候,如果因為網路原因,又或者崩潰等等意外,導致事務訊息沒有被 commit ,最終導致這條事務訊息丟失,分散式事務出現問題。

相比來說,RocketMQ 提供事務回查機制,如果應用超過一定時長未 commit 或 rollback 這條事務訊息,RocketMQ 會主動回查應用,詢問這條事務訊息是 commit 還是 rollback ,從而實現事務訊息的狀態最終能夠被 commit 或是 rollback ,達到最終事務的一致性。

// RocketMQ事務訊息監聽

@RocketMQTransactionListener(txProducerGroup = TX_PRODUCER_GROUP)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // ... local transaction process, return rollback, commit or unknown
        logger.info("[executeLocalTransaction][執行本地事務,訊息:{} arg:{}]", msg, arg);
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // ... check transaction status and return rollback, commit or unknown
        logger.info("[checkLocalTransaction][回查訊息:{}]", msg);
        return RocketMQLocalTransactionState.COMMIT;
    }

}

一般來說,有兩種方式實現本地事務回查時,返回事務訊息的狀態。

第一種,通過msg訊息,獲得某個業務上的標識或者編號,然後去資料庫中查詢業務記錄,從而判斷該事務訊息的狀態是提交還是回滾。

第二種,記錄msg的事務編號,與事務狀態到資料庫中。

  • 第一步,在#executeLocalTransaction(...)方法中,先儲存一條idmsg的事務編號,狀態為RocketMQLocalTransactionState.UNKNOWN的記錄。
  • 第二步,呼叫帶有事務的業務 Service 的方法。在該 Service 方法中,在邏輯都執行成功的情況下,更新idmsg的事務編號,狀態變更為RocketMQLocalTransactionState.COMMIT。這樣,我們就可以伴隨這個事務的提交,更新idmsg的事務編號的記錄的狀為RocketMQLocalTransactionState.COMMIT,美滋滋。。
  • 第三步,要以try-catch的方式,呼叫業務 Service 的方法。如此,如果發生異常,回滾事務的時候,可以在catch中,更新idmsg的事務編號的記錄的狀態為RocketMQLocalTransactionState.ROLLBACK。極端情況下,可能更新失敗,則列印 error 日誌,告警知道,人工介入。
  • 如此三步之後,我們在#executeLocalTransaction(...)方法中,就可以通過查詢資料庫,idmsg的事務編號的記錄的狀態,然後返回。