1. 程式人生 > 程式設計 >如何基於RocketMQ的事務訊息特性實現分散式系統的最終一致性?

如何基於RocketMQ的事務訊息特性實現分散式系統的最終一致性?

前言

在這篇文章中我們將介紹RocketMQ的事務訊息相關的內容,並通過一些實踐和大家一起來探索下事務訊息如何解決分散式系統中的分散式事務問題。

事務訊息原理

事務訊息特性可以看作是兩階段協議的訊息實現方式,用以確保在以訊息中介軟體解耦的分散式系統中本地事務的執行和訊息的傳送,可以以原子的方式進行

舉個例子,以某網際網路公司的使用者餘額充值為例,因為有充返活動(充值100元贈送20元),優惠比較大,使用者Joe禁不住誘惑用支付寶向自己的餘額賬戶充值了100元,支付成功後Joe的餘額賬戶有了120元錢。

而該公司的關於使用者餘額充值的系統設計是這樣的:

在這個設計流程中,該公司通過自建支付系統完成使用者Joe的支付寶扣款操作,成功後需要更新支付流水的狀態,因為使用者的餘額賬戶系統與支付系統之間通過MQ解耦

了,所以支付系統在完成支付流水狀態更新後需要通過傳送MQ訊息到訊息中介軟體服務,然後使用者餘額系統作為消費者通過訊息消費的方式完成使用者餘額的增加操作。

這裡有個問題:“支付系統如何確保這筆餘額充值訊息一定會成功傳送到MQ,並且使用者餘額系統一定能處理成功呢”?如果支付系統在完成支付訂單狀態更新後,MQ訊息傳送失敗或者使用者餘額系統訊息處理失敗的話,都會導致Joe支付扣款成功,而自己的餘額賬戶卻沒到賬的情況發生。

為瞭解決這個問題,按照目前的系統設計是需要**“支付系統-MQ服務-使用者餘額系統”**三者的處理滿足資料的一致性要求。例如,如果支付系統感知到訊息傳送失敗後還可以進行重新投遞,從而確保支付系統與使用者餘額資料的最終一致性。

而上述問題就是事務訊息要解決的問題,在具體瞭解RocketMQ提供的事務訊息機制之前,我們先來看下在RocketMQ的早期版本不支援事務訊息,或者因為歷史原因選擇的訊息中介軟體本身就不支援事務訊息的情況下,一些大公司是怎麼解決這個問題的?

早期為了實現基於MQ非同步呼叫的多個服務間,業務邏輯執行要麼一起成功、要麼一起失敗,具備事務特點,通常會採用可靠訊息最終一致性方案,來實現分散式事務。還是以Joe充值這件事來舉例,可靠訊息方案實現過程如下:

在可靠訊息最終一致性方案中,為了實現分散式事務,需要確保上游服務本地事務的處理與MQ訊息的投遞具有原子性,也就是說上游服務本地事務處理成功後要確保訊息一定要成功投遞到MQ服務,否則訊息就不應該被投遞到MQ服務;同樣,被成功投遞到MQ服務的訊息,也一定要被下游服務成功處理,否則就需要重新投遞MQ訊息。

為了實現雙向的原子性,可靠訊息服務需要對訊息進行狀態標記,與此同時還需要對訊息進行狀態檢查,從而實現重新投遞及訊息狀態的最終一致性。核心流程說明如下

1、上游服務(支付系統)如何確保完成自身支付成功狀態更新後訊息100%的能夠投遞到下游服務(使用者餘額系統)指定的Topic中?

在這個流程中上游服務在進行本地資料庫事務操作前,會先傳送一個狀態為**“待確認”**的訊息至可靠訊息服務,而不是直接將訊息投遞到MQ服務的指定Topic。可靠訊息服務此時會將該訊息記錄到自身服務的訊息資料庫中(訊息狀態為->待確認),完成後可靠訊息服務會回撥上游服務表示收到了訊息,你們可以進行本地事務的操作了。

