1. 程式人生 > 其它 >分散式事務解決方案之可靠訊息最終一致性

分散式事務解決方案之可靠訊息最終一致性

什麼是可靠訊息最終一致性事務

可靠訊息最終一致性方案是指當事務發起方執行完成本地事務後併發出一條訊息,事務參與方(訊息消費者)一定能
夠接收訊息並處理事務成功,此方案強調的是隻要訊息發給事務參與方最終事務要達到一致。
此方案是利用訊息中介軟體完成,如下圖:
事務發起方(訊息生產方)將訊息發給訊息中介軟體,事務參與方從訊息中介軟體接收訊息,事務發起方和訊息中介軟體
之間,事務參與方(訊息消費方)和訊息中介軟體之間都是通過網路通訊,由於網路通訊的不確定性會導致分散式事
務問題。


可靠訊息最終一致性方案要解決以下幾個問題:

1.本地事務與訊息傳送的原子性問題

本地事務與訊息傳送的原子性問題即:事務發起方在本地事務執行成功後訊息必須發出去,否則就丟棄訊息。即實
現本地事務和訊息傳送的原子性,要麼都成功,要麼都失敗。本地事務與訊息傳送的原子性問題是實現可靠訊息最
終一致性方案的關鍵問題。
先來嘗試下這種操作,先發送訊息,再操作資料庫:

begin transaction;
//1.傳送MQ
//2.資料庫操作
commit transation;
這種情況下無法保證資料庫操作與傳送訊息的一致性,因為可能傳送訊息成功,資料庫操作失敗。
你立馬想到第二種方案,先進行資料庫操作,再發送訊息:

begin transaction;
//1.資料庫操作
//2.傳送MQ
commit transation;
這種情況下貌似沒有問題,如果傳送MQ訊息失敗,就會丟擲異常,導致資料庫事務回滾。但如果是超時異常,數
據庫回滾,但MQ其實已經正常傳送了,同樣會導致不一致。

2、事務參與方接收訊息的可靠性

事務參與方必須能夠從訊息佇列接收到訊息,如果接收訊息失敗可以重複接收訊息。

3、訊息重複消費的問題

由於網路2的存在,若某一個消費節點超時但是消費成功,此時訊息中介軟體會重複投遞此訊息,就導致了訊息的重
復消費。
要解決訊息重複消費的問題就要實現事務參與方的方法冪等性。

解決方案

(1)本地訊息表方案

本地訊息表這個方案最初是eBay提出的,此方案的核心是通過本地事務保證資料業務操作和訊息的一致性,然後
通過定時任務將訊息傳送至訊息中介軟體,待確認訊息傳送給消費方成功再將訊息刪除。
下面以註冊送積分為例來說明:
下例共有兩個微服務互動,使用者服務和積分服務,使用者服務負責新增使用者,積分服務負責增加積分。互動流程如下:


1、使用者註冊

使用者服務在本地事務新增使用者和增加 ”積分訊息日誌“。(使用者表和訊息表通過本地事務保證一致)
下邊是虛擬碼

begin transaction;
//1.新增使用者
//2.儲存積分訊息日誌
commit transation;
這種情況下,本地資料庫操作與儲存積分訊息日誌處於同一個事務中,本地資料庫操作與記錄訊息日誌操作具備原子性。

2、定時任務掃描日誌

如何保證將訊息傳送給訊息佇列呢?
經過第一步訊息已經寫到訊息日誌表中,可以啟動獨立的執行緒,定時對訊息日誌表中的訊息進行掃描併發送至訊息中介軟體,在訊息中介軟體反饋傳送成功後刪除該訊息日誌,否則等待定時任務下一週期重試。

3、消費訊息

如何保證消費者一定能消費到訊息呢?
這裡可以使用MQ的ack(即訊息確認)機制,消費者監聽MQ,如果消費者接收到訊息並且業務處理完成後向MQ傳送ack(即訊息確認),此時說明消費者正常消費訊息完成,MQ將不再向消費者推送訊息,否則消費者會不斷重試向消費者來發送訊息。
積分服務接收到”增加積分“訊息,開始增加積分,積分增加成功後向訊息中介軟體迴應ack,否則訊息中介軟體將重複投遞此訊息。
由於訊息會重複投遞,積分服務的”增加積分“功能需要實現冪等性。

