Spring: manually managing transactions across multiple data sources to maximize data consistency
阿新 • Published: 2019-02-10
The template code is as follows:
@Autowired
private DataSourceTransactionManager transactionManager;

public void insert(CardEntity card, String dataSourceCloud, String tableNo) {
    TransactionStatus statusInsert = null;
    TransactionStatus statusMobile = null;
    TransactionStatus statusTel = null;
    try {
        // Switch the data source before opening the transaction.
        DataSourceKeyUtils.set(dataSourceCloud);
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
        statusInsert = transactionManager.getTransaction(def);
        cardService.insert(card, tableNo, dataSourceCloud);
        // Insert the indexes; insertIndex returns the TransactionStatus it opened, if any.
        statusMobile = insertIndex(card, IndexType.mobile);
        statusTel = insertIndex(card, IndexType.tel);
        // Commit in the reverse of the creation order.
        if (statusTel != null) {
            transactionManager.commit(statusTel);
        }
        if (statusMobile != null) {
            transactionManager.commit(statusMobile);
        }
        transactionManager.commit(statusInsert);
    } catch (Exception e) {
        // Roll back in reverse order, skipping transactions that have already
        // completed (rolling those back again would throw).
        if (statusTel != null && !statusTel.isCompleted()) {
            transactionManager.rollback(statusTel);
        }
        if (statusMobile != null && !statusMobile.isCompleted()) {
            transactionManager.rollback(statusMobile);
        }
        if (statusInsert != null && !statusInsert.isCompleted()) {
            transactionManager.rollback(statusInsert);
        }
        throw e;
    }
}
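The template calls DataSourceKeyUtils.set(...), but the post does not show that class. One common way to implement such a switch is a ThreadLocal key that a routing DataSource (for example a subclass of Spring's AbstractRoutingDataSource, via determineCurrentLookupKey()) reads when a connection is requested. The sketch below is only a guess at that shape; the class name DataSourceKeyHolder and the key "cloud-1" are hypothetical, not the author's code.

```java
/**
 * Hypothetical stand-in for the post's DataSourceKeyUtils:
 * keeps the data source key for the current thread.
 */
final class DataSourceKeyHolder {
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    private DataSourceKeyHolder() {
    }

    /** Selects the data source that the current thread should use next. */
    static void set(String key) {
        KEY.set(key);
    }

    /** Returns the selected key, or null if none has been set on this thread. */
    static String get() {
        return KEY.get();
    }

    /** Clears the selection so a pooled thread does not leak it to the next task. */
    static void clear() {
        KEY.remove();
    }
}
```

Because the key lives in a ThreadLocal, it has to be set before getTransaction(def) is called: that is the moment the transaction manager asks the data source for a connection.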
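Before creating each TransactionStatus, switch the data source manually. Commits and rollbacks must be performed in the reverse of the order in which the transactions were created. Why: the first time a TransactionStatus is created, doGetTransaction() is called, which looks up the ConnectionHolder bound to the current thread. On the first call it is null, so a new transaction object is created and started: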
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
The doBegin method mainly acquires a connection and attaches it to the transaction object:
if (txObject.getConnectionHolder() == null ||
        txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
    Connection newCon = this.dataSource.getConnection();
    if (logger.isDebugEnabled()) {
        logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
    }
    txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
When the second TransactionStatus is created, a transaction already exists, so the current transaction is suspended and a new one is created:
if (isExistingTransaction(transaction)) {
    // Existing transaction found -> check propagation behavior to find out how to behave.
    return handleExistingTransaction(definition, transaction, debugEnabled);
}
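SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    DefaultTransactionStatus status = newTransactionStatus(
            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    doBegin(transaction, definition);
    prepareSynchronization(status, definition);
    return status;
}
// (catch block omitted in this excerpt)
Suspending the current transaction really means setting the connection held by the current transaction object to null and unbinding the resource from the current thread. The suspended transaction is saved, and the previous transaction state is restored once the current transaction has been committed. That is why commits and rollbacks have to happen in reverse order.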
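To make the ordering argument concrete, here is a toy model of the suspend/resume bookkeeping. It is not Spring's code: the class SuspendResumeSketch, its methods, and the connection names are all invented for illustration, and a single field stands in for the ConnectionHolder bound to the thread. Each status remembers what it suspended when it began, and completing it puts that value back.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of suspend/resume; names are hypothetical, not Spring's API. */
final class SuspendResumeSketch {
    /** Stand-in for the ConnectionHolder bound to the current thread. */
    private String bound;

    /** Remembers its own connection and whatever was bound when it began. */
    static final class Status {
        final String connection;
        final String suspended;

        Status(String connection, String suspended) {
            this.connection = connection;
            this.suspended = suspended;
        }
    }

    /** Begins a new transaction: suspends the bound connection, binds the new one. */
    Status begin(String connection) {
        Status status = new Status(connection, bound);
        bound = connection;
        return status;
    }

    /** Completes a transaction: re-binds whatever it suspended when it began. */
    void complete(Status status) {
        bound = status.suspended;
    }

    /** Opens A, B, C, completes them in the chosen order, returns what is left bound. */
    static String boundAfterCompleting(boolean reverseOrder) {
        SuspendResumeSketch tx = new SuspendResumeSketch();
        List<Status> statuses = new ArrayList<>();
        statuses.add(tx.begin("conn-A"));
        statuses.add(tx.begin("conn-B"));
        statuses.add(tx.begin("conn-C"));
        if (reverseOrder) {
            Collections.reverse(statuses);
        }
        for (Status status : statuses) {
            tx.complete(status);
        }
        return tx.bound;
    }
}
```

In this model, boundAfterCompleting(true) leaves nothing bound, while boundAfterCompleting(false) ends with "conn-B" bound even though B's transaction has already finished. The real transaction manager keeps more state than this, so the exact failure there may differ, but the underlying mismatch is the same.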
Suspending the current transaction:
@Override
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    return TransactionSynchronizationManager.unbindResource(this.dataSource);
}
Resuming the previous transaction after the commit:
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    status.setCompleted();
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.clear();
    }
    if (status.isNewTransaction()) {
        doCleanupAfterCompletion(status.getTransaction());
    }
    if (status.getSuspendedResources() != null) {
        if (status.isDebug()) {
            logger.debug("Resuming suspended transaction after completion of inner transaction");
        }
        resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}
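Follow-up: today I tested a scenario involving locks and found that it blocks. An uncommitted transaction holds its locks, so the other transactions cannot proceed. The cause is that I was opening several transactions even against the same database.
Improvement: run all operations that belong to the same database in a single transaction.
Applicability: the SQL operations must not depend on being executed in a particular order.
The key code is as follows: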
private void sqlRun(Map<String, List<Runnable>> map) {
    List<TransactionStatus> transactionStatusList = new ArrayList<>();
    try {
        for (Map.Entry<String, List<Runnable>> entry : map.entrySet()) {
            // One transaction per data source; switch before opening it.
            DataSourceKeyUtils.set(entry.getKey());
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
            TransactionStatus transaction = transactionManager.getTransaction(def);
            transactionStatusList.add(transaction);
            List<Runnable> runnableList = entry.getValue();
            for (Runnable runnable : runnableList) {
                runnable.run();
            }
        }
        // Commit newest-first, i.e. in the reverse of the creation order.
        for (int i = transactionStatusList.size() - 1; i >= 0; i--) {
            transactionManager.commit(transactionStatusList.get(i));
        }
    } catch (Exception e) {
        // Roll back newest-first as well. Iterating by index avoids reversing the
        // list a second time, and completed transactions are skipped.
        for (int i = transactionStatusList.size() - 1; i >= 0; i--) {
            TransactionStatus status = transactionStatusList.get(i);
            if (!status.isCompleted()) {
                transactionManager.rollback(status);
            }
        }
        throw e;
    }
}
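The post does not show how the map passed to sqlRun is built. A minimal sketch, assuming the caller simply groups operations by data source key: the helper class SqlGroupingSketch and the keys used in the usage note are hypothetical. A LinkedHashMap keeps the data sources in the order they were first added, which makes the order in which transactions are opened predictable.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: groups operations by data source key for sqlRun. */
final class SqlGroupingSketch {
    // LinkedHashMap preserves the order in which data sources were first added.
    private final Map<String, List<Runnable>> byDataSource = new LinkedHashMap<>();

    /** Registers an operation to run inside the transaction of the given data source. */
    SqlGroupingSketch add(String dataSourceKey, Runnable operation) {
        byDataSource.computeIfAbsent(dataSourceKey, key -> new ArrayList<>()).add(operation);
        return this;
    }

    /** Returns the grouped operations, one list per data source. */
    Map<String, List<Runnable>> build() {
        return byDataSource;
    }
}
```

A caller could then write something like new SqlGroupingSketch().add("cloud-1", insertCard).add("index-1", insertMobileIndex).add("index-1", insertTelIndex).build() and pass the result to sqlRun, so that both index inserts share one transaction on "index-1".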