1. 程式人生 > >sharding-jdbc原始碼閱讀之soft transaction

sharding-jdbc原始碼閱讀之soft transaction

先看一段作者張亮,對sharding-jdbc分散式事務理解:
張亮:分散式事務這塊,我們認為XA多階段提交的方式,雖然對分散式資料的完整性有比較好的保障,但會極大的降影響應用效能,並未考慮採用。我們採用的是兩種方式,一種稱之為弱XA,另一種是柔性事務,即BASE。
弱XA就是分庫之後的資料庫各自負責自己事務的提交和回滾,沒有統一的排程器集中處理。這樣做的好處是天然就支援,對效能也沒有影響。但一旦出問題,比如兩個庫的資料都需要提交,一個提交成功,另一個提交時斷網導致失敗,則會發生資料不一致的問題,而且這種資料不一致是永久存在的。
柔性事務是對弱XA的有效補充。柔性事務型別很多。
Sharding-JDBC主要實現的是最大努力送達型。即認為事務經過反覆嘗試一定能夠成功。如果每次事務執行失敗,則記錄至事務庫,並通過非同步的手段不斷的嘗試,直至事務成功(可以設定嘗試次數,如果嘗試太多仍然失敗則入庫並需要人工干預)。在嘗試的途中,資料會有一定時間的不一致,但最終是一致的。通過這種手段可以在效能不受影響的情況下犧牲強一致性,達到資料的最終一致性。最大努力送達型事務的缺點是假定事務一定是成功的,無法回滾,因此不夠靈活。
還有一種柔性事務型別是TCC,即Try Confirm Cancel。可以通過事務管理器控制事務的提交或回滾,更加接近原生事務,但仍然是最終一致性。其缺點是需要業務程式碼自行實現Try Confirm Cancel的介面,對現有業務帶來一定衝擊。未來Sharding-JDBC會帶來對TCC的支援。
還有一些其他的分散式事務,如google提出的F1等,由於Shariding-JDBC仍然使用資料庫的原有儲存引擎,並未改變,因此暫時不考慮對此型別事務的支援。
再看一下sharding-jdbc最大努力送達型架構圖:
這裡寫圖片描述

程式碼閱讀

SoftTransactionManager柔性事務管理器這個類是入口,init()方法
1、init方法

    /**
     * 初始化事務管理器.
     */
    public void init() throws SQLException {
        //DML類SQL執行時的事件釋出匯流排,雛形是guava的EventBus,包含register監聽器,post釋出事件等基本操作,訊息投遞員EventPostman類呼叫這些方法
        //採用事件監聽器模式,將事務提交執行,做成事件佇列。
        DMLExecutionEventBus.register(new
BestEffortsDeliveryListener()); //事務日誌有記憶體和RDB資料庫儲存2種形式,如果是RDB,需執行建表語句 if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) { //guava斷言 Preconditions.checkNotNull(transactionConfig.getTransactionLogDataSource()); createTable(); } //重要物件用的都是普通factory模式,此處做最大努力送達型Job的zk環境和JobScheduler初始化
if (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) { new NestedBestEffortsDeliveryJobFactory(transactionConfig).init(); } }

2、根據列舉型別獲取柔性事務方法getTransaction()

/**
     * 獲取柔性事務管理器.
     * 
     * @param type 柔性事務型別
     * @return 柔性事務
     */
    public AbstractSoftTransaction getTransaction(final SoftTransactionType type) {
        AbstractSoftTransaction result;
        switch (type) {
            case BestEffortsDelivery: 
                result = new BEDSoftTransaction();
                break;
            case TryConfirmCancel:
                result = new TCCSoftTransaction();
                break;
            default: 
                throw new UnsupportedOperationException(type.toString());
        }
        // TODO 目前使用不支援巢狀事務,以後這裡需要可配置
        if (getCurrentTransaction().isPresent()) {
            throw new UnsupportedOperationException("Cannot support nested transaction.");
        }
        ExecutorDataMap.getDataMap().put(TRANSACTION, result);
        ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig);
        return result;
    }

BEDSoftTransaction繼承AbstractSoftTransaction,自己擴充套件了一個方法,begin開啟事務方法,其實就是呼叫父類公用beginInternal方法,另外父類end方法直接繼承使用。
這裡提一下2個擴充套件ThreadLocal的類,用來做執行緒級別資料共享的。BEDSoftTransaction裡的ExecutorDataMap和AbstractSoftTransaction裡的ExecutorExceptionHandler
ExecutorDataMap做執行緒級別快取transaction物件和transactionConfig

ExecutorDataMap.getDataMap().put(TRANSACTION, result);
ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig);

ExecutorExceptionHandler做執行緒級別異常處理,裡面3個方法,setExceptionThrown可能在A類設定,在B類根據isExceptionThrown判斷是丟擲還是處理,最後傳入handleException處理,整個設計就是使用者只需關心setExceptionThrown,handleException都是一樣的。

/**
     * 設定是否將異常丟擲.
     *
     * @param isExceptionThrown 是否將異常丟擲
     */
    public static void setExceptionThrown(final boolean isExceptionThrown) {
        ExecutorExceptionHandler.IS_EXCEPTION_THROWN.set(isExceptionThrown);
    }

    /**
     * 獲取是否將異常丟擲.
     * 
     * @return 是否將異常丟擲
     */
    public static boolean isExceptionThrown() {
        return null == IS_EXCEPTION_THROWN.get() ? true : IS_EXCEPTION_THROWN.get();
    }

    /**
     * 處理異常. 
     * 
     * @param ex 待處理的異常
     */
    public static void handleException(final Exception ex) {
        if (isExceptionThrown()) {
            throw new ShardingJdbcException(ex);
        }
        log.error("exception occur: ", ex);
    }

重點看一下BestEffortsDeliveryListener類,這個方法是同步event的處理方法

    @Subscribe
    @AllowConcurrentEvents
    public void listen(final DMLExecutionEvent event) {
        if (!isProcessContinuously()) {
            return;
        }
        //config主要內容是1、最大嘗試次數2、事務日誌型別預設RDB 3、儲存事務日誌的資料來源
        //4、內嵌的最大努力送達型非同步作業配置物件(非同步最大執行次數、非同步執行延遲毫秒數、zk目錄和埠)
        SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
        TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
        BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
        switch (event.getEventExecutionType()) {
            case BEFORE_EXECUTE:
                //TODO 對於批量執行的SQL需要解析成兩層列表
                transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), 
                        event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
                return;
            case EXECUTE_SUCCESS: 
            //執行成功刪除tranlog
                transactionLogStorage.remove(event.getId());
                return;
            case EXECUTE_FAILURE: 
            //失敗繼續執行
                boolean deliverySuccess = false;
                for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
                    if (deliverySuccess) {
                        return;
                    }
                    boolean isNewConnection = false;
                    Connection conn = null;
                    PreparedStatement preparedStatement = null;
                    try {
                        conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLStatementType.UPDATE);
                        //驗證不通過重新獲取conn
                        if (!isValidConnection(conn)) {
                            bedSoftTransaction.getConnection().releaseBrokenConnection(conn);
                            conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLStatementType.UPDATE);
                            isNewConnection = true;
                        }
                        preparedStatement = conn.prepareStatement(event.getSql());
                        //TODO 對於批量事件需要解析成兩層列表
                        for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
                            preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
                        }
                        preparedStatement.executeUpdate();
                        deliverySuccess = true;
                        transactionLogStorage.remove(event.getId());
                    } catch (final SQLException ex) {
                        log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
                    } finally {
                        close(isNewConnection, conn, preparedStatement);
                    }
                }
                return;
            default: 
                throw new UnsupportedOperationException(event.getEventExecutionType().toString());
        }
    }

這段程式碼很簡單,但是滿足不斷重試,資料最終一致,對事務操作,還是有一定要求的。
最大努力送達型,要求多次操作不涉及狀態累積或變遷
(1)、要求update操作,冪等性:
一個冪等的操作典型如:
把編號為5的記錄的A欄位設定為0
這種操作不管執行多少次都是冪等的。

一個非冪等的操作典型如:
把編號為5的記錄的A欄位增加1
這種操作顯然就不是冪等的。

要做到冪等性,從介面設計上來說不設計任何非冪等的操作即可。

譬如說需求是:
當用戶點選贊同時,將答案的贊同數量+1。
改為:
當用戶點選贊同時,確保答案贊同表中存在一條記錄,使用者、答案。
贊同數量由答案贊同表統計出來。
(2)、要求INSERT語句要求必須包含主鍵(不能是自增主鍵)。
(3)、DELETE語句無要求。

對應的有非同步最大送達型類NestedBestEffortsDeliveryJob,
繼承AbstractIndividualThroughputDataFlowElasticJob,這套非同步作業使用了當當網另外一個分散式作業排程系統elastic-job,這裡就不說了。主要方法processData

    @Override
    public boolean processData(final JobExecutionMultipleShardingContext context, final TransactionLog data) {
        try {
        //transactionLogStorage是一個介面,具體processData實現看RdbTransactionLogStorage和MemoryTransactionLogStorage這2個類都很簡單,針對TransactionLog的操作。
            return transactionLogStorage.processData(
                    transactionConfig.getTargetConnection(data.getDataSource()), data, transactionConfig.getBestEffortsDeliveryJobConfiguration().get().getAsyncMaxDeliveryTryTimes());
        } catch (final SQLException ex) {
            throw new ShardingJdbcException(ex);
        }
    }