1. 程式人生 > >分散式事務原理及解決方案

分散式事務原理及解決方案

1 引言

分散式事務是企業整合中的一個技術難點,也是每一個分散式系統架構中都會涉及到的一個東西,特別是在這幾年越來越火的微服務架構中,幾乎可以說是無法避免,本文就圍繞單機事務,分散式事務以及分散式事務的處理方式來展開。

2 事務

事務提供一種“要麼什麼都不做,要麼做全套(All or Nothing)”的機制,她有ACID四大特性

  • 原子性(Atomicity):事務作為一個整體被執行,包含在其中的對資料庫的操作要麼全部被執行,要麼都不執行。
  • 一致性(Consistency):事務應確保資料庫的狀態從一個一致狀態轉變為另一個一致狀態。一致狀態是指資料庫中的資料應滿足完整性約束。除此之外,一致性還有另外一層語義,就是事務的中間狀態不能被觀察到(這層語義也有說應該屬於原子性)。
  • 隔離性(Isolation):多個事務併發執行時,一個事務的執行不應影響其他事務的執行,如同只有這一個操作在被資料庫所執行一樣。
  • 永續性(Durability):已被提交的事務對資料庫的修改應該永久儲存在資料庫中。在事務結束時,此操作將不可逆轉。

2.1 單機事務

以mysql的InnoDB儲存引擎為例,來了解單機事務是如何保證ACID特性的。

Image text
事務的隔離性是通過資料庫鎖的機制實現的,永續性通過redo log(重做日誌)來實現,原子性和一致性通過Undo log來實現。

2.2 分散式事務

單機事務是通過將操作限制在一個會話內通過資料庫本身的鎖以及日誌來實現ACID,那麼分散式環境下該如何保證ACID特性那?

2.2.1 XA協議實現分散式事務

2.2.1.1 XA描述

X/Open DTP(X/Open Distributed Transaction Processing Reference Model) 是X/Open 這個組織定義的一套分散式事務的標準,也就是了定義了規範和API介面,由各個廠商進行具體的實現。 X/Open DTP 定義了三個元件: AP,TM,RM

Image text

  • AP(Application Program):也就是應用程式,可以理解為使用DTP的程式
  • RM(Resource Manager):資源管理器,這裡可以理解為一個DBMS系統,或者訊息伺服器管理系統,應用程式通過資源管理器對資源進行控制。資源必須實現XA定義的介面
  • TM(Transaction Manager):事務管理器,負責協調和管理事務,提供給AP應用程式程式設計介面以及管理資源管理器

其中在DTP定義了以下幾個概念

  • 事務:一個事務是一個完整的工作單元,由多個獨立的計算任務組成,這多個任務在邏輯上是原子的
  • 全域性事務:對於一次性操作多個資源管理器的事務,就是全域性事務
  • 分支事務:在全域性事務中,某一個資源管理器有自己獨立的任務,這些任務的集合作為這個資源管理器的分支任務
  • 控制執行緒:用來表示一個工作執行緒,主要是關聯AP,TM,RM三者的一個執行緒,也就是事務上下文環境。簡單的說,就是需要標識一個全域性事務以及分支事務的關係

如果一個事務管理器管理著多個資源管理器,DTP是通過兩階段提交協議來控制全域性事務和分支事務。

  • 第一階段:準備階段 事務管理器通知資源管理器準備分支事務,資源管理器告之事務管理器準備結果
  • 第二階段:提交階段 事務管理器通知資源管理器提交分支事務,資源管理器告之事務管理器結果
2.2.1.2 XA的ACID特性
  • 原子性:XA議使用2PC原子提交協議來保證分散式事務原子性
  • 隔離性:XA要求每個RMs實現本地的事務隔離,子事務的隔離來保證整個事務的隔離。
  • 一致性:通過原子性、隔離性以及自身一致性的實現來保證“資料庫從一個一致狀態轉變為另一個一致狀態”;通過MVCC來保證中間狀態不能被觀察到。