之後上游服務就會開啟本地資料庫事務執行業務邏輯操作,這裡支付系統就會將該筆支付訂單狀態更新為“已成功”。(注意,這裡只是舉個示例場景,在真正的實踐中一般是不會把支付訂單本身的狀態與業務端回撥放在一個事務流程中的,關於這部分的詳細說明我們在下面的場景說明中再討論)。

如果上游服務本地資料庫事務執行成功,則繼續向可靠訊息服務傳送訊息確認訊息,此時可靠訊息服務就會正式將訊息投遞到MQ服務,並且同時更新訊息資料庫中的訊息狀態為“已傳送”。(注意,這裡可靠訊息服務更新訊息狀態與投遞訊息至MQ也必須是在一個原子操作中,即訊息投遞成功則一定要將訊息狀態更新為“已傳送”,所以在程式設計的細節中,可靠訊息服務一般會先更新訊息狀態,然後再進行訊息投遞,這樣即使訊息投遞失敗,也可以對訊息狀態進行回滾->“待確認”,相反如果先進行訊息投遞再更新訊息狀態,可能就不好控制了)。

相反,如果上游本地資料庫事務執行失敗,則需要向可靠訊息服務傳送訊息刪除訊息,可靠訊息服務此時就會將訊息刪除,這樣就意味著事務在上游訊息投遞過程中就被回滾了,而流程也就此結束了,此時上游服務可以需要通過業務邏輯的設計進行重發,這個就不再分散式事務的討論範疇了。

說到這裡,大家可能會有疑問了!因為在上述描述中,即使上游服務本地資料庫事務執行成功了,但是在傳送確認訊息至可靠訊息服務的過程中,以及可靠訊息服務在投遞訊息至MQ服務的過程中,還是會存在失敗的風險,這樣的話還是會導致支付服務更新了狀態,但是使用者餘額系統連訊息都沒有收到的情況發生?

實際上,實現資料一致性是一個複雜的活。在這個方案中可靠訊息服務作為基礎性的服務除了執行正常的邏輯外,還得處理複雜的異常場景。在實現過程中可靠訊息服務需要啟動相應的後臺執行緒,不斷輪訓訊息的狀態,這裡會輪訓訊息狀態為**“待確認”的訊息,並判斷該訊息的狀態的持續時間是否超過了規定的時間,如果超過規定時間的訊息還處於“待確認”的狀態,就會觸發上游服務狀態詢問機制**。

可靠訊息服務就會呼叫上游服務提供的相關藉口,詢問這筆訊息的處理情況,如果這筆訊息在上游服務處理成功,則後臺執行緒就會繼續觸發上圖中的步驟5,更新訊息狀態為**“已傳送”**並投遞訊息至MQ服務;反之如果這筆訊息上游服務處理失敗,可靠訊息服務則會進行訊息刪除。通過這樣以上機制就確保了“上游服務本地事務成功處理+訊息成功投遞”處於一個原子操作了。

2、下游服務(使用者餘額系統)如何確保對MQ服務Topic訊息的消費100%都能處理成功?

在1的過程中,確保了上游服務邏輯處理與MQ訊息的投遞具備原子性,那麼當訊息被成功投遞到了MQ服務的指定Topic後,下游服務如何才能確保訊息的消費一定能被成功處理呢?

在正常的流程中,下游服務等待消費Topic的訊息並進行自身本地資料庫事務的處理,如果處理成功則會主動通知可靠訊息服務,可靠訊息服務此時就會將訊息的狀態更新為“已完成”;反之,處理失敗下游服務就無法再主動向可靠訊息服務傳送通知訊息了。

此時,與訊息投遞過程中的異常邏輯一樣,可靠訊息服務也會啟動相應的後臺執行緒,輪詢一直處於“**已傳送”**狀態的訊息,判斷狀態持續時間是否超過了規定時間,如果超時,可靠訊息服務就會再次向MQ服務投遞此訊息,從而確保訊息能被再次消費處理。(注意,也可能出現下游服務處理成功,但是通知訊息傳送失敗的情況,所以為了確保冪等,下游服務也需要在業務邏輯上做好相應的防重處理)。

RocketMQ事務訊息機制

