1. 程式人生 > >Fescar分布式事務實現原理解析探秘

Fescar分布式事務實現原理解析探秘

sun 問題 oba 普通 move override during 路由 parameter

fescar的TXC模型
技術分享圖片
上圖為fescar官方針對TXC模型制作的示意圖。不得不說大廠的圖制作的真的不錯,結合示意圖我們可以看到TXC實現的全貌。TXC的實現通過三個組件來完成。也就是上圖的三個深×××部分,其作用如下,:

TM:全局事務管理器,在標註開啟fescar分布式事務的服務端開啟,並將全局事務發送到TC事務控制端管理
TC:事務控制中心,控制全局事務的提交或者回滾。這個組件需要獨立部署維護,目前只支持單機版本,後續叠代計劃會有集群版本
RM:資源管理器,主要負責分支事務的上報,本地事務的管理
一段話簡述其實現過程:服務起始方發起全局事務並註冊到TC。在調用協同服務時,協同服務的事務分支事務會先完成階段一的事務提交或回滾,並生成事務回滾的undo_log日誌,同時註冊當前協同服務到TC並上報其事務狀態,歸並到同一個業務的全局事務中。此時若沒有問題繼續下一個協同服務的調用,期間任何協同服務的分支事務回滾,都會通知到TC,TC在通知全局事務包含的所有已完成一階段提交的分支事務回滾。如果所有分支事務都正常,最後回到全局事務發起方時,也會通知到TC,TC在通知全局事務包含的所有分支刪除回滾日誌。在這個過程中為了解決寫隔離和度隔離的問題會涉及到TC管理的全局鎖。

本博文的目標是深入代碼細節,探究其基本思路是如何實現的。首先會從項目的結構來簡述每個模塊的作用,繼而結合官方自帶的examples實例來探究整個分布式事務的實現過程。

項目結構解析
項目拉下來,用IDE打開後的目錄結構如下,下面先大致的看下每個模塊的實現
技術分享圖片
common :公共組件,提供常用輔助類,靜態變量、擴展機制類加載器、以及定義全局的異常等
config : 配置加載解析模塊,提供了配置的基礎接口,目前只有文件配置實現,後續會有nacos等配置中心的實現
core : 核心模塊主要封裝了TM、RM和TC通訊用RPC相關內容
dubbo :dubbo模塊主要適配dubbo通訊框架,使用dubbo的filter機制來傳統全局事務的信息到分支

examples :簡單的演示實例模塊,等下從這個模塊入手探索
rm-datasource :資源管理模塊,比較核心的一個模塊,個人認為這個模塊命名為core要更合理一點。代理了JDBC的一些類,用來解析sql生成回滾日誌、協調管理本地事務
server : TC組件所在,主要協調管理全局事務,負責全局事務的提交或者回滾,同時管理維護全局鎖。
spring :和spring集成的模塊,主要是aop邏輯,是整個分布式事務的入口,研究fescar的突破口
tm : 全局事務事務管理模塊,管理全局事務的邊界,全局事務開啟回滾點都在這個模塊控制
通過【examples】模塊的實例看下效果
第一步、先啟動TC也就是【Server】模塊,main方法直接啟動就好,默認服務端口8091

第二步、回到examples模塊,將訂單,業務,賬戶、倉庫四個服務的配置文件配置好,主要是mysql數據源和zookeeper連接地址,這裏要註意下,默認dubbo的zk註冊中心依賴沒有,啟動的時候回拋找不到class的異常,需要添加如下的依賴:

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>

第三步、在BusinessServiceImpl中的模擬拋異常的地方打個斷點,依次啟動OrderServiceImpl、StorageServiceImpl、AccountServiceImpl、BusinessServiceImpl四個服務、等進斷點後,查看數據庫account_tbl表,金額已減去400元,變成了599元。然後放開斷點、BusinessServiceImpl模塊模擬的異常觸發,全局事務回滾,account_tbl表的金額就又回滾到999元了