2.2.1.3 XA的優缺點
  • 優點:
    對業務無侵入,對RM要求高
  • 缺點:
    同步阻塞:在二階段提交的過程中,所有的節點都在等待其他節點的響應,無法進行其他操作。這種同步阻塞極大的限制了分散式系統的效能。

    單點問題:協調者在整個二階段提交過程中很重要,如果協調者在提交階段出現問題,那麼整個流程將無法運轉。更重要的是,其他參與者將會處於一直鎖定事務資源的狀態中,而無法繼續完成事務操作。

    資料不一致:假設當協調者向所有的參與者傳送commit請求之後,發生了局部網路異常,或者是協調者在尚未傳送完所有 commit請求之前自身發生了崩潰,導致最終只有部分參與者收到了commit請求。這將導致嚴重的資料不一致問題。

    容錯性不好:如果在二階段提交的提交詢問階段中,參與者出現故障,導致協調者始終無法獲取到所有參與者的確認資訊,這時協調者只能依靠其自身的超時機制,判斷是否需要中斷事務。顯然,這種策略過於保守。換句話說,二階段提交協議沒有設計較為完善的容錯機制,任意一個節點是失敗都會導致整個事務的失敗。

2.2.2 TCC協議實現分散式事務

2.2.2.1 TCC描述

TCC(Try-Confirm-Cancel)分散式事務模型相對於 XA 等傳統模型,其特徵在於它不依賴資源管理器(RM)對分散式事務的支援,而是通過對業務邏輯的分解來實現分散式事務。

Image text

  • 第一階段:CanCommit

3PC的CanCommit階段其實和2PC的準備階段很像。協調者向參與者傳送commit請求,參與者如果可以提交就返回Yes響應,否則返回No響應。
事務詢問:協調者向參與者傳送CanCommit請求。詢問是否可以執行事務提交操作。然後開始等待參與者的響應
響應反饋:參與者接到CanCommit請求之後,正常情況下,如果其自身認為可以順利執行事務,則返回Yes響應,並進入預備狀態;否則反饋No。

  • 第二階段:PreCommit
    協調者在得到所有參與者的響應之後,會根據結果執行2種操作:執行事務預提交,或者中斷事務

執行事務預提交
傳送預提交請求:協調者向所有參與者節點發出 preCommit 的請求,並進入 prepared 狀態。
事務預提交:參與者受到 preCommit 請求後,會執行事務操作,對應 2PC 準備階段中的 “執行事務”,也會 Undo 和 Redo 資訊記錄到事務日誌中。
各參與者響應反饋:如果參與者成功執行了事務,就反饋 ACK 響應,同時等待指令:提交(commit) 或終止(abort)

中斷事務
傳送中斷請求:協調者向所有參與者節點發出 abort 請求 。
中斷事務:參與者如果收到 abort 請求或者超時了,都會中斷事務。

  • 第三階段:Do Commit
    該階段進行真正的事務提交,也可以分為以下兩種情況

執行提交
傳送提交請求:協調者接收到各參與者傳送的ACK響應,那麼他將從預提交狀態進入到提交狀態。並向所有參與者傳送 doCommit 請求。
事務提交:參與者接收到 doCommit 請求之後,執行正式的事務提交。並在完成事務提交之後釋放所有事務資源。
響應反饋:事務提交完之後,向協調者傳送 ACK 響應。
完成事務:協調者接收到所有參與者的 ACK 響應之後,完成事務。

中斷事務
協調者沒有接收到參與者傳送的 ACK 響應(可能是接受者傳送的不是ACK響應,也可能響應超時),那麼就會執行中斷事務。
傳送中斷請求:協調者向所有參與者傳送 abort 請求。
事務回滾:參與者接收到 abort 請求之後,利用其在階段二記錄的 undo 資訊來執行事務的回滾操作,並在完成回滾之後釋放所有的事務資源。
反饋結果:參與者完成事務回滾之後,向協調者傳送 ACK 訊息。
中斷事務:協調者接收到參與者反饋的 ACK 訊息之後,完成事務的中斷。

2.2.2.2 TCC的ACID特性
  • 原子性:TCC 模型也使用 2PC 原子提交協議來保證事務原子性。Try 操作對應2PC 的一階段準備(Prepare);Confirm 對應 2PC 的二階段提交(Commit),Cancel 對應 2PC 的二階段回滾(Rollback),可以說 TCC 就是應用層的 2PC。
  • 隔離性:隔離的本質是控制併發,放棄在資料庫層面加鎖通過在業務層面加鎖來實現。【比如在賬戶管理模組設計中,增加可用餘額和凍結金額的設定】
  • 一致性:通過原子性保證事務的原子提交、業務隔離性控制事務的併發訪問,實現分散式事務的一致性狀態轉變;事務的中間狀態不能被觀察到這點並不保證[本協議是基於柔性事務理論提出的]。
