Fescar分散式事務實現原理解析探祕
前言
fescar釋出已有時日,分散式事務一直是業界備受關注的領域,fescar釋出一個月左右便受到了近5000個star足以說明其熱度。當然,在fescar出來之前,已經有比較成熟的分散式事務的解決方案開源了,比較典型的方案如LCN(https://github.com/codingapi/tx-lcn)的2pc型無侵入事務,目前lcn已發展到5.0,已支援和fescar事務模型類似的TCX型事務。還有如TCC型事務實現hmily(https://github.com/yu199195/hmily)、tcc-transaction(https://github.com/changmingxie/tcc-transaction)等。在微服務架構流行的當下、阿里這種開源大戶背景下,fescar的釋出無疑又掀起了研究分散式事務的熱潮。fescar脫胎於阿里雲商業分散式事務服務GTS,在線上環境提供這種公共服務其模式肯定經受了非常嚴苛的考驗。其分散式事務模型TXC又仿於傳統事務模型XA方案,主要區別在於資源管理器的定位一個在應用層一個在資料庫層。博主覺得fescar的txc模型實現非常有研究的價值,所以今天我們來好好翻一翻fescar專案的程式碼。本文篇幅較長,瀏覽並理解本文大概耗時30~60分鐘左右。
專案地址
fescar:https://github.com/alibaba/fescar
本博文所述程式碼為fescar的0.1.2-SNAPSHOT版本,根據fescar後期的迭代計劃,其專案結構和模組實現都可能有很大的改變,特此說明。
fescar的TXC模型
上圖為fescar官方針對TXC模型製作的示意圖。不得不說大廠的圖製作的真的不錯,結合示意圖我們可以看到TXC實現的全貌。TXC的實現通過三個元件來完成。也就是上圖的三個深黃色部分,其作用如下,:
- TM:全域性事務管理器,在標註開啟fescar分散式事務的服務端開啟,並將全域性事務傳送到TC事務控制端管理
- TC:事務控制中心,控制全域性事務的提交或者回滾。這個元件需要獨立部署維護,目前只支援單機版本,後續迭代計劃會有叢集版本
- RM:資源管理器,主要負責分支事務的上報,本地事務的管理
一段話簡述其實現過程:服務起始方發起全域性事務並註冊到TC。在呼叫協同服務時,協同服務的事務分支事務會先完成階段一的事務提交或回滾,並生成事務回滾的undo_log日誌,同時註冊當前協同服務到TC並上報其事務狀態,歸併到同一個業務的全域性事務中。此時若沒有問題繼續下一個協同服務的呼叫,期間任何協同服務的分支事務回滾,都會通知到TC,TC在通知全域性事務包含的所有已完成一階段提交的分支事務回滾。如果所有分支事務都正常,最後回到全域性事務發起方時,也會通知到TC,TC在通知全域性事務包含的所有分支刪除回滾日誌。在這個過程中為了解決寫隔離和度隔離的問題會涉及到TC管理的全域性鎖。
本博文的目標是深入程式碼細節,探究其基本思路是如何實現的。首先會從專案的結構來簡述每個模組的作用,繼而結合官方自帶的examples例項來探究整個分散式事務的實現過程。
專案結構解析
專案拉下來,用IDE開啟後的目錄結構如下,下面先大致的看下每個模組的實現
- common :公共元件,提供常用輔助類,靜態變數、擴充套件機制類載入器、以及定義全域性的異常等
- config : 配置載入解析模組,提供了配置的基礎介面,目前只有檔案配置實現,後續會有nacos等配置中心的實現
- core : 核心模組主要封裝了TM、RM和TC通訊用RPC相關內容
- dubbo :dubbo模組主要適配dubbo通訊框架,使用dubbo的filter機制來傳遞全域性事務的資訊到分支
- examples :簡單的演示例項模組,等下從這個模組入手探索
- rm-datasource :資源管理模組,比較核心的一個模組,個人認為這個模組命名為core要更合理一點。代理了JDBC的一些類,用來解析sql生成回滾日誌、協調管理本地事務
- server : TC元件所在,主要協調管理全域性事務,負責全域性事務的提交或者回滾,同時管理維護全域性鎖。
- spring :和spring整合的模組,主要是aop邏輯,是整個分散式事務的入口,研究fescar的突破口
- tm : 全域性事務事務管理模組,管理全域性事務的邊界,全域性事務開啟回滾點都在這個模組控制
通過【examples】模組的例項看下效果
第一步、先啟動TC也就是【Server】模組,main方法直接啟動就好,預設服務埠8091
第二步、回到examples模組,將訂單,業務,賬戶、倉庫四個服務的配置檔案配置好,主要是mysql資料來源和zookeeper連線地址,這裡要注意下,預設dubbo的zk註冊中心依賴沒有,啟動的時候回拋找不到class的異常,需要新增如下的依賴:
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency>
第三步、在BusinessServiceImpl中的模擬拋異常的地方打個斷點,依次啟動OrderServiceImpl、StorageServiceImpl、AccountServiceImpl、BusinessServiceImpl四個服務、等進斷點後,檢視資料庫account_tbl表,金額已減去400元,變成了599元。然後放開斷點、BusinessServiceImpl模組模擬的異常觸發,全域性事務回滾,account_tbl表的金額就又回滾到999元了
如上,我們已經體驗到fescar事務的控制能力了,下面我們具體看下它是怎麼控制的。
fescar事務過程分析
首先分析配置檔案
這個是一個鐵律,任何一個技術或框架要整合,配置檔案肯定是一個突破口。從上面的例子我們瞭解到,例項模組的配置檔案中配置了一個全域性事務掃描器例項,如:
<bean class="com.alibaba.fescar.spring.annotation.GlobalTransactionScanner"> <constructor-arg value="dubbo-demo-app"/> <constructor-arg value="my_test_tx_group"/> </bean>
這個例項在專案啟動時會掃描所有例項,具體實現見【spring】模組。並將標註了@GlobalTransactional註解的方法織入GlobalTransactionalInterceptor的invoke方法邏輯。同時應用啟動時,會初始化TM(TmRpcClient)和RM(RmRpcClient)的例項,這個時候,服務已經和TC事務控制中心勾搭上了。在往下看就涉及到TM模組的事務模板類TransactionalTemplate。
【TM】模組啟動全域性事務
全域性事務的開啟,提交、回滾都被封裝在TransactionalTemplate中完成了,程式碼如:
public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException { // 1. get or create a transaction GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); // 2. begin transaction try { tx.begin(business.timeout(), business.name()); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); } Object rs = null; try { // Do Your Business rs = business.execute(); } catch (Throwable ex) { // 3. any business exception, rollback. try { tx.rollback(); // 3.1 Successfully rolled back throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex); } catch (TransactionException txe) { // 3.2 Failed to rollback throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, ex); } } // 4. everything is fine, commit. try { tx.commit(); } catch (TransactionException txe) { // 4.1 Failed to commit throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure); } return rs; }
更詳細的實現在【TM】模組中被分成了兩個Class實現,如下:
DefaultGlobalTransaction :全域性事務具體的開啟,提交、回滾動作
DefaultTransactionManager :負責使用TmRpcClient向TC控制中心傳送指令,如開啟全域性事務(GlobalBeginRequest)、提交(GlobalCommitRequest)、回滾(GlobalRollbackRequest)、查詢狀態(GlobalStatusRequest)等。
以上是TM模組核心內容點,TM模組完成全域性事務開啟後,接下來就開始看看全域性事務iD,xid是如何傳遞、RM元件是如何介入的
【dubbo】全域性事務xid的傳遞
首先是xid的傳遞,目前已經實現了dubbo框架實現的微服務架構下的傳遞,其他的像spring cloud和motan等的想要實現也很容易,通過一般RPC通訊框架都有的filter機制,將xid從全域性事務的發起節點傳遞到服務協從節點,從節點接收到後繫結到當前執行緒上線文環境中,用於在分支事務執行sql時判斷是否加入全域性事務。fescar的實現見【dubbo】模組如下:
@Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100) public class TransactionPropagationFilter implements Filter { private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class); @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { String xid = RootContext.getXID(); String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID); if (LOGGER.isDebugEnabled()) { LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]"); } boolean bind = false; if (xid != null) { RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid); } else { if (rpcXid != null) { RootContext.bind(rpcXid); bind = true; if (LOGGER.isDebugEnabled()) { LOGGER.debug("bind[" + rpcXid + "] to RootContext"); } } } try { return invoker.invoke(invocation); } finally { if (bind) { String unbindXid = RootContext.unbind(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("unbind[" + unbindXid + "] from RootContext"); } if (!rpcXid.equalsIgnoreCase(unbindXid)) { LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid); if (unbindXid != null) { RootContext.bind(unbindXid); LOGGER.warn("bind [" + unbindXid + "] back to RootContext"); } } } } } }
上面程式碼rpcXid不為空時,就加入到了RootContext的ContextCore中,這裡稍微深入講下。ContextCore是一個可擴充套件實現的介面,目前預設的實現是ThreadLocalContextCore,基於ThreadLocal來儲存維護當前的xid。這裡fescar提供了可擴充套件的機制,實現在【common】模組中,通過一個自定義的類載入器EnhancedServiceLoader載入需要擴充套件的服務類,這樣只需要在擴充套件類加上@LoadLevel註解。標記order屬性宣告高優先級別,就可以達到擴充套件實現的目的。
【RM】模組本地資源管理的介入
fescar針對本地事務相關的介面,通過代理機制都實現了一遍代理類,如資料來源(DataSourceProxy)、ConnectionProxy、StatementProxy等。這個在配置檔案中也可以看出來,也就是說,我們要使用fescar分散式事務,一定要配置fescar提供的代理資料來源。如:
配置好代理資料來源後,從DataSourceProxy出發,本地針對資料庫的所有操作過程我們就可以隨意控制了。從上面xid傳遞,已經知道了xid被儲存在RootContext中了,那麼請看下面的程式碼,就非常清楚了:
首先看StatementProxy的一段程式碼
在看ExecuteTemplate中的程式碼
和【TM】模組中的事務管理模板類TransactionlTemplate類似,這裡非常關鍵的邏輯代理也被封裝在了ExecuteTemplate模板類中。因重寫了Statement有了StatementProxy實現,在執行原JDBC的executeUpdate方法時,會呼叫到ExecuteTemplate的execute邏輯。在sql真正執行前,會判斷RootCOntext當前上下文中是否包含xid,也就是判斷當前是否是全域性分散式事務。如果不是,就直接使用本地事務,如果是,這裡RM就會增加一些分散式事務相關的邏輯了。這裡根據sql的不同的型別,fescar封裝了五個不同的執行器來處理,分別是UpdateExecutor、DeleteExecutor、InsertExecutor、SelectForUpdateExecutor、PlainExecutor,結構如下圖:
PlainExecutor:
原生的JDBC介面實現,未做任何處理,提供給全域性事務中的普通的select查詢使用
UpdateExecutor、DeleteExecutor、InsertExecutor:
三個DML增刪改執行器實現,主要在sql執行的前後對sql語句進行了解析,實現瞭如下兩個抽象介面方法:
protected abstract TableRecords beforeImage() throws SQLException; protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;
在這個過程中通過解析sql生成了提供回滾操作的undo_log日誌,日誌目前是儲存在msyql中的,和業務sql操作共用同一個事務。表的結構如下:
rollback_info儲存的undo_log詳細資訊,是longblob型別的,結構如下:
{
"branchId":3958194,
"sqlUndoLogs":[
{
"afterImage":{
"rows":[
{
"fields":[
{
"keyType":"PrimaryKey",
"name":"ID",
"type":4,
"value":10
},
{
"keyType":"NULL",
"name":"COUNT",
"type":4,
"value":98
}
]
}
],
"tableName":"storage_tbl"
},
"beforeImage":{
"rows":[
{
"fields":[
{
"keyType":"PrimaryKey",
"name":"ID",
"type":4,
"value":10
},
{
"keyType":"NULL",
"name":"COUNT",
"type":4,
"value":100
}
]
}
],
"tableName":"storage_tbl"
},
"sqlType":"UPDATE",
"tableName":"storage_tbl"
}
],
"xid":"192.168.7.77:8091:3958193"
}
這裡貼的是一個update的操作,undo_log記錄的非常的詳細,通過全域性事務xid關聯branchid,記錄資料操作的表名,操作欄位名,以及sql執行前後的記錄數,如這個記錄,表名=storage_tbl,sql執行前ID=10,count=100,sql執行後id=10,count=98。如果整個全域性事務失敗,需要回滾的時候就可以生成:
update storage_tbl set count = 100 where id = 10;
這樣的回滾sql語句執行了。
SelectForUpdateExecutor:
fescar的AT模式在本地事務之上預設支援讀未提交的隔離級別,但是通過SelectForUpdateExecutor執行器,可以支援讀已提交的隔離級別。程式碼如:
@Override public Object doExecute(Object... args) throws Throwable { SQLSelectRecognizer recognizer = (SQLSelectRecognizer) sqlRecognizer; Connection conn = statementProxy.getConnection(); ResultSet rs = null; Savepoint sp = null; LockRetryController lockRetryController = new LockRetryController(); boolean originalAutoCommit = conn.getAutoCommit(); StringBuffer selectSQLAppender = new StringBuffer("SELECT "); selectSQLAppender.append(getTableMeta().getPkName()); selectSQLAppender.append(" FROM " + getTableMeta().getTableName()); String whereCondition = null; ArrayList<Object> paramAppender = new ArrayList<>(); if (statementProxy instanceof ParametersHolder) { whereCondition = recognizer.getWhereCondition((ParametersHolder) statementProxy, paramAppender); } else { whereCondition = recognizer.getWhereCondition(); } if (!StringUtils.isEmpty(whereCondition)) { selectSQLAppender.append(" WHERE " + whereCondition); } selectSQLAppender.append(" FOR UPDATE"); String selectPKSQL = selectSQLAppender.toString(); try { if (originalAutoCommit) { conn.setAutoCommit(false); } sp = conn.setSavepoint(); rs = statementCallback.execute(statementProxy.getTargetStatement(), args); while (true) { // Try to get global lock of those rows selected Statement stPK = null; PreparedStatement pstPK = null; ResultSet rsPK = null; try { if (paramAppender.isEmpty()) { stPK = statementProxy.getConnection().createStatement(); rsPK = stPK.executeQuery(selectPKSQL); } else { pstPK = statementProxy.getConnection().prepareStatement(selectPKSQL); for (int i = 0; i < paramAppender.size(); i++) { pstPK.setObject(i + 1, paramAppender.get(i)); } rsPK = pstPK.executeQuery(); } TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK); statementProxy.getConnectionProxy().checkLock(selectPKRows); break; } catch (LockConflictException lce) { conn.rollback(sp); lockRetryController.sleep(lce); } finally { if (rsPK != null) { rsPK.close(); } if (stPK != null) { stPK.close(); } if (pstPK != null) { pstPK.close(); } } } } finally { if (sp != null) { conn.releaseSavepoint(sp); } if (originalAutoCommit) { conn.setAutoCommit(true); } } return rs; }
關鍵程式碼見:
TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK); statementProxy.getConnectionProxy().checkLock(selectPKRows);
通過selectPKRows表操作記錄拿到lockKeys,然後到TC控制器端查詢是否被全域性鎖定了,如果被鎖定了,就重新嘗試,直到鎖釋放返回查詢結果。
分支事務的註冊和上報
在本地事務提交前,fescar會註冊和上報分支事務相關的資訊,見ConnectionProxy類的commit部分程式碼:
@Override public void commit() throws SQLException { if (context.inGlobalTransaction()) { try { register(); } catch (TransactionException e) { recognizeLockKeyConflictException(e); } try { if (context.hasUndoLog()) { UndoLogManager.flushUndoLogs(this); } targetConnection.commit(); } catch (Throwable ex) { report(false); if (ex instanceof SQLException) { throw (SQLException) ex; } else { throw new SQLException(ex); } } report(true); context.reset(); } else { targetConnection.commit(); } }
從這段程式碼我們可以看到,首先是判斷是了是否是全域性事務,如果不是,就直接提交了,如果是,就先向TC控制器註冊分支事務,為了寫隔離,在TC端會涉及到全域性鎖的獲取。然後儲存了用於回滾操作的undo_log日誌,繼而真正提交本地事務,最後向TC控制器上報事務狀態。此時,階段一的本地事務已完成了。
【server】模組協調全域性
關於server模組,我們可以聚焦在DefaultCoordinator這個類,這個是AbstractTCInboundHandler控制處理器預設實現。主要實現了全域性事務開啟,提交,回滾,狀態查詢,分支事務註冊,上報,鎖檢查等介面,如:
回到一開始的TransactionlTemplate,如果整個分散式事務失敗需要回滾了,首先是TM向TC發起回滾的指令,然後TC接收到後,解析請求後會被路由到預設控制器類的doGlobalRollback方法內,最終在TC控制器端執行的程式碼如下:
@Override public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException { for (BranchSession branchSession : globalSession.getReverseSortedBranches()) { BranchStatus currentBranchStatus = branchSession.getStatus(); if (currentBranchStatus == BranchStatus.PhaseOne_Failed) { continue; } try { BranchStatus branchStatus = resourceManagerInbound.branchRollback(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(), branchSession.getResourceId(), branchSession.getApplicationData()); switch (branchStatus) { case PhaseTwo_Rollbacked: globalSession.removeBranch(branchSession); LOGGER.error("Successfully rolled back branch " + branchSession); continue; case PhaseTwo_RollbackFailed_Unretryable: GlobalStatus currentStatus = globalSession.getStatus(); if (currentStatus.name().startsWith("Timeout")) { globalSession.changeStatus(GlobalStatus.TimeoutRollbackFailed); } else { globalSession.changeStatus(GlobalStatus.RollbackFailed); } globalSession.end(); LOGGER.error("Failed to rollback global[" + globalSession.getTransactionId() + "] since branch[" + branchSession.getBranchId() + "] rollback failed"); return; default: LOGGER.info("Failed to rollback branch " + branchSession); if (!retrying) { queueToRetryRollback(globalSession); } return; } } catch (Exception ex) { LOGGER.info("Exception rollbacking branch " + branchSession, ex); if (!retrying) { queueToRetryRollback(globalSession); if (ex instanceof TransactionException) { throw (TransactionException) ex; } else { throw new TransactionException(ex); } } } } GlobalStatus currentStatus = globalSession.getStatus(); if (currentStatus.name().startsWith("Timeout")) { globalSession.changeStatus(GlobalStatus.TimeoutRollbacked); } else { globalSession.changeStatus(GlobalStatus.Rollbacked); } globalSession.end(); }
如上程式碼可以看到,回滾時從全域性事務會話中迭代每個分支事務,然後通知每個分支事務回滾。分支服務接收到請求後,首先會被路由到RMHandlerAT中的doBranchRollback方法,繼而呼叫了RM中的branchRollback方法,程式碼如下:
@Override public BranchStatus branchRollback(String xid, long branchId, String resourceId, String applicationData) throws TransactionException { DataSourceProxy dataSourceProxy = get(resourceId); if (dataSourceProxy == null) { throw new ShouldNeverHappenException(); } try { UndoLogManager.undo(dataSourceProxy, xid, branchId); } catch (TransactionException te) { if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) { return BranchStatus.PhaseTwo_RollbackFailed_Unretryable; } else { return BranchStatus.PhaseTwo_RollbackFailed_Retryable; } } return BranchStatus.PhaseTwo_Rollbacked; }
RM分支事務端最後執行的是UndoLogManager的undo方法,通過xid和branchid從資料庫查詢出回滾日誌,完成資料回滾操作,整個過程都是同步完成的。如果全域性事務是成功的,TC也會有類似的上述協調過程,只不過是非同步的將本次全域性事務相關的undo_log清除了而已。至此,就完成了2階段的提交或回滾,也就完成了完整的全域性事務事務的控制。
結語
如果你看到這裡,那麼非常感謝你,在繁忙工作之餘耐心的花時間來學習。同時,我相信花的時間沒白費,完整的瀏覽理解估計對fescar實現的大致流程瞭解的十之八九了。本文從構思立題到完成大概耗時1人天左右,博主在這個過程中,對fescar的實現也有了更加深入的瞭解。由於篇幅原因,並沒有面面俱到的對每個實現的細節去深究,如sql是如何解析的等,更多的是在fescar的TXC模型的實現過程的關鍵點做了詳細闡述。本文已校對,但由於個人知識水平及精力有限,文中不免出現錯誤或理解不當的地