(2)RocketMQ事務訊息方案

RocketMQ 是一個來自阿里巴巴的分散式訊息中介軟體,於 2012 年開源,並在 2017 年正式成為 Apache 頂級專案。據瞭解,包括阿里雲上的訊息產品以及收購的子公司在內,阿里集團的訊息產品全線都執行在 RocketMQ 之上,並且最近幾年的雙十一大促中,RocketMQ 都有搶眼表現。Apache RocketMQ 4.3之後的版本正式支援事務訊息,為分散式事務實現提供了便利性支援。

RocketMQ 事務訊息設計則主要是為了解決 Producer 端的訊息傳送與本地事務執行的原子性問題,RocketMQ 的設計中 broker 與producer 端的雙向通訊能力,使得 broker 天生可以作為一個事務協調者存在;而 RocketMQ本身提供的儲存機制為事務訊息提供了持久化能力;RocketMQ 的高可用機制以及可靠訊息設計則為事務訊息在系統發生異常時依然能夠保證達成事務的最終一致性。

在RocketMQ 4.3後實現了完整的事務訊息,實際上其實是對本地訊息表的一個封裝,將本地訊息表移動到了MQ內部,解決 Producer 端的訊息傳送與本地事務執行的原子性問題。


執行流程如下:

為方便理解我們還以註冊送積分的例子來描述 整個流程。
Producer 即MQ傳送方,本例中是使用者服務,負責新增使用者。MQ訂閱方即訊息消費方,本例中是積分服務,負責新增積分。

1、Producer 傳送事務訊息
Producer (MQ傳送方)傳送事務訊息至MQ Server,MQ Server將訊息狀態標記為Prepared(預備狀態),注意此時這條訊息消費者(MQ訂閱方)是無法消費到的。
本例中,Producer 傳送 ”增加積分訊息“ 到MQ Server。

2、MQ Server迴應訊息傳送成功
MQ Server接收到Producer 傳送給的訊息則迴應傳送成功表示MQ已接收到訊息。

3、Producer 執行本地事務
Producer 端執行業務程式碼邏輯,通過本地資料庫事務控制。
本例中,Producer 執行新增使用者操作。

4、訊息投遞
若Producer 本地事務執行成功則自動向MQServer傳送commit訊息,MQ Server接收到commit訊息後將”增加積
分訊息“ 狀態標記為可消費,此時MQ訂閱方(積分服務)即正常消費訊息;
若Producer 本地事務執行失敗則自動向MQServer傳送rollback訊息,MQ Server接收到rollback訊息後 將刪除”增加積分訊息“ 。
MQ訂閱方(積分服務)消費訊息,消費成功則向MQ迴應ack,否則將重複接收訊息。這裡ack預設自動迴應,即
程式執行正常則自動迴應ack。

5、事務回查
如果執行Producer端本地事務過程中,執行端掛掉,或者超時,MQ Server將會不停的詢問同組的其他 Producer來獲取事務執行狀態,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞訊息。
以上主幹流程已由RocketMQ實現,對使用者側來說,使用者需要分別實現本地事務執行以及本地事務回查方法,因此只需關注本地事務的執行狀態即可。
RoacketMQ提供RocketMQLocalTransactionListener介面:

public interface RocketMQLocalTransactionListener {undefined
/**
‐ 傳送prepare訊息成功此方法被回撥,該方法用於執行本地事務
‐ @param msg 回傳的訊息,利用transactionId即可獲取到該訊息的唯一Id
‐ @param arg 呼叫send方法時傳遞的引數,當send時候若有額外的引數可以傳遞到send方法中,這裡能獲取到
‐ @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回撥
/
RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
/*
‐ @param msg 通過獲取transactionId來判斷這條訊息的本地事務執行狀態
‐ @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回撥
*/
RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}
傳送事務訊息:
以下是RocketMQ提供用於傳送事務訊息的API:

TransactionMQProducer producer = new TransactionMQProducer(“ProducerGroup”);
producer.setNamesrvAddr(“127.0.0.1:9876”);
producer.start();
//設定TransactionListener實現
producer.setTransactionListener(transactionListener);
//傳送事務訊息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);