2.2.2.3 TCC的優缺點
  • 優點:
    相對於二階段提交,三階段提交主要解決的單點故障問題,並減少了阻塞的時間。因為一旦參與者無法及時收到來自協調者的資訊之後,他會預設執行 commit。而不會一直持有事務資源並處於阻塞狀態。
  • 缺點:
    三階段提交也會導致資料一致性問題。由於網路原因,協調者傳送的 Cancel 響應沒有及時被參與者接收到,那麼參與者在等待超時之後執行了 commit 操作。這樣就和其他接到 Cancel 命令並執行回滾的參與者之間存在資料不一致的情況。

2.2.3 SAGA協議實現分散式事務

2.2.3.1 SAGA協議介紹

Saga的組成:

  • 每個Saga由一系列sub-transaction Ti 組成
  • 每個Ti 都有對應的補償動作Ci,補償動作用於撤銷Ti造成的結果

saga的執行順序有兩種:

  • T1, T2, T3, ..., Tn
  • T1, T2, ..., Tj, Cj,..., C2, C1,其中0 < j < n

Saga定義了兩種恢復策略:

  • backward recovery,向後恢復,即上面提到的第二種執行順序,其中j是發生錯誤的sub-transaction,這種做法的效果是撤銷掉之前所有成功的sub-transation,使得整個Saga的執行結果撤銷。
  • forward recovery,向前恢復,適用於必須要成功的場景,執行順序是類似於這樣的:T1, T2, ..., Tj(失敗), Tj(重試),..., Tn,其中j是發生錯誤的sub-transaction。該情況下不需要Ci。

Saga的注意事項

  • Ti和Ci是冪等的。舉個例子,假設在執行Ti的時候超時了,此時我們是不知道執行結果的,如果採用forward recovery策略就會再次傳送Ti,那麼就有可能出現Ti被執行了兩次,所以要求Ti冪等。如果採用backward recovery策略就會發送Ci,而如果Ci也超時了,就會嘗試再次傳送Ci,那麼就有可能出現Ci被執行兩次,所以要求Ci冪等。
  • Ci必須是能夠成功的,如果無法成功則需要人工介入。如果Ci不能執行成功就意味著整個Saga無法完全撤銷,這個是不允許的。但總會出現一些特殊情況比如Ci的程式碼有bug、服務長時間崩潰等,這個時候就需要人工介入了
  • Ti - Ci和Ci - Ti的執行結果必須是一樣的:sub-transaction被撤銷了。舉例說明,還是考慮Ti執行超時的場景,我們採用了backward recovery,傳送一個Ci,那麼就會有三種情況:
    1:Ti的請求丟失了,服務之前沒有、之後也不會執行Ti
    2:Ti在Ci之前執行
    3:Ci在Ti之前執行
    對於第1種情況,容易處理。對於第2、3種情況,則要求Ti和Ci是可交換的(commutative),並且其最終結果都是sub-transaction被撤銷。

Saga架構

Image text

  • Saga Execution Component解析請求JSON並構建請求圖
  • TaskRunner 用任務佇列確保請求的執行順序
  • TaskConsumer 處理Saga任務,將事件寫入saga log,並將請求傳送到遠端服務
2.2.3.2 SAGA的ACID特性
  • 原子性:通過SAGA協調器實現
  • 一致性:本地事務+SAGA Log
  • 永續性:SAGA Log
  • 隔離性:不保證(同TCC)

3 分散式事務的處理方案

3.1 XA

僅在同一個事務上下文中需要協調多種資源(即資料庫,以及訊息主題或佇列)時,才有必要使用 X/Open XA 介面。資料庫接入XA需要使用XA版的資料庫驅動,訊息佇列要實現XA需要實現javax.transaction.xa.XAResource介面。

3.1.1 jotm的分散式事務

程式碼如下:

public class UserService {
    @Autowired
    private UserDao userDao;
    @Autowired
    private LogDao logDao;
    @Transactional
    public void save(User user){
        userDao.save(user);
        logDao.save(user);
        throw new RuntimeException();
    }
}
@Resource
public class UserDao {
    @Resource(name="jdbcTemplateA")
    private JdbcTemplate jdbcTemplate;
    public void save(User user){
        jdbcTemplate.update("insert into user(name,age) values(?,?)",user.getName(),user.getAge());
    }
}
@Repository
public class LogDao {
    @Resource(name="jdbcTemplateB")
    private JdbcTemplate jdbcTemplate;
    public void save(User user){
        jdbcTemplate.update("insert into log(name,age) values(?,?)",user.getName(),user.getAge());
    }
}
複製程式碼

配置:

    <bean id="jotm" class="org.objectweb.jotm.Current" />
    <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="userTransaction" ref="jotm" />
    </bean>
    <tx:annotation-driven transaction-manager="transactionManager"/>
    <!-- 配置資料來源 -->
    <bean id="dataSourceA" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource"  destroy-method="shutdown">
        <property name="dataSource">
            <bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
                <property name="transactionManager" ref="jotm" />
                <property name="driverName" value="com.mysql.jdbc.Driver" />
                <property name="url" value="jdbc:mysql://localhost:3306/test?useUnicode=true&amp;characterEncoding=utf-8" />
            </bean>
        </property>
        <property name="user" value="xxx" />
        <property name="password" value="xxx" />
    </bean>
    <!-- 配置資料來源 -->
    <bean id="dataSourceB"   class="org.enhydra.jdbc.pool.StandardXAPoolDataSource"  destroy-method="shutdown">
        <property name="dataSource">
            <bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
                <property name="transactionManager" ref="jotm" />
                <property name="driverName" value="com.mysql.jdbc.Driver" />
                <property name="url" value="jdbc:mysql://localhost:3306/test2?useUnicode=true&amp;characterEncoding=utf-8" />
            </bean>
        </property>
        <property name="user" value="xxx" />
        <property name="password" value="xxx" />
    </bean>
    <bean id="jdbcTemplateA" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSourceA" />
    </bean>
    <bean id="jdbcTemplateB" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSourceB" />
    </bean>
複製程式碼

使用到的JAR包:

  compile 'org.ow2.jotm:jotm-core:2.3.1-M1'
  compile 'org.ow2.jotm:jotm-datasource:2.3.1-M1'
  compile 'com.experlog:xapool:1.5.0'
複製程式碼

事務配置: 我們知道分散式事務中需要一個事務管理器即介面javax.transaction.TransactionManager、面向開發人員的javax.transaction.UserTransaction。對於jotm來說,他們的實現類都是Current

public class Current implements UserTransaction, TransactionManager

我們如果想使用分散式事務的同時,又想使用Spring帶給我們的@Transactional便利,就需要配置一個JtaTransactionManager,而該JtaTransactionManager是需要一個userTransaction例項的,所以用到了上面的Current,如下配置:

<bean id="jotm" class="org.objectweb.jotm.Current" />
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">  
    <property name="userTransaction" ref="jotm" />  
</bean>
<tx:annotation-driven transaction-manager="transactionManager"/>
複製程式碼

同時上述StandardXADataSource是需要一個TransactionManager例項的,所以上述StandardXADataSource配置把jotm加了進去.

執行過程:

  • 第一步:事務攔截器開啟事務
    我們知道加入了@Transactional註解,同時開啟tx:annotation-driven,會對本物件進行代理,加入事務攔截器。在事務攔截器中,獲取javax.transaction.UserTransaction,這裡即org.objectweb.jotm.Current,然後使用它開啟事務,並和當前執行緒進行繫結,繫結關係資料存放在org.objectweb.jotm.Current中。
  • 第二步:使用jdbcTemplate進行業務操作
    dbcTemplateA要從dataSourceA中獲取Connection,和當前執行緒進行繫結,同時以對應的dataSourceA作為key。同時判斷當前執行緒是否含有事務,通過dataSourceA中的org.objectweb.jotm.Current發現當前執行緒有事務,則把Connection自動提交設定為false,同時將該連線納入當前事務中。
    jdbcTemplateB要從dataSourceB中獲取Connection,和當前執行緒進行繫結,同時以對應的dataSourceB作為key。同時判斷當前執行緒是否含有事務,通過dataSourceB中的org.objectweb.jotm.Current發現當前執行緒有事務,則把Connection自動提交設定為false,同時將該連線納入當前事務中。
  • 第三步:異常回滾 一旦丟擲異常,則需要進行事務的回滾操作。回滾就是將當前事務進行回滾,該事務的回滾會呼叫和它關聯的所有Connection的回滾。