如上,我們已經體驗到fescar事務的控制能力了,下面我們具體看下它是怎麽控制的。

fescar事務過程分析
首先分析配置文件
這個是一個鐵律,任何一個技術或框架要集成,配置文件肯定是一個突破口。從上面的例子我們了解到,實例模塊的配置文件中配置了一個全局事務掃描器實例,如:

<bean class="com.alibaba.fescar.spring.annotation.GlobalTransactionScanner">
    <constructor-arg value="dubbo-demo-app"/>
    <constructor-arg value="my_test_tx_group"/>
</bean>

這個實例在項目啟動時會掃描所有實例,具體實現見【spring】模塊。並將標註了@GlobalTransactional註解的方法織入GlobalTransactionalInterceptor的invoke方法邏輯。同時應用啟動時,會初始化TM(TmRpcClient)和RM(RmRpcClient)的實例,這個時候,服務已經和TC事務控制中心勾搭上了。在往下看就涉及到TM模塊的事務模板類TransactionalTemplate。

【TM】模塊啟動全局事務
全局事務的開啟,提交、回滾都被封裝在TransactionalTemplate中完成了,代碼如:

public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
    // 1. get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    // 2. begin transaction
    try {
        tx.begin(business.timeout(), business.name());
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.BeginFailure);
    }
    Object rs = null;
    try {
        // Do Your Business
        rs = business.execute();
    } catch (Throwable ex) {
        // 3. any business exception, rollback.
        try {
            tx.rollback();
            // 3.1 Successfully rolled back
            throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
        } catch (TransactionException txe) {
            // 3.2 Failed to rollback
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.RollbackFailure, ex);
        }
    }
    // 4. everything is fine, commit.
    try {
        tx.commit();
    } catch (TransactionException txe) {
        // 4.1 Failed to commit
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.CommitFailure);
    }
    return rs;
}

更詳細的實現在【TM】模塊中被分成了兩個Class實現,如下:

DefaultGlobalTransaction :全局事務具體的開啟,提交、回滾動作

DefaultTransactionManager :負責使用TmRpcClient向TC控制中心發送指令,如開啟全局事務(GlobalBeginRequest)、提交(GlobalCommitRequest)、回滾(GlobalRollbackRequest)、查詢狀態(GlobalStatusRequest)等。

以上是TM模塊核心內容點,TM模塊完成全局事務開啟後,接下來就開始看看全局事務iD,xid是如何傳遞、RM組件是如何介入的

【dubbo】全局事務xid的傳遞
首先是xid的傳遞,目前已經實現了dubbo框架實現的微服務架構下的傳遞,其他的像spring cloud和motan等的想要實現也很容易,通過一般RPC通訊框架都有的filter機制,將xid從全局事務的發起節點傳遞到服務協從節點,從節點接收到後綁定到當前線程上線文環境中,用於在分支事務執行sql時判斷是否加入全局事務。fescar的實現見【dubbo】模塊如下:

@Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
public class TransactionPropagationFilter implements Filter {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String xid = RootContext.getXID();
        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
        }
        boolean bind = false;
        if (xid != null) {
            RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
        } else {
            if (rpcXid != null) {
                RootContext.bind(rpcXid);
                bind = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind[" + rpcXid + "] to RootContext");
                }
            }
        }
        try {
            return invoker.invoke(invocation);

        } finally {
            if (bind) {
                String unbindXid = RootContext.unbind();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                }
                if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                    LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
                    if (unbindXid != null) {
                        RootContext.bind(unbindXid);
                        LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                    }
                }
            }
        }
    }
}

上面代碼rpcXid不為空時,就加入到了RootContext的ContextCore中,這裏稍微深入講下。ContextCore是一個可擴展實現的接口,目前默認的實現是ThreadLocalContextCore,基於ThreadLocal來保存維護當前的xid。這裏fescar提供了可擴展的機制,實現在【common】模塊中,通過一個自定義的類加載器EnhancedServiceLoader加載需要擴展的服務類,這樣只需要在擴展類加上@LoadLevel註解。標記order屬性聲明高優先級別,就可以達到擴展實現的目的。

【RM】模塊本地資源管理的介入
fescar針對本地事務相關的接口,通過代理機制都實現了一遍代理類,如數據源(DataSourceProxy)、ConnectionProxy、StatementProxy等。這個在配置文件中也可以看出來,也就是說,我們要使用fescar分布式事務,一定要配置fescar提供的代理數據源。如:
技術分享圖片
配置好代理數據源後,從DataSourceProxy出發,本地針對數據庫的所有操作過程我們就可以隨意控制了。從上面xid傳遞,已經知道了xid被保存在RootContext中了,那麽請看下面的代碼,就非常清楚了:

首先看StatementProxy的一段代碼
技術分享圖片
在看ExecuteTemplate中的代碼
技術分享圖片
和【TM】模塊中的事務管理模板類TransactionlTemplate類似,這裏非常關鍵的邏輯代理也被封裝在了ExecuteTemplate模板類中。因重寫了Statement有了StatementProxy實現,在執行原JDBC的executeUpdate方法時,會調用到ExecuteTemplate的execute邏輯。在sql真正執行前,會判斷RootCOntext當前上下文中是否包含xid,也就是判斷當前是否是全局分布式事務。如果不是,就直接使用本地事務,如果是,這裏RM就會增加一些分布式事務相關的邏輯了。這裏根據sql的不同的類型,fescar封裝了五個不同的執行器來處理,分別是UpdateExecutor、DeleteExecutor、InsertExecutor、SelectForUpdateExecutor、PlainExecutor,結構如下圖:
技術分享圖片
PlainExecutor:
原生的JDBC接口實現,未做任何處理,提供給全局事務中的普通的select查詢使用

UpdateExecutor、DeleteExecutor、InsertExecutor:
三個DML增刪改執行器實現,主要在sql執行的前後對sql語句進行了解析,實現了如下兩個抽象接口方法:

protected abstract TableRecords beforeImage() throws SQLException;

protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;

在這個過程中通過解析sql生成了提供回滾操作的undo_log日誌,日誌目前是保存在msyql中的,和業務sql操作共用同一個事務。表的結構如下:
技術分享圖片
rollback_info保存的undo_log詳細信息,是longblob類型的,結構如下:

{
    "branchId":3958194,
    "sqlUndoLogs":[
        {
            "afterImage":{
                "rows":[
                    {
                        "fields":[
                            {
                                "keyType":"PrimaryKey",
                                "name":"ID",
                                "type":4,
                                "value":10
                            },
                            {
                                "keyType":"NULL",
                                "name":"COUNT",
                                "type":4,
                                "value":98
                            }
                        ]
                    }
                ],
                "tableName":"storage_tbl"
            },
            "beforeImage":{
                "rows":[
                    {
                        "fields":[
                            {
                                "keyType":"PrimaryKey",
                                "name":"ID",
                                "type":4,
                                "value":10
                            },
                            {
                                "keyType":"NULL",
                                "name":"COUNT",
                                "type":4,
                                "value":100
                            }
                        ]
                    }
                ],
                "tableName":"storage_tbl"
            },
            "sqlType":"UPDATE",
            "tableName":"storage_tbl"
        }
    ],
    "xid":"192.168.7.77:8091:3958193"
}

這裏貼的是一個update的操作,undo_log記錄的非常的詳細,通過全局事務xid關聯branchid,記錄數據操作的表名,操作字段名,以及sql執行前後的記錄數,如這個記錄,表名=storage_tbl,sql執行前ID=10,count=100,sql執行後id=10,count=98。如果整個全局事務失敗,需要回滾的時候就可以生成:

update storage_tbl set count = 100 where id = 10;

這樣的回滾sql語句執行了。

SelectForUpdateExecutor:
fescar的AT模式在本地事務之上默認支持讀未提交的隔離級別,但是通過SelectForUpdateExecutor執行器,可以支持讀已提交的隔離級別。代碼如:

@Override
public Object doExecute(Object... args) throws Throwable {
    SQLSelectRecognizer recognizer = (SQLSelectRecognizer) sqlRecognizer;

    Connection conn = statementProxy.getConnection();
    ResultSet rs = null;
    Savepoint sp = null;
    LockRetryController lockRetryController = new LockRetryController();
    boolean originalAutoCommit = conn.getAutoCommit();

    StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
    selectSQLAppender.append(getTableMeta().getPkName());
    selectSQLAppender.append(" FROM " + getTableMeta().getTableName());
    String whereCondition = null;
    ArrayList<Object> paramAppender = new ArrayList<>();
    if (statementProxy instanceof ParametersHolder) {
        whereCondition = recognizer.getWhereCondition((ParametersHolder) statementProxy, paramAppender);
    } else {
        whereCondition = recognizer.getWhereCondition();
    }
    if (!StringUtils.isEmpty(whereCondition)) {
        selectSQLAppender.append(" WHERE " + whereCondition);
    }
    selectSQLAppender.append(" FOR UPDATE");
    String selectPKSQL = selectSQLAppender.toString();

    try {
        if (originalAutoCommit) {
            conn.setAutoCommit(false);
        }
        sp = conn.setSavepoint();
        rs = statementCallback.execute(statementProxy.getTargetStatement(), args);

        while (true) {
            // Try to get global lock of those rows selected
            Statement stPK = null;
            PreparedStatement pstPK = null;
            ResultSet rsPK = null;
            try {
                if (paramAppender.isEmpty()) {
                    stPK = statementProxy.getConnection().createStatement();
                    rsPK = stPK.executeQuery(selectPKSQL);
                } else {
                    pstPK = statementProxy.getConnection().prepareStatement(selectPKSQL);
                    for (int i = 0; i < paramAppender.size(); i++) {
                        pstPK.setObject(i + 1, paramAppender.get(i));
                    }
                    rsPK = pstPK.executeQuery();
                }

                TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
                statementProxy.getConnectionProxy().checkLock(selectPKRows);
                break;

            } catch (LockConflictException lce) {
                conn.rollback(sp);
                lockRetryController.sleep(lce);

            } finally {
                if (rsPK != null) {
                    rsPK.close();
                }
                if (stPK != null) {
                    stPK.close();
                }
                if (pstPK != null) {
                    pstPK.close();
                }
            }
        }

    } finally {
        if (sp != null) {
            conn.releaseSavepoint(sp);
        }
        if (originalAutoCommit) {
            conn.setAutoCommit(true);
        }
    }
    return rs;
}

關鍵代碼見:

TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
statementProxy.getConnectionProxy().checkLock(selectPKRows);

通過selectPKRows表操作記錄拿到lockKeys,然後到TC控制器端查詢是否被全局鎖定了,如果被鎖定了,就重新嘗試,直到鎖釋放返回查詢結果。

分支事務的註冊和上報
在本地事務提交前,fescar會註冊和上報分支事務相關的信息,見ConnectionProxy類的commit部分代碼:

@Override
public void commit() throws SQLException {
    if (context.inGlobalTransaction()) {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }

        try {
            if (context.hasUndoLog()) { 
                UndoLogManager.flushUndoLogs(this);
            }
            targetConnection.commit();
        } catch (Throwable ex) {
            report(false);
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }
        }
        report(true);
        context.reset();

    } else {
        targetConnection.commit();
    }
}

從這段代碼我們可以看到,首先是判斷是了是否是全局事務,如果不是,就直接提交了,如果是,就先向TC控制器註冊分支事務,為了寫隔離,在TC端會涉及到全局鎖的獲取。然後保存了用於回滾操作的undo_log日誌,繼而真正提交本地事務,最後向TC控制器上報事務狀態。此時,階段一的本地事務已完成了。

【server】模塊協調全局
關於server模塊,我們可以聚焦在DefaultCoordinator這個類,這個是AbstractTCInboundHandler控制處理器默認實現。主要實現了全局事務開啟,提交,回滾,狀態查詢,分支事務註冊,上報,鎖檢查等接口,如:
技術分享圖片
回到一開始的TransactionlTemplate,如果整個分布式事務失敗需要回滾了,首先是TM向TC發起回滾的指令,然後TC接收到後,解析請求後會被路由到默認控制器類的doGlobalRollback方法內,最終在TC控制器端執行的代碼如下:

@Override
public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
    for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
        BranchStatus currentBranchStatus = branchSession.getStatus();
        if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
            continue;
        }
        try {
            BranchStatus branchStatus = resourceManagerInbound.branchRollback(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
                    branchSession.getResourceId(), branchSession.getApplicationData());

            switch (branchStatus) {
                case PhaseTwo_Rollbacked:
                    globalSession.removeBranch(branchSession);
                    LOGGER.error("Successfully rolled back branch " + branchSession);
                    continue;
                case PhaseTwo_RollbackFailed_Unretryable:
                    GlobalStatus currentStatus = globalSession.getStatus();
                    if (currentStatus.name().startsWith("Timeout")) {
                        globalSession.changeStatus(GlobalStatus.TimeoutRollbackFailed);
                    } else {
                        globalSession.changeStatus(GlobalStatus.RollbackFailed);
                    }
                    globalSession.end();
                    LOGGER.error("Failed to rollback global[" + globalSession.getTransactionId() + "] since branch[" + branchSession.getBranchId() + "] rollback failed");
                    return;
                default:
                    LOGGER.info("Failed to rollback branch " + branchSession);
                    if (!retrying) {
                        queueToRetryRollback(globalSession);
                    }
                    return;

            }
        } catch (Exception ex) {
            LOGGER.info("Exception rollbacking branch " + branchSession, ex);
            if (!retrying) {
                queueToRetryRollback(globalSession);
                if (ex instanceof TransactionException) {
                    throw (TransactionException) ex;
                } else {
                    throw new TransactionException(ex);
                }
            }

        }

    }
    GlobalStatus currentStatus = globalSession.getStatus();
    if (currentStatus.name().startsWith("Timeout")) {
        globalSession.changeStatus(GlobalStatus.TimeoutRollbacked);
    } else {
        globalSession.changeStatus(GlobalStatus.Rollbacked);
    }
    globalSession.end();
}

如上代碼可以看到,回滾時從全局事務會話中叠代每個分支事務,然後通知每個分支事務回滾。分支服務接收到請求後,首先會被路由到RMHandlerAT中的doBranchRollback方法,繼而調用了RM中的branchRollback方法,代碼如下:

@Override
public BranchStatus branchRollback(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException();
    }
    try {
        UndoLogManager.undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}

M分支事務端最後執行的是UndoLogManager的undo方法,通過xid和branchid從數據庫查詢出回滾日誌,完成數據回滾操作,整個過程都是同步完成的。如果全局事務是成功的,TC也會有類似的上述協調過程,只不過是異步的將本次全局事務相關的undo_log清除了而已。至此,就完成了2階段的提交或回滾,也就完成了完整的全局事務事務的控制。

結語
如果你看到這裏,那麽非常感謝你,在繁忙工作之余耐心的花時間來學習。同時,我相信花的時間沒白費,完整的瀏覽理解估計對fescar實現的大致流程了解的十之八九了。本文從構思立題到完成大概耗時1人天左右,博主在這個過程中,對fescar的實現也有了更加深入的了解。由於篇幅原因,並沒有面面俱到的對每個實現的細節去深究,如sql是如何解析的等,更多的是在fescar的TXC模型的實現過程的關鍵點做了詳細闡述。本文已校對,但由於個人知識水平及精力有限,文中不免出現錯誤或理解不當的地方,歡迎指正。

Fescar分布式事務實現原理解析探秘