sharding-jdbc原始碼閱讀之soft transaction
先看一段作者張亮,對sharding-jdbc分散式事務理解:
張亮:分散式事務這塊,我們認為XA多階段提交的方式,雖然對分散式資料的完整性有比較好的保障,但會極大的降影響應用效能,並未考慮採用。我們採用的是兩種方式,一種稱之為弱XA,另一種是柔性事務,即BASE。
弱XA就是分庫之後的資料庫各自負責自己事務的提交和回滾,沒有統一的排程器集中處理。這樣做的好處是天然就支援,對效能也沒有影響。但一旦出問題,比如兩個庫的資料都需要提交,一個提交成功,另一個提交時斷網導致失敗,則會發生資料不一致的問題,而且這種資料不一致是永久存在的。
柔性事務是對弱XA的有效補充。柔性事務型別很多。
Sharding-JDBC主要實現的是最大努力送達型。即認為事務經過反覆嘗試一定能夠成功。如果每次事務執行失敗,則記錄至事務庫,並通過非同步的手段不斷的嘗試,直至事務成功(可以設定嘗試次數,如果嘗試太多仍然失敗則入庫並需要人工干預)。在嘗試的途中,資料會有一定時間的不一致,但最終是一致的。通過這種手段可以在效能不受影響的情況下犧牲強一致性,達到資料的最終一致性。最大努力送達型事務的缺點是假定事務一定是成功的,無法回滾,因此不夠靈活。
還有一種柔性事務型別是TCC,即Try Confirm Cancel。可以通過事務管理器控制事務的提交或回滾,更加接近原生事務,但仍然是最終一致性。其缺點是需要業務程式碼自行實現Try Confirm Cancel的介面,對現有業務帶來一定衝擊。未來Sharding-JDBC會帶來對TCC的支援。
還有一些其他的分散式事務,如google提出的F1等,由於Shariding-JDBC仍然使用資料庫的原有儲存引擎,並未改變,因此暫時不考慮對此型別事務的支援。
再看一下sharding-jdbc最大努力送達型架構圖:
程式碼閱讀
SoftTransactionManager柔性事務管理器這個類是入口,init()方法
1、init方法
/**
* 初始化事務管理器.
*/
public void init() throws SQLException {
//DML類SQL執行時的事件釋出匯流排,雛形是guava的EventBus,包含register監聽器,post釋出事件等基本操作,訊息投遞員EventPostman類呼叫這些方法
//採用事件監聽器模式,將事務提交執行,做成事件佇列。
DMLExecutionEventBus.register(new BestEffortsDeliveryListener());
//事務日誌有記憶體和RDB資料庫儲存2種形式,如果是RDB,需執行建表語句
if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {
//guava斷言
Preconditions.checkNotNull(transactionConfig.getTransactionLogDataSource());
createTable();
}
//重要物件用的都是普通factory模式,此處做最大努力送達型Job的zk環境和JobScheduler初始化
if (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) {
new NestedBestEffortsDeliveryJobFactory(transactionConfig).init();
}
}
2、根據列舉型別獲取柔性事務方法getTransaction()
/**
* 獲取柔性事務管理器.
*
* @param type 柔性事務型別
* @return 柔性事務
*/
public AbstractSoftTransaction getTransaction(final SoftTransactionType type) {
AbstractSoftTransaction result;
switch (type) {
case BestEffortsDelivery:
result = new BEDSoftTransaction();
break;
case TryConfirmCancel:
result = new TCCSoftTransaction();
break;
default:
throw new UnsupportedOperationException(type.toString());
}
// TODO 目前使用不支援巢狀事務,以後這裡需要可配置
if (getCurrentTransaction().isPresent()) {
throw new UnsupportedOperationException("Cannot support nested transaction.");
}
ExecutorDataMap.getDataMap().put(TRANSACTION, result);
ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig);
return result;
}
BEDSoftTransaction繼承AbstractSoftTransaction,自己擴充套件了一個方法,begin開啟事務方法,其實就是呼叫父類公用beginInternal方法,另外父類end方法直接繼承使用。
這裡提一下2個擴充套件ThreadLocal的類,用來做執行緒級別資料共享的。BEDSoftTransaction裡的ExecutorDataMap和AbstractSoftTransaction裡的ExecutorExceptionHandler
ExecutorDataMap做執行緒級別快取transaction物件和transactionConfig
ExecutorDataMap.getDataMap().put(TRANSACTION, result);
ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig);
ExecutorExceptionHandler做執行緒級別異常處理,裡面3個方法,setExceptionThrown可能在A類設定,在B類根據isExceptionThrown判斷是丟擲還是處理,最後傳入handleException處理,整個設計就是使用者只需關心setExceptionThrown,handleException都是一樣的。
/**
* 設定是否將異常丟擲.
*
* @param isExceptionThrown 是否將異常丟擲
*/
public static void setExceptionThrown(final boolean isExceptionThrown) {
ExecutorExceptionHandler.IS_EXCEPTION_THROWN.set(isExceptionThrown);
}
/**
* 獲取是否將異常丟擲.
*
* @return 是否將異常丟擲
*/
public static boolean isExceptionThrown() {
return null == IS_EXCEPTION_THROWN.get() ? true : IS_EXCEPTION_THROWN.get();
}
/**
* 處理異常.
*
* @param ex 待處理的異常
*/
public static void handleException(final Exception ex) {
if (isExceptionThrown()) {
throw new ShardingJdbcException(ex);
}
log.error("exception occur: ", ex);
}
重點看一下BestEffortsDeliveryListener類,這個方法是同步event的處理方法
@Subscribe
@AllowConcurrentEvents
public void listen(final DMLExecutionEvent event) {
if (!isProcessContinuously()) {
return;
}
//config主要內容是1、最大嘗試次數2、事務日誌型別預設RDB 3、儲存事務日誌的資料來源
//4、內嵌的最大努力送達型非同步作業配置物件(非同步最大執行次數、非同步執行延遲毫秒數、zk目錄和埠)
SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
switch (event.getEventExecutionType()) {
case BEFORE_EXECUTE:
//TODO 對於批量執行的SQL需要解析成兩層列表
transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),
event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
return;
case EXECUTE_SUCCESS:
//執行成功刪除tranlog
transactionLogStorage.remove(event.getId());
return;
case EXECUTE_FAILURE:
//失敗繼續執行
boolean deliverySuccess = false;
for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
if (deliverySuccess) {
return;
}
boolean isNewConnection = false;
Connection conn = null;
PreparedStatement preparedStatement = null;
try {
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLStatementType.UPDATE);
//驗證不通過重新獲取conn
if (!isValidConnection(conn)) {
bedSoftTransaction.getConnection().releaseBrokenConnection(conn);
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLStatementType.UPDATE);
isNewConnection = true;
}
preparedStatement = conn.prepareStatement(event.getSql());
//TODO 對於批量事件需要解析成兩層列表
for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
}
preparedStatement.executeUpdate();
deliverySuccess = true;
transactionLogStorage.remove(event.getId());
} catch (final SQLException ex) {
log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
} finally {
close(isNewConnection, conn, preparedStatement);
}
}
return;
default:
throw new UnsupportedOperationException(event.getEventExecutionType().toString());
}
}
這段程式碼很簡單,但是滿足不斷重試,資料最終一致,對事務操作,還是有一定要求的。
最大努力送達型,要求多次操作不涉及狀態累積或變遷
(1)、要求update操作,冪等性:
一個冪等的操作典型如:
把編號為5的記錄的A欄位設定為0
這種操作不管執行多少次都是冪等的。
一個非冪等的操作典型如:
把編號為5的記錄的A欄位增加1
這種操作顯然就不是冪等的。
要做到冪等性,從介面設計上來說不設計任何非冪等的操作即可。
譬如說需求是:
當用戶點選贊同時,將答案的贊同數量+1。
改為:
當用戶點選贊同時,確保答案贊同表中存在一條記錄,使用者、答案。
贊同數量由答案贊同表統計出來。
(2)、要求INSERT語句要求必須包含主鍵(不能是自增主鍵)。
(3)、DELETE語句無要求。
對應的有非同步最大送達型類NestedBestEffortsDeliveryJob,
繼承AbstractIndividualThroughputDataFlowElasticJob,這套非同步作業使用了當當網另外一個分散式作業排程系統elastic-job,這裡就不說了。主要方法processData
@Override
public boolean processData(final JobExecutionMultipleShardingContext context, final TransactionLog data) {
try {
//transactionLogStorage是一個介面,具體processData實現看RdbTransactionLogStorage和MemoryTransactionLogStorage這2個類都很簡單,針對TransactionLog的操作。
return transactionLogStorage.processData(
transactionConfig.getTargetConnection(data.getDataSource()), data, transactionConfig.getBestEffortsDeliveryJobConfiguration().get().getAsyncMaxDeliveryTryTimes());
} catch (final SQLException ex) {
throw new ShardingJdbcException(ex);
}
}