3.1.2 Atomikos的分散式事務

程式碼同上,配置為:

    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
        <property name="transactionTimeout" value="300" />
    </bean>

    <bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="userTransaction" ref="atomikosUserTransaction" />
    </bean>

    <tx:annotation-driven transaction-manager="springTransactionManager"/>

    <!-- 配置資料來源 -->
    <bean id="dataSourceC" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="XA1DBMS" />
        <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
        <property name="xaProperties">
            <props>
                <prop key="URL">jdbc:mysql://localhost:3306/test?useUnicode=true&amp;characterEncoding=utf-8</prop>
                <prop key="user">xxx</prop>
                <prop key="password">xxx</prop>
            </props>
        </property>
        <property name="poolSize" value="3" />
        <property name="minPoolSize" value="3" />
        <property name="maxPoolSize" value="5" />
    </bean>

    <!-- 配置資料來源 -->
    <bean id="dataSourceD" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="XA2DBMS" />
        <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
        <property name="xaProperties">
            <props>
                <prop key="URL">jdbc:mysql://localhost:3306/test2?useUnicode=true&amp;characterEncoding=utf-8</prop>
                <prop key="user">xxx</prop>
                <prop key="password">xxx</prop>
            </props>
        </property>
        <property name="poolSize" value="3" />
        <property name="minPoolSize" value="3" />
        <property name="maxPoolSize" value="5" />
    </bean>

    <bean id="jdbcTemplateC" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSourceC" />
    </bean>

    <bean id="jdbcTemplateD" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSourceD" />
    </bean>
複製程式碼

事務配置:
我們知道分散式事務中需要一個事務管理器即介面javax.transaction.TransactionManager、面向開發人員的javax.transaction.UserTransaction。對於Atomikos來說分別對應如下:

  • com.atomikos.icatch.jta.UserTransactionImp
  • com.atomikos.icatch.jta.UserTransactionManager 我們如果想使用分散式事務的同時,又想使用Spring帶給我們的@Transactional便利,就需要配置一個JtaTransactionManager,而該JtaTransactionManager是需要一個userTransaction例項的
<bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">  
    <property name="transactionTimeout" value="300" />  
</bean>
<bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">  
    <property name="userTransaction" ref="userTransaction" />   
</bean>
<tx:annotation-driven transaction-manager="springTransactionManager"/>
複製程式碼

可以對比下jotm的案例配置jotm的分散式事務配置。可以看到jotm中使用的xapool中的StandardXADataSource是需要一個transactionManager的,而Atomikos使用的AtomikosNonXADataSourceBean則不需要。我們知道,StandardXADataSource中有了transactionManager就可以獲取當前執行緒的事務,同時把XAResource加入進當前事務中去,而AtomikosNonXADataSourceBean卻沒有,它是怎麼把XAResource加入進當前執行緒繫結的事務呢?這時候就需要可以通過靜態方法隨時獲取當前執行緒繫結的事務。 使用到的JAR包:

compile 'com.atomikos:transactions-jdbc:4.0.0M4'
複製程式碼

3.2 單機事務+同步回撥(非同步)

以訂單子系統和支付子系統為例,如下圖:

Image text

如上圖,payment是支付系統,trade是訂單系統,兩個系統對應的資料庫是分開的。支付完成之後,支付系統需要通知訂單系統狀態變更。
對於payment要執行的操作可以用偽程式碼表示如下:

begin tx;
  count = update account set amount = amount - ${cash} where uid = ${uid} and amount >= amount
  if (count <= 0) return false
    update payment_record set status = paid where trade_id = ${tradeId}
commit;
複製程式碼

對於trade要執行的操作可以用偽程式碼表示如下:

begin tx;
  count = update trade_record set status = paid where trade_id = ${trade_id} and status = unpaid
  if (count <= 0) return false
    do other things ...
commit;
複製程式碼