在?面第2小節的內容中,我們演示了一個自編寫的中間服務+MQ來實現事務訊息的示例。但是在現實的工作場景中,開發和維護一套可靠訊息服務是一件很耗費資源和成本的事情,實際上,RocketMQ的最新版本(4.3.0+)中已經實現了可靠訊息服務的所有功能,並且在保證高併發、高可用、高效能方面做了更為優秀的架構實現。

從設計邏輯上看RocketMQ所支援的分散式事務特性與上節中闡述的可靠訊息服務基本上是一致的。只是RocketMQ在實現上相比較於可靠訊息服務而言做了更為複雜的設計,並且因為天然與MQ服務本身緊密結合,所以在高可用、可靠性、效能等方面直接繼承了MQ服務本身的架構優勢。

下面我們就結合流程並通過示例程式碼的分析來和大家一起理解下利用RocketMQ是如何實現分散式事務操作的?

在應用場景中分散式服務通過MQ通訊的過程中,傳送訊息的一方我們稱之為Producer,接收消費訊息的一方我們稱之為Consumer。如果Producer自身業務邏輯本地事務執行成功與否希望和訊息的傳送保持一個原子性(也就是說如果Producer本地事務執行成功,那麼這筆訊息就一定要被成功的傳送到RocketMQ服務的指定Topic,並且Consumer一定要被消費成功;反之,如果Producer本地事務執行失敗,那麼這筆訊息就應該被RocketMQ伺服器丟棄)的話,RocketMQ是怎麼做的呢?

1、Producer選擇使用RockerMQ提供的事務訊息方法向RocketMQ服務傳送事務訊息(設定訊息屬性***TRAN_MSG=TRUE***);

2、RocketMQ服務端在收到訊息後會判斷訊息的屬性是否為事務訊息,如果是普通訊息就直接Push給Consumer;如果是事務訊息就會對該訊息進行特殊處理設定事務ID,並暫時設定該訊息對Consumer不可見,之後向Producer返回Pre訊息傳送狀態*(*SEND_OK)。

3、之後Producer就會開始執行本地事務邏輯,並設定本地事務處理狀態後向RocketMQ伺服器傳送該事務訊息的確認/回滾訊息(COMMIT_MESSAGE/ROLLBACK_MESSAGE)

4、RocketMQ伺服器根據該筆事務訊息的本地事務執行狀態決定是否將訊息Push給Consumer還是刪除該訊息。

5、之後Consumer就會消費該訊息,執行Consumer的本地事務邏輯,如果執行成功則向RocketMQ返回“CONSUME_SUCCESS”;反之出現異常則需要返回“RECONSUME_LATER”,以便RocketMQ再次Push該訊息,這一點在實際程式設計中需要控制好。

正常情況下以上就是RocketMQ事務訊息的基本執行流程了,但是從異常情況考慮,理論上也是存在Producer遲遲不傳送確認或回滾訊息的情況。與可靠訊息服務一樣,RocketMQ服務端也會設定後臺執行緒去掃描訊息狀態,之後會呼叫Producer的本地**checkLocalTransaction函式獲取本地事務狀態後繼續進行第3步操作。

相信看到這裡,大家對於RocketMQ的分散式事務訊息的理解應該有了一個相對清晰的概念了,那麼在程式碼中如何編寫呢?

在開發中使用RocketMQ的分散式事務訊息Consumer的程式碼不需要有什麼特別的變化與普通訊息Consumer程式碼一致就可以。

Consumer示例程式碼:

public static void main(String[] args) throws InterruptedException,MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_PAY_ACCOUNT");

        // Specify name server addresses.
        consumer.setNamesrvAddr("10.211.55.4:9876;10.211.55.5:9876;10.211.55.6:9876");

        // Subscribe one more more topics to consume.
        consumer.subscribe("PAY_ACCOUNT","*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
                for (MessageExt messageExt : msgs) {
                    System.out.println(new String(messageExt.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
複製程式碼

主要的改變是在Producer程式碼,我們需要額外編寫一個實現執行本地事務邏輯,以及檢查本地事務狀態的類。示例程式碼如下:

public class TransactionListenerImpl implements TransactionListener {

    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String,Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg,Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(),status);
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
複製程式碼

Producer示例程式碼:

public class TransactionProducerTest {
    public static void main(String[] args) throws MQClientException,InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("CID_PAY_ACCOUNT");
        producer.setNamesrvAddr("10.211.55.4:9876;10.211.55.5:9876;10.211.55.6:9876");

        ExecutorService executorService = new ThreadPoolExecutor(2,5,100,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(2000),new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA","TagB","TagC","TagD","TagE"};

        try {
            Map<String,String> paramMap = new HashMap<>();
            paramMap.put("type","6");
            paramMap.put("bizOrderId","15414012438257823");
            paramMap.put("payOrderId","15414012438257823");
            paramMap.put("amount","10");
            paramMap.put("userId","200001");
            paramMap.put("tradeType","charge");
            paramMap.put("financeStatus","0");//財務狀態,應收
            paramMap.put("channel","a");//餘額
            paramMap.put("tradeTime","20190101202022");
            paramMap.put("nonce_str","xkdkskskdksk");

            //拼湊訊息體
            Message msg = new Message("PAY_ACCOUNT","pre",paramMap.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.sendMessageInTransaction(msg,null);
            System.out.printf("%s%n",sendResult);

            Thread.sleep(10);
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        Thread.sleep(10*1000);
        producer.shutdown();
    }
}
複製程式碼

與非事務訊息直接呼叫RocketMQ Client的send方法不同,事務訊息傳送需要設定事務監聽器類,並呼叫*sendMessageInTransaction方法,*而這個方法的具體邏輯也就是上述流程中描述的那樣,具體大家可以看下。

以上程式碼只是示例程式碼,在實際的專案中我們是需要進行一些封裝設計的,以便與專案上下文環境整合。例如對於Springboot專案,我們一般會編寫一個stater工程進行整合。

場景說明

目前RocketMQ訊息中介軟體的使用場景比較廣泛,對於需要通過MQ進行非同步解耦的分散式應用系統來說,RocketMQ無疑是一個不錯的技術選擇。接下來,我們就以對資料一致性要求非常高的分散式支付系統為例,來看看基於RocketMQ的事務訊息適用於哪些特定場景,從而實現支付系統資料的高度一致性。

事實上,支付系統的資料一致性是一個複雜的問題,原因在於支付流程的各個環節都存在非同步的不確定性,例如支付系統需要跟第三方渠道進行互動,不同的支付渠道互動流程存在差異,並且有非同步支付結果回撥的情況。

除此以外,支付系統內部本身又是由多個不同子系統組成,除核心支付系統外,還有賬務系統、商戶通知系統等等,而核心支付系統本身也會被拆分為多個不同的服務模組,如風控、路由等用以實現不同的功能邏輯。某些場景我們無法通過分散式事務來實現資料一致性,只能通過額外的業務補償手段,如二次輪訓、支付對賬等來實現資料最終一致性

綜上所述,支付系統是一個複雜的系統,要完全實現資料的一致性單靠某一種手段是無法實現的,大部分情況下我們可以通過額外的業務補償邏輯來實現資料最終一致性,只是這樣補償邏輯需要以更多的業務開發邏輯為代價,並且在時效性上會存在延遲的問題。

舉個例子,支付核心系統支付成功後會更新自己的訂單狀態為支付成功,整個核心交易流程是一個比較實時同步的場景,如果出現資料不一致,會有額外的補償邏輯如二次支付訂單狀態輪詢、T+1日對賬等用以確保支付狀態資料的最終一致性。但是除了核心支付外,支付成功的結果是需要通知到支付賬務系統、以及業務端系統,而為了確保效能,一般後續的通知就不會與主流程一樣設計成實時同步,而是通過MQ非同步解耦傳送訊息給獨立的**“通知響應模組”,而“通知響應模組”此時就可以通過分散式事務訊息來與支付賬戶系統、業務端等系統實現資料一致性,從而減少需要補償手段處理的範圍,提高系統的資料一致性等級和靈敏度**。