1. 程式人生 > >分布式事務中間件 Fescar—RM 模塊源碼解讀

分布式事務中間件 Fescar—RM 模塊源碼解讀

auto connect ring manager 分布式系 dcb 架構 圖片 基礎上

前言
在SOA、微服務架構流行的年代,許多復雜業務上需要支持多資源占用場景,而在分布式系統中因為某個資源不足而導致其它資源占用回滾的系統設計一直是個難點。我所在的團隊也遇到了這個問題,為解決這個問題上,團隊采用的是阿裏開源的分布式中間件Fescar的解決方案,並詳細了解了Fescar內部的工作原理,解決在使用Fescar中間件過程中的一些疑慮的地方,也為後續團隊在繼續使用該中間件奠定理論基礎。

目前分布式事務解決方案基本是圍繞兩階段提交模式來設計的,按對業務是有侵入分為:對業務無侵入的基於XA協議的方案,但需要數據庫支持XA協議並且性能較低;對業務有侵入的方案包括:TCC等。Fescar就是基於兩階段提交模式設計的,以高效且對業務零侵入的方式,解決微服務場景下面臨的分布式事務問題。Fescar設計上將整體分成三個大模塊,即TM、RM、TC,具體解釋如下:

TM(Transaction Manager):全局事務管理器,控制全局事務邊界,負責全局事務開啟、全局提交、全局回滾。
RM(Resource Manager):資源管理器,控制分支事務,負責分支註冊、狀態匯報,並接收事務協調器的指令,驅動分支(本地)事務的提交和回滾。
TC(Transaction Coordinator):事務協調器,維護全局事務的運行狀態,負責協調並驅動全局事務的提交或回滾。
技術分享圖片
本文將深入到Fescar的RM模塊源碼去介紹Fescar是如何在完成分支提交和回滾的基礎上又做到零侵入,進而極大方便業務方進行業務系統開發。

一、從配置開始解讀
技術分享圖片
上圖是Fescar源碼examples模塊dubbo-order-service.xml內的配置,數據源采用druid的DruidDataSource,但實際jdbcTemplate執行時並不是用該數據源,而用的是Fescar對DruidDataSource的代理DataSourceProxy,所以,與RM相關的代碼邏輯基本上都是從DataSourceProxy這個代理數據源開始的。

Fescar采用2PC來完成分支事務的提交與回滾,具體怎麽做到的呢,下面就分別介紹Phase1、Phase2具體做了些什麽。

二、Phase1—分支(本地)事務執行
Fescar將一個本地事務做為一個分布式事務分支,所以若幹個分布在不同微服務中的本地事務共同組成了一個全局事務,結構如下。
技術分享圖片

那麽,一個本地事務中SQL是如何執行呢?在Spring中,本質上都是從jdbcTemplate開始的,比如下面的SQL語句:

jdbcTemplate.update("update storage_tbl set count = count - ? where commodity_code = ?", new Object[] {count, commodityCode});

一般JdbcTemplate執行流程如下圖所示:
技術分享圖片

由於在配置中,JdbcTemplate數據源被配置成了Fescar實現DataSourceProxy,進而控制了後續的數據庫連接使用的是Fescar提供的ConnectionProxy,Statment使用的是Fescar實現的StatmentProxy,最終Fescar就順理成章地實現了在本地事務執行前後增加所需要的邏輯,比如:完成分支事務的快照記錄和分支事務執行狀態的上報等等。

DataSourceProxy獲取ConnectionProxy:
技術分享圖片

ConnectionProxy獲取StatmentProxy:
技術分享圖片

在獲取到StatmentProxy後,可以調用excute方法執行sql了
技術分享圖片

而真正excute實現邏輯如下:
技術分享圖片

首先會檢查當前本地事務是否處於全局事務中,如果不處於,直接使用默認的Statment執行,避免因引入Fescar導致非全局事務中的SQL執行性能下降。
解析Sql,有緩存機制,因為有些sql解析會比較耗時,可能會導致在應用啟動後剛開始的那段時間裏處理全局事務中的sql執行效率降低。
對於INSERT、UPDATE、DELETE、SELECT..FOR UPDATE這四種類型的sql會專門實現的SQL執行器進行處理,其它SQL直接是默認的Statment執行。
返回執行結果,如有異常則直接拋給上層業務代碼進行處理。
再來看一下關鍵的INSERT、UPDATE、DELETE、SELECT..FOR UPDATE這四種類型的sql如何執行的,先看一下具體類圖結構:
技術分享圖片

為結省篇幅,選擇UpdateExecutor實現源碼看一下,先看入口BaseTransactionalExecutor.execute,該方法將ConnectionProxy與Xid(事務ID)進行綁定,這樣後續判斷當前本地事務是否處理全局事務中只需要看ConnectionProxy中Xid是否為空。
技術分享圖片

然後,執行AbstractDMLBaseExecutor中實現的doExecute方法
技術分享圖片

基本邏輯如下:

先判斷是否為Auto-Commit模式
如果非Auto-Commit模式,則先查詢Update前對應行記錄的快照beforeImage,再執行Update語句,完成後再查詢Update後對應行記錄的快照afterImage,最後將beforeImage、afterImage生成UndoLog追加到Connection上下文ConnectionContext中。(註:獲取beforeImage、afterImage方法在UpdateExecutor類下,一般是構造一條Select...For Update語句獲取執行前後的行記錄,同時會檢查是否有全局鎖沖突,具體可參考源碼)
如果是Auto-Commit模式,先將提交模式設置成非自動Commit,再執行2中的邏輯,再執行connectionProxy.commit()方法,由於執行2過程和commit時都可能會出現全局鎖沖突問題,增加了一個循環等待重試邏輯,最後將connection的模式設置成Auto-Commit模式
如果本地事務執行過程中發生異常,業務上層會接收到該異常,至於是給TM模塊返回成功還是失敗,由業務上層實現決定,如果返回失敗,則TM裁決對全局事務進行回滾;如果本地事務執行過程未發生異常,不管是非Auto-Commit還是Auto-Commit模式,最後都會調用connectionProxy.commit()對本地事務進行提交,在這裏會創建分支事務、上報分支事務的狀態以及將UndoLog持久化到undo_log表中,具體代碼如下圖:
技術分享圖片
基本邏輯:

判斷當前本地事務是否處於全局事務中(也就判斷ConnectionContext中的xid是否為空)。
如果不處於全局事務中,則調用targetConnection對本地事務進行commit。
如果處於全局事務中,首先創建分支事務,再將ConnectionContext中的UndoLog寫入到undo_log表中,然後調用targetConnection對本地事務進行commit,將UndoLog與業務SQL一起提交,最後上報分支事務的狀態(成功 or 失敗),並將ConnectionContext上下文重置。
綜上所述,RM模塊通過對JDBC數據源進行代理,幹預業務SQL執行過程,加入了很多流程,比如業務SQL解析、業務SQL執行前後的數據快照查詢並組織成UndoLog、全局鎖檢查、分支事務註冊、UndoLog寫入並隨本地事務一起Commit、分支事務狀態上報等。通過這種方式,Fescar真正做到了對業務代碼無侵入,只需要通過簡單的配置,業務方就可以輕松享受Fescar所帶來的功能。Phase1整體流程引用Fescar官方圖總結如下:

技術分享圖片

三、Phase2-分支事務提交或回滾
階段2完成的是全局事物的最終提交或回滾,當全局事務中所有分支事務全部完成並且都執行成功,這時TM會發起全局事務提交,TC收到全全局事務提交消息後,會通知各分支事務進行提交;同理,當全局事務中所有分支事務全部完成並且某個分支事務失敗了,TM會通知TC協調全局事務回滾,進而TC通知各分支事務進行回滾。

在業務應用啟動過程中,由於引入了Fescar客戶端,RmRpcClient會隨應用一起啟動,該RmRpcClient采用Netty實現,可以接收TC消息和向TC發送消息,因此RmRpcClient是與TC收發消息的關鍵模塊。

public class RMClientAT {

public static void init(String applicationId, String transactionServiceGroup) {
    RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
    AsyncWorker asyncWorker = new AsyncWorker();
    asyncWorker.init();
    DataSourceManager.init(asyncWorker);
    rmRpcClient.setResourceManager(DataSourceManager.get());
    rmRpcClient.setClientMessageListener(new RmMessageListener(new RMHandlerAT()));
    rmRpcClient.init();
}

}
上述代碼展示是的RmRpcClient初始化過程,有三個關鍵類RMHandlerAT、AsyncWorker和DataSourceManager。RMHandlerAT具有了分支提交和回滾兩個方法,分支提交或回滾的邏輯可以從這裏開始看;AsyncWorker是一個異步Worker,主要是完成分支事務異步提交的功能,具有失敗重試功能;DataSourceManager對數據源管理和維護。

下面分成兩部分來講:分支事務提交、分去事務回滾。

3.1、分支事務提交
在接收到TC發起的全局提交消息後,經RmRpcClient對通信協議的處理,再交由RMHandlerAT來完成對分支事務的提交,分支事務提交從RMHandlerAT.doBranchCommit()開始,但最後由AsyncWorker異步Worker完成,直接看AsyncWorker中的代碼實現:
技術分享圖片

分支事務提交關鍵邏輯在doBranchCommits方法中:
技術分享圖片

該方法主要是批量刪除UndoLog日誌,但並未使用ConnectionProxy去執行刪除SQL,可能原因是:1、完全沒必要 2、考慮效率優先

同樣,對於分支事務提交也引用Fescar官方一張圖來結尾:
技術分享圖片

3.2、分支事務回滾
同樣,分支事務回滾是從RMHandlerAT.doBranchRollback開始的,然後到了dataSourceManager.branchRollback,最後完成分支事務回滾邏輯的是UndoLogManager.undo方法。

@Override
protected void RMHandlerAT:doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
LOGGER.info("AT Branch rolling back: " + xid + " " + branchId + " " + resourceId);
BranchStatus status = dataSourceManager.branchRollback(xid, branchId, resourceId, applicationData);
response.setBranchStatus(status);
LOGGER.info("AT Branch rollback result: " + status);
}

 @Override
public BranchStatus DataSourceManager:branchRollback(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException();
    }
    try {
        UndoLogManager.undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}

UndoLogManager.undo方法源碼如下:
技術分享圖片
從上圖可以看出,整個回滾到全局事務之前狀態的代碼邏輯集中在如下代碼中:

AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
undoExecutor.executeOn(conn);
首先通過UndoExecutorFactory獲取到對應的UndoExecutor,然後再執行UndoExecutor的executeOn方法完成回滾操作。目前三種類型的UndoExecutor結構如下:
技術分享圖片

undoExecutor.executeOn源碼如下:
技術分享圖片

至此,整個分支事務回滾就結束了,分支事務回滾整體時序圖如下:

技術分享圖片

引入Fescar官方對分支事務回滾原理介紹圖作為結尾:

技術分享圖片

綜合上述,Fescar在Phase2通過UndoLog自動完成分支事務提交與回滾,在這個過程中不需要業務方做任何處理,業務方無感知,因些在該階段對業務代碼也是無侵入的。

四、總結
本文主要介紹了RM模塊的相關代碼,將RM模塊按2PC模式分成Phase1和Phase2分別進行介紹,從Fescar源碼上看,整個源碼結構清晰,有利於研發人員快速學習Fescar的原理。在使用方面,只需進行簡單的配置,就可以享受Fescar帶來的便捷功能,對業務做到了無侵入;同時在性能方面,Fescar在分支事務提交過程中采用異步模式,減少了全局鎖的占用時間,進而提升了整體性能。後續,將繼續學習Fescar的其它模塊(TM、TC)與全局鎖的實現邏輯,並做相關總結介紹。

分布式事務中間件 Fescar—RM 模塊源碼解讀