但是對於這兩段程式碼如何串起來是個問題,我們增加一個事務表,即圖中的tx_info,來記錄成功完成的支付事務,tx_info中需要有可以標示被支付系統處理狀態的欄位,為了和支付資訊一致,需要放入事務中,程式碼如下:

begin tx;
  count = update account set amount = amount - ${cash} where uid = ${uid} and amount >= amount
  if (count <= 0) return false
    update payment_record set status = paid where trade_id = ${tradeId}
    insert into tx_info values(${trade_id},${amount}...)
commit;
複製程式碼

支付系統邊界到此為止,接下來就是訂單系統輪詢訪問tx_info,拉取已經支付成功的訂單資訊,對每一條資訊都執行trade系統的邏輯,虛擬碼如下:

foreach trade_id in tx_info
  do trade_tx
  save tx_info.id to some store
複製程式碼

事無延遲取決於時間程式輪詢間隔,這樣我們做到了一致性,最終訂單都會在支付之後的最大時間間隔內完成狀態遷移。
當然,這裡也可以採用支付系統通過RPC方式同步通知訂單系統的方式來實現,處理狀態通過tx_info中的欄位來表示。
另外,交易系統每次拉取資料的起點以及消費記錄需要記錄下來,這樣才能不遺漏不重複地執行,所以需要增加一張表用於排重,即上圖中的tx_duplication。但是每次對tx_duplication表的插入要在trade_tx的事務中完成,虛擬碼如下:

begin tx;
  c = insert ignore tx_duplication values($trade_id...)
  if (c <= 0) return false
    count = update trade_record set status = paid where trade_id = ${trade_id} and status = unpaid
  if (count <= 0) return false
    do other things ...
commit;
複製程式碼

另外,tx_duplication表中trade_id表上必須有唯一鍵,這個算是結合之前的冪等篇來保證trade_tx的操作是冪等的。

3.3 MQ做中間表角色

在上面的方案中,tx_info表所起到的作用就是佇列作用,記錄一個系統的表更,作為通知給需要感知的系統的事件。而時間程式去拉取只是系統去獲取感興趣事件的一個方式,而對應交易系統的本地事務只是對應消費事件的一個過程。在這樣的描述下,這些功能就是一個MQ——訊息中介軟體。如下圖

Image text

這樣tx_info表的功能就交給了MQ,訊息消費的偏移量也不需要關心了,MQ會搞定的,但是tx_duplication還是必須存在的,因為MQ並不能避免訊息的重複投遞,這其中的原因有很多,主要是還是分散式的CAP造成的,再次不詳細描述。
這要求MQ必須支援事務功能,可以達到本地事務和訊息發出是一致性的,但是不必是強一致的。通常使用的方式如下的虛擬碼:

  sendPrepare();
  isCommit = local_tx()
  if (isCommit) sendCommit()
    else sendRollback()
複製程式碼

在做本地事務之前,先向MQ傳送一個prepare訊息,然後執行本地事務,本地事務提交成功的話,向MQ傳送一個commit訊息,否則傳送一個abort訊息,取消之前的訊息。MQ只會在收到commit確認才會將訊息投遞出去,所以這樣的形式可以保證在一切正常的情況下,本地事務和MQ可以達到一致性。
但是分散式存在異常情況,網路超時,機器宕機等等,比如當系統執行了local_tx()成功之後,還沒來得及將commit訊息傳送給MQ,或者說傳送出去了,網路超時了等等原因,MQ沒有收到commit,即commit訊息丟失了,那麼MQ就不會把prepare訊息投遞出去。如果這個無法保證的話,那麼這個方案是不可行的。針對這種情況,需要一個第三方異常校驗模組來對MQ中在一定時間段內沒有commit/abort 的訊息和發訊息的系統進行檢查,確認該訊息是否應該投遞出去或者丟棄,得到系統的確認之後,MQ會做投遞還是丟棄,這樣就完全保證了MQ和發訊息的系統的一致性,從而保證了接收訊息系統的一致性。
這個方案要求MQ的系統可用性必須非常高,至少要超過使用MQ的系統(推薦rocketmq,kafka都支援傳送預備訊息和業務回查),這樣才能保證依賴他的系統能穩定執行。

3.4 SAGA方案

專案地址:github.com/apache/serv… Saga處理場景是要求相關的子事務提供事務處理函式同時也提供補償函式。Saga協調器alpha會根據事務的執行情況向omega傳送相關的指令,確定是否向前重試或者向後恢復。

成功場景

成功場景下,每個事務都會有開始和有對應的結束事件。

Image text

異常場景

異常場景下,omega會向alpha上報中斷事件,然後alpha會向該全域性事務的其它已完成的子事務傳送補償指令,確保最終所有的子事務要麼都成功,要麼都回滾。

Image text

超時場景

超時場景下,已超時的事件會被alpha的定期掃描器檢測出來,與此同時,該超時事務對應的全域性事務也會被中斷。

Image text

例子

假設要租車、預訂酒店滿足分散式事務。
租車服務

@Service
class CarBookingService {
  private Map<Integer, CarBooking> bookings = new ConcurrentHashMap<>();

  @Compensable(compensationMethod = "cancel")
  void order(CarBooking booking) {
    booking.confirm();
    bookings.put(booking.getId(), booking);
  }

  void cancel(CarBooking booking) {
    Integer id = booking.getId();
    if (bookings.containsKey(id)) {
      bookings.get(id).cancel();
    }
  }

  Collection<CarBooking> getAllBookings() {
    return bookings.values();
  }

  void clearAllBookings() {
    bookings.clear();
  }
}
複製程式碼

酒店預訂

@Service
class HotelBookingService {
  private Map<Integer, HotelBooking> bookings = new ConcurrentHashMap<>();

  @Compensable(compensationMethod = "cancel")
  void order(HotelBooking booking) {
    if (booking.getAmount() > 2) {
      throw new IllegalArgumentException("can not order the rooms large than two");
    }
    booking.confirm();
    bookings.put(booking.getId(), booking);
  }

  void cancel(HotelBooking booking) {
    Integer id = booking.getId();
    if (bookings.containsKey(id)) {
      bookings.get(id).cancel();
    }
  }

  Collection<HotelBooking> getAllBookings() {
    return bookings.values();
  }

  void clearAllBookings() {
    bookings.clear();
  }
}
複製程式碼

主服務

@RestController
public class BookingController {

  @Value("${car.service.address:http://car.servicecomb.io:8080}")
  private String carServiceUrl;

  @Value("${hotel.service.address:http://hotel.servicecomb.io:8080}")
  private String hotelServiceUrl;

  @Autowired
  private RestTemplate template;

  @SagaStart
  @PostMapping("/booking/{name}/{rooms}/{cars}")
  public String order(@PathVariable String name,  @PathVariable Integer rooms, @PathVariable Integer cars) {
    template.postForEntity(
        carServiceUrl + "/order/{name}/{cars}",
        null, String.class, name, cars);

    postCarBooking();

    template.postForEntity(
        hotelServiceUrl + "/order/{name}/{rooms}",
        null, String.class, name, rooms);

    postBooking();

    return name + " booking " + rooms + " rooms and " + cars + " cars OK";
  }

  // This method is used by the byteman to inject exception here
  private void postCarBooking() {

  }

  // This method is used by the byteman to inject the faults such as the timeout or the crash
  private void postBooking() {

  }
}
複製程式碼

執行流程

  • 在Alpha目錄執行 mvn clean package -DskipTests -Pdemo
  • 執行 java -Dspring.profiles.active=prd -D"spring.datasource.url=jdbc:postgresql://{host_address}:5432/saga?useSSL=false" -jar alpha-server-{saga_version}-exec.jar
  • 在saga spring demo目錄執行 mvn clean package -DskipTests -Pdemo
  • java -Dserver.port=8081 -Dalpha.cluster.address={alpha_address}:8080 -jar hotel-{saga_version}-exec.jar
  • java -Dserver.port=8082 -Dalpha.cluster.address={alpha_address}:8080 -jar car-{saga_version}-exec.jar
  • java -Dserver.port=8083 -Dalpha.cluster.address={alpha_address}:8080 -Dcar.service.address={host_address}:8082 -Dhotel.service.address={host_address}:8081  -jar booking-{saga_version}-exec.jar[alpha_address不帶http其他地址要帶上http]

3.5 TCC方案

專案地址https://github.com/QNJR-GROUP/EasyTransaction[對比tcc-transaction,Hmily,ByteTCC來說EasyTransaction效能最好,壓測未發現錯誤], 當然你也可以使用上面提到的SAGA專案,也是支援TCC協議的。下面我們舉個例子來看TCC是如何處理業務邏輯的。

eg:訂單支付

  • 1:訂單服務->修改訂單狀態
  • 2:庫存服務->扣減庫存
  • 3:積分服務->增加積分
  • 4:倉庫服務->創建出庫單

try階段

  • 1:訂單服務->狀態變更為“UpDating”
  • 2:庫存服務->可用庫存減少1,凍結庫存增加1
  • 3:積分服務->積分不變,增加預備積分
  • 4:倉庫服務->創建出庫單,狀態設定為“UnKnown”

confirm階段

  • 1:訂單服務->狀態變更為“已支付”
  • 2:庫存服務->凍結庫存清零
  • 3:積分服務->積分增加,預備積分清零
  • 4:倉庫服務->狀態設定為“出庫單已建立”

cancel階段

  • 1:訂單服務->狀態變更為“已取消”
  • 2:庫存服務->可用庫存增加,凍結庫存清零
  • 3:積分服務->預備積分清零
  • 4:倉庫服務->狀態設定為“已取消”

4 小結

基本概念 優點 缺點
本地事務。事務由資源管理器(如DBMS)本地管理 嚴格的ACID 不具備分佈事務處理能力
全域性事務(DTP模型)
TX協議:應用或應用伺服器與事務管理器的介面
XA協議:全域性事務管理器與資源管理器的介面
嚴格的ACID 效率非常低
JTA:面向應用、應用伺服器與資源管理器的高層事務介面
JTS:JTA事務管理器的實現標準,向上支援JTA,向下通過CORBA OTS實現跨事務域的互操作性
EJB
簡單一致的程式設計模型
跨域分佈處理的ACID保證
DTP模型本身的侷限
缺少充分公開的大規模、高可用、密集事務應用的成功案例
基於MQ 訊息資料獨立儲存、獨立伸縮
降低業務系統與訊息系統間的耦合
一次訊息傳送需要兩次請求
業務處理服務需實現訊息狀態回查介面
二階段提交 原理簡單,實現方便 同步阻塞:在二階段提交的過程中,所有的節點都在等待其他節點的響應,無法進行其他操作。這種同步阻塞極大的限制了分散式系統的效能。
單點問題:協調者在整個二階段提交過程中很重要,如果協調者在提交階段出現問題,那麼整個流程將無法運轉。更重要的是,其他參與者將會處於一直鎖定事務資源的狀態中,而無法繼續完成事務操作。
資料不一致:假設當協調者向所有的參與者傳送commit請求之後,發生了局部網路異常,或者是協調者在尚未傳送完所有 commit請求之前自身發生了崩潰,導致最終只有部分參與者收到了commit請求。這將導致嚴重的資料不一致問題。
容錯性不好:如果在二階段提交的提交詢問階段中,參與者出現故障,導致協調者始終無法獲取到所有參與者的確認資訊,這時協調者只能依靠其自身的超時機制,判斷是否需要中斷事務。顯然,這種策略過於保守。換句話說,二階段提交協議沒有設計較為完善的容錯機制,任意一個節點是失敗都會導致整個事務的失敗。
TCC 相對於二階段提交,三階段提交主要解決的單點故障問題,並減少了阻塞的時間。因為一旦參與者無法及時收到來自協調者的資訊之後,他會預設執行 commit。而不會一直持有事務資源並處於阻塞狀態。 三階段提交也會導致資料一致性問題。由於網路原因,協調者傳送的 abort 響應沒有及時被參與者接收到,那麼參與者在等待超時之後執行了 commit 操作。這樣就和其他接到 abort 命令並執行回滾的參與者之間存在資料不一致的情況。
SAGA 簡單業務使用TCC需要修改原來業務邏輯,saga只需要新增一個補償動作
由於沒有預留動作所以不用擔心資源釋放的問題異常處理簡單
由於沒有預留動作導致補償處理麻煩

業務各有各的不同,有些業務能容忍短期不一致,有些業務的操作可以冪等,無論什麼樣的分散式事務解決方案都有其優缺點,沒有一個銀彈能夠適配所有。因此,業務需要什麼樣的解決方案,還需要結合自身的業務需求、業務特點、技術架構以及各解決方案的特性,綜合分析,才能找到最適合的方案。