1. 程式人生 > >spring 多資料來源手動管理事務,最大程度保障資料一致性

spring 多資料來源手動管理事務,最大程度保障資料一致性

模板程式碼如下:

  @Autowired
    private DataSourceTransactionManager transactionManager;

    public void insert(CardEntity card, String dataSourceCloud, String tableNo) {
        TransactionStatus statusInsert = null;
        TransactionStatus statusMobile = null;
        TransactionStatus statusTel = null;
        try {
            DataSourceKeyUtils.set(dataSourceCloud);
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
            statusInsert = transactionManager.getTransaction(def);
            cardService.insert(card, tableNo, dataSourceCloud);

            //插入索引
            statusMobile = insertIndex(card, IndexType.mobile);
            statusTel = insertIndex(card, IndexType.tel);

            if (statusTel != null) {
                transactionManager.commit(statusTel);
            }
            if (statusMobile != null) {
                transactionManager.commit(statusMobile);
            }
            transactionManager.commit(statusInsert);
        } catch (Exception e) {
            if (statusTel != null) {
                transactionManager.rollback(statusTel);
            }
            if (statusMobile != null) {
                transactionManager.rollback(statusMobile);
            }
            if (statusInsert != null) {
                transactionManager.rollback(statusInsert);
            }
            throw e;
        }
    }
在建立事務物件TransactionStatus 的時候,先手動切換資料來源。事務的提交和回滾要安裝建立順序的逆順序來提交或回滾。

原因分析:當第一次建立TransactionStatus 的時候會呼叫doGetTransaction()方法,這個方法會獲取當前執行緒繫結的ConnectionHolder,第一次是null,然後就建立一個事務物件並開始。
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status 
= newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status;

doBegin方法主要是設定一個連線

if (txObject.getConnectionHolder() == null ||
      txObject.getConnectionHolder()
.isSynchronizedWithTransaction()) { Connection newCon = this.dataSource.getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } txObject.setConnectionHolder(new ConnectionHolder(newCon), true); }
當第二建立TransactionStatus 時,由於當前已存在事務所以要建立新的事務,同時掛起當前事務。
if (isExistingTransaction(transaction)) {
   // Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
   boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
   return status;
}
掛起當前事務,實際就是將當前事務物件繫結的連線設定為null,同時解綁當前執行緒資源。掛起的事務會儲存,

等當前事務提交之後會恢復之前的事務狀態。所以要逆序提交或回滾

掛起當前事務:
@Override
protected Object doSuspend(Object transaction) {
   DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
   return TransactionSynchronizationManager.unbindResource(this.dataSource);
}
提交之後恢復之前的事務:
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
   status.setCompleted();
   if (status.isNewSynchronization()) {
      TransactionSynchronizationManager.clear();
}
   if (status.isNewTransaction()) {
      doCleanupAfterCompletion(status.getTransaction());
}
   if (status.getSuspendedResources() != null) {
      if (status.isDebug()) {
         logger.debug("Resuming suspended transaction after completion of inner transaction");
}
      resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
}
}

後續:今天測試了一下有鎖的情況,發現會堵塞,因為事務沒提交,導致其他事務不能進行。原因在於就是同一個庫我也開了多個事務。

改進方法:所有屬於一個庫的操作,使用同一個事務。

適用範圍:這些sql操作沒有先後關係。

關鍵程式碼如下:

private void sqlRun(Map<String, List<Runnable>> map) {
    List<TransactionStatus> transactionStatusList = new ArrayList<>();
    try {
        for (Map.Entry<String, List<Runnable>> entry : map.entrySet()) {
            DataSourceKeyUtils.set(entry.getKey());
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionStatus transaction = transactionManager.getTransaction(def);
transactionStatusList.add(transaction);
List<Runnable> runnableList = entry.getValue();
            for (Runnable runnable : runnableList) {
                runnable.run();
}
        }
        Collections.reverse(transactionStatusList);
        for (TransactionStatus status : transactionStatusList) {
            transactionManager.commit(status);
}
    } catch (Exception e) {
        Collections.reverse(transactionStatusList);
        for (TransactionStatus status : transactionStatusList) {
            transactionManager.rollback(status);
}
        throw e;
}
}