1. 程式人生 > 實用技巧 >分散式事務(五)原始碼詳解

分散式事務(五)原始碼詳解

目錄

正文

回到頂部

系列目錄

分散式事務(一)原理概覽

分散式事務(二)JTA規範

分散式事務(三)mysql對XA協議的支援

分散式事務(四)簡單樣例

分散式事務(五)原始碼詳解

分散式事務(六)總結提高

回到頂部

引子

本節我們將會從上一節的”簡單樣例“入手:Spring Boot+Atomikos(TM)+Mybatis(ORM)+Mysql(DB),深入原始碼,看看這個分散式事務是怎麼定義、執行的。

先來回憶一下第二節講的JTA規範,如下圖。Atomikos是什麼角色?起到什麼作用?

角色:

Atomikos根本上是一個事務管理器(TM)也就是JTA模型的核心,上圖扇形的中心位置。

作用:

TM呼叫 【Resource Manager資源管理器】 的XAResource介面來實現事務操作。

TM依賴 【Application Server應用伺服器】 的TransactionManager介面當然如果伺服器不支援事務管理,自然也就只能使用第三方包,例如Atomikos。

TM依賴 【Application應用程式】 設定事務邊界、屬性,application呼叫UserTransaction介面控制事務開始、提交、回滾。

回到頂部

一、bean定義

1.1JtaTransactionManager

org.springframework.transaction.jta.JtaTransactionManager類是spring提供的分散式事務管理器。

JtaTransactionManager類圖如下:

實現了介面如下:

  • PlatformTransactionManager :獲取事務,提交事務,回滾事務
  • TransactionFactory:建立事務
  • InitializingBean:初始化bean

JtaTransactionManager實現了InitializingBean介面的afterPropertiesSet()方法,處於bean生命週期的容器初始化->例項化期->初始化中期,如下圖:

下面我們看一下JtaTransactionManager在bean初始化中期InitializingBean介面的afterPropertiesSet()做了什麼:

 1 /**
 2  * Initialize the UserTransaction as well as the TransactionManager handle.
 3  * @see #initUserTransactionAndTransactionManager()
 4  */
 5 @Override
 6 public void afterPropertiesSet() throws TransactionSystemException {
 7     initUserTransactionAndTransactionManager();
 8     checkUserTransactionAndTransactionManager();
 9     initTransactionSynchronizationRegistry();
10 }
  • 1.initUserTransactionAndTransactionManager:初始化UserTransaction和TransactionManager介面。主要是如果沒有定義的話,可以支援JNDI。

  • 2.checkUserTransactionAndTransactionManager:校驗2個介面是否存在。UserTransaction必須定義,TransactionManager可以不定義。

      原始碼如下:

      

      對應控制檯列印:

o.s.t.jta.JtaTransactionManager          : Using JTA UserTransaction: com.atomikos.icatch.jta.UserTransactionImp@614aeccc
o.s.t.jta.JtaTransactionManager          : Using JTA TransactionManager: com.atomikos.icatch.jta.UserTransactionManager@5116ac09
  • 3.initTransactionSynchronizationRegistry:初始化事務同步註冊,這個不使用JNDI的話沒啥用。

上一節分散式事務(三)簡單樣例中我們配置了JtaTransactionManagerConfig類,如下:

 1 package study.config.datasource;
 2 
 3 import com.atomikos.icatch.jta.UserTransactionImp;
 4 import com.atomikos.icatch.jta.UserTransactionManager;
 5 import org.springframework.context.annotation.Bean;
 6 import org.springframework.context.annotation.Configuration;
 7 import org.springframework.transaction.jta.JtaTransactionManager;
 8 
 9 import javax.transaction.UserTransaction;
10 
11 /**
12  * 事務管理器配置類
13  *
14  * @author denny
15  */
16 @Configuration
17 public class JtaTransactionManagerConfig {
18 
19     @Bean(name = "atomikosTransactionManager")
20     public JtaTransactionManager regTransactionManager() {
21         UserTransactionManager userTransactionManager = new UserTransactionManager();
22         UserTransaction userTransaction = new UserTransactionImp();
23         return new JtaTransactionManager(userTransaction, userTransactionManager);
24     }
25 }

如上圖,我們定義了一個name = "atomikosTransactionManager"的bean,具體型別為JtaTransactionManager。其中構造了2個實現類UserTransactionImp(javax.transaction.UserTransaction介面)、UserTransactionManager(javax.transaction.TransactionManager介面)。並用這2個實現類構造了一個JtaTransactionManager。

1.UserTransaction介面

提供給使用者操控事務的:開啟,提交,回滾等等。原始碼如下:

2 TransactionManager介面

原始碼如下:

相比UserTransaction,TransactionManager介面多了介面的掛起、恢復、獲取事務3個介面。這3個方法明顯是留給系統自己呼叫的。

1.2 AtomikosDataSourceBean

Spring 為Atomikos定製了一個org.springframework.boot.jta.atomikos.AtomikosDataSourceBean,提供了bean生命週期的一些介面:

  1. BeanNameAware:設定bean名稱
  2. InitializingBean:初始化bean
  3. DisposableBean:銷燬bean

我們只需要定義這個bean即可輕鬆使得spring來維護。

com.atomikos.jdbc.AtomikosDataSourceBean類圖如下:

其中核心介面:

DataSource介面:getConnection獲取資料庫連線

ConnectionPoolProperties介面:用於載入連線池的屬性

回到頂部

二、原始碼剖析

2.1 自動配置類

老套路哈,spring boot就這麼點花花腸子,既然使用@Transactional這種註解的方式,那麼我們就從springboot 容器啟動時的自動配置載入(spring boot容器啟動詳解)開始看。在/META-INF/spring.factories中配置檔案中查詢,如下圖:

載入2個關於事務的自動配置類:

org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration,
org.springframework.boot.autoconfigure.transaction.jta.JtaAutoConfiguration,

由於本文是分散式事務,故2個配置檔案都生效了,我們先看JtaAutoConfiguration

2.2 JtaAutoConfiguration

 1 /**
 2  * {@link EnableAutoConfiguration Auto-configuration} for JTA.
 3  *
 4  * @author Josh Long
 5  * @author Phillip Webb
 6  * @since 1.2.0
 7  */
 8 @ConditionalOnClass(javax.transaction.Transaction.class)
 9 @ConditionalOnProperty(prefix = "spring.jta", value = "enabled", matchIfMissing = true)
10 @AutoConfigureBefore({ XADataSourceAutoConfiguration.class,
11         ActiveMQAutoConfiguration.class, ArtemisAutoConfiguration.class,
12         HibernateJpaAutoConfiguration.class })
13 @Import({ JndiJtaConfiguration.class, BitronixJtaConfiguration.class,
14         AtomikosJtaConfiguration.class, NarayanaJtaConfiguration.class })
15 @EnableConfigurationProperties(JtaProperties.class)
16 public class JtaAutoConfiguration {
17 
18 }

如上,JtaAutoConfiguration這個類竟然是個空殼,只有一堆註解,挑幾個重要的講一講:

1.@ConditionalOnClass(javax.transaction.Transaction.class):代表類路徑下存在javax.transaction.Transaction.class這個類,那麼JtaAutoConfiguration生效。

2.@ConditionalOnProperty(prefix = "spring.jta", value = "enabled", matchIfMissing =true),自動開啟spring.jta.enabled=true.

3.@Import({ JndiJtaConfiguration.class, BitronixJtaConfiguration.class,AtomikosJtaConfiguration.class, NarayanaJtaConfiguration.class }),又是spring套路哈,用來匯入類。這裡匯入了4個配置類,可見支援4種第三方事務管理器。AtomikosJtaConfiguration.class自然就是Atomikos了。

AtomikosJtaConfiguration.class這個配置類

 1 @Configuration
 2 @EnableConfigurationProperties({ AtomikosProperties.class, JtaProperties.class })
 3 @ConditionalOnClass({ JtaTransactionManager.class, UserTransactionManager.class })
 4 @ConditionalOnMissingBean(PlatformTransactionManager.class)
 5 class AtomikosJtaConfiguration {
 6 
 7     private final JtaProperties jtaProperties;
 8 
 9     private final TransactionManagerCustomizers transactionManagerCustomizers;
10 
11     AtomikosJtaConfiguration(JtaProperties jtaProperties,
12             ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
13         this.jtaProperties = jtaProperties;
14         this.transactionManagerCustomizers = transactionManagerCustomizers
15                 .getIfAvailable();
16     }
17 
18     @Bean(initMethod = "init", destroyMethod = "shutdownForce")
19     @ConditionalOnMissingBean(UserTransactionService.class)
20     public UserTransactionServiceImp userTransactionService(
21             AtomikosProperties atomikosProperties) {
22         Properties properties = new Properties();
23         if (StringUtils.hasText(this.jtaProperties.getTransactionManagerId())) {
24             properties.setProperty("com.atomikos.icatch.tm_unique_name",
25                     this.jtaProperties.getTransactionManagerId());
26         }
27         properties.setProperty("com.atomikos.icatch.log_base_dir", getLogBaseDir());
28         properties.putAll(atomikosProperties.asProperties());
29         return new UserTransactionServiceImp(properties);
30     }
31 
32     private String getLogBaseDir() {
33         if (StringUtils.hasLength(this.jtaProperties.getLogDir())) {
34             return this.jtaProperties.getLogDir();
35         }
36         File home = new ApplicationHome().getDir();
37         return new File(home, "transaction-logs").getAbsolutePath();
38     }
39 
40     @Bean(initMethod = "init", destroyMethod = "close")
41     @ConditionalOnMissingBean
42     public UserTransactionManager atomikosTransactionManager(
43             UserTransactionService userTransactionService) throws Exception {
44         UserTransactionManager manager = new UserTransactionManager();
45         manager.setStartupTransactionService(false);
46         manager.setForceShutdown(true);
47         return manager;
48     }
49 
50     @Bean
51     @ConditionalOnMissingBean(XADataSourceWrapper.class)
52     public AtomikosXADataSourceWrapper xaDataSourceWrapper() {
53         return new AtomikosXADataSourceWrapper();
54     }
55 
56     @Bean
57     @ConditionalOnMissingBean
58     public static AtomikosDependsOnBeanFactoryPostProcessor atomikosDependsOnBeanFactoryPostProcessor() {
59         return new AtomikosDependsOnBeanFactoryPostProcessor();
60     }
61 
62     @Bean
63     public JtaTransactionManager transactionManager(UserTransaction userTransaction,
64             TransactionManager transactionManager) {
65         JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(
66                 userTransaction, transactionManager);
67         if (this.transactionManagerCustomizers != null) {
68             this.transactionManagerCustomizers.customize(jtaTransactionManager);
69         }
70         return jtaTransactionManager;
71     }
72 
73     @Configuration
74     @ConditionalOnClass(Message.class)
75     static class AtomikosJtaJmsConfiguration {
76 
77         @Bean
78         @ConditionalOnMissingBean(XAConnectionFactoryWrapper.class)
79         public AtomikosXAConnectionFactoryWrapper xaConnectionFactoryWrapper() {
80             return new AtomikosXAConnectionFactoryWrapper();
81         }
82 
83     }
84 
85 }

2.3 TransactionAutoConfiguration

這裡和本地事務分析過程一致,就不再重複,飛機票spring事務詳解(三)原始碼詳解,一直看到第二節結束.這裡只截個圖:

最終原始碼呼叫具體事務管理器的PlatformTransactionManager介面的3個方法:

1 public interface PlatformTransactionManager {
2     // 獲取事務狀態
3     TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
4   // 事務提交
5     void commit(TransactionStatus status) throws TransactionException;
6   // 事務回滾
7     void rollback(TransactionStatus status) throws TransactionException;
8 }

回到頂部

三、核心原始碼

核心實現類圖:

如上提所示,PlatformTransactionManager頂級介面定義了最核心的事務管理方法,下面一層是AbstractPlatformTransactionManager抽象類,實現了PlatformTransactionManager介面的方法並定義了一些抽象方法,供子類拓展。最下面一層是2個經典事務管理器:

1.DataSourceTransactionmanager: 即本地單資源事務管理器。

2.JtaTransactionManager:即多資源事務管理器(又叫做分散式事務管理器),其實現了JTA規範,使用XA協議進行兩階段提交。

我們這裡自然是JTA分散式環境,我們只需要從JtaTransactionManager這個實現類入手即可。

3.1getTransaction獲取事務

AbstractPlatformTransactionManager實現了getTransaction()方法如下:

 1 @Override
 2     public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
 3         Object transaction = doGetTransaction();
 4 
 5         // Cache debug flag to avoid repeated checks.
 6         boolean debugEnabled = logger.isDebugEnabled();
 7 
 8         if (definition == null) {
 9             // Use defaults if no transaction definition given.
10             definition = new DefaultTransactionDefinition();
11         }
12       // 如果當前已經存在事務
13         if (isExistingTransaction(transaction)) {
14             // 根據不同傳播機制不同處理
15             return handleExistingTransaction(definition, transaction, debugEnabled);
16         }
17 
18         // 超時不能小於預設值
19         if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
20             throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
21         }
22 
23         // 當前不存在事務,傳播機制=MANDATORY(支援當前事務,沒事務報錯),報錯
24         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
25             throw new IllegalTransactionStateException(
26                     "No existing transaction found for transaction marked with propagation 'mandatory'");
27         }// 當前不存在事務,傳播機制=REQUIRED/REQUIRED_NEW/NESTED,這三種情況,需要新開啟事務,且加上事務同步
28         else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
29                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
30                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
31             SuspendedResourcesHolder suspendedResources = suspend(null);
32             if (debugEnabled) {
33                 logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
34             }
35             try {// 是否需要新開啟同步// 開啟// 開啟
36                 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
37                 DefaultTransactionStatus status = newTransactionStatus(
38                         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
39                 doBegin(transaction, definition);// 開啟新事務
40                 prepareSynchronization(status, definition);//預備同步
41                 return status;
42             }
43             catch (RuntimeException ex) {
44                 resume(null, suspendedResources);
45                 throw ex;
46             }
47             catch (Error err) {
48                 resume(null, suspendedResources);
49                 throw err;
50             }
51         }
52         else {
53             // 當前不存在事務當前不存在事務,且傳播機制=PROPAGATION_SUPPORTS/PROPAGATION_NOT_SUPPORTED/PROPAGATION_NEVER,這三種情況,建立“空”事務:沒有實際事務,但可能是同步。警告:定義了隔離級別,但並沒有真實的事務初始化,隔離級別被忽略有隔離級別但是並沒有定義實際的事務初始化,有隔離級別但是並沒有定義實際的事務初始化,
54             if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
55                 logger.warn("Custom isolation level specified but no actual transaction initiated; " +
56                         "isolation level will effectively be ignored: " + definition);
57             }
58             boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
59             return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
60         }
61     }

上圖核心步驟就是:

  • 1.doGetTransaction():獲取事務
  • 2.doBegin:準備工作

3.1.1 JtaTransactionManager的doGetTransaction()

其實也就是把UserTransaction封裝成一個JtaTransactionObject返回。

 1     @Override
 2     protected Object doGetTransaction() {
 3         UserTransaction ut = getUserTransaction();
 4         if (ut == null) {
 5             throw new CannotCreateTransactionException("No JTA UserTransaction available - " +
 6                     "programmatic PlatformTransactionManager.getTransaction usage not supported");
 7         }
 8         if (!this.cacheUserTransaction) {
 9             ut = lookupUserTransaction(
10                     this.userTransactionName != null ? this.userTransactionName : DEFAULT_USER_TRANSACTION_NAME);
11         }
12         return doGetJtaTransaction(ut);
13     }
14 
15     /**
16      * Get a JTA transaction object for the given current UserTransaction.
17      * <p>Subclasses can override this to provide a JtaTransactionObject
18      * subclass, for example holding some additional JTA handle needed.
19      * @param ut the UserTransaction handle to use for the current transaction
20      * @return the JtaTransactionObject holding the UserTransaction
21      */
22     protected JtaTransactionObject doGetJtaTransaction(UserTransaction ut) {
23         return new JtaTransactionObject(ut);
24     }

3.1.2 JtaTransactionManager.doBegin

 1 @Override
 2     protected void doBegin(Object transaction, TransactionDefinition definition) {
 3         JtaTransactionObject txObject = (JtaTransactionObject) transaction;
 4         try {
 5             doJtaBegin(txObject, definition);
 6         }
 7         catch (NotSupportedException ex) {
 8             // assume nested transaction not supported
 9             throw new NestedTransactionNotSupportedException(
10                     "JTA implementation does not support nested transactions", ex);
11         }
12         catch (UnsupportedOperationException ex) {
13             // assume nested transaction not supported
14             throw new NestedTransactionNotSupportedException(
15                     "JTA implementation does not support nested transactions", ex);
16         }
17         catch (SystemException ex) {
18             throw new CannotCreateTransactionException("JTA failure on begin", ex);
19         }
20     }

呼叫JtaTransactionManager.doJtaBegin:

1 protected void doJtaBegin(JtaTransactionObject txObject, TransactionDefinition definition)
2             throws NotSupportedException, SystemException {
3         
4         applyIsolationLevel(txObject, definition.getIsolationLevel());
5         int timeout = determineTimeout(definition);
6         applyTimeout(txObject, timeout);
7         txObject.getUserTransaction().begin();
8     }

UserTransactionImp.begin->TransactionManagerImp.begin

 1 public void begin ( int timeout ) throws NotSupportedException,
 2             SystemException
 3     {
 4         CompositeTransaction ct = null;
 5         ResumePreviousTransactionSubTxAwareParticipant resumeParticipant = null;
 6         
 7         ct = compositeTransactionManager.getCompositeTransaction();
 8         if ( ct != null && ct.getProperty (  JTA_PROPERTY_NAME ) == null ) {
 9             LOGGER.logWarning ( "JTA: temporarily suspending incompatible transaction: " + ct.getTid() +
10                     " (will be resumed after JTA transaction ends)" );
11             ct = compositeTransactionManager.suspend();
12             resumeParticipant = new ResumePreviousTransactionSubTxAwareParticipant ( ct );
13         }
14         
15         try {
16             ct = compositeTransactionManager.createCompositeTransaction ( ( ( long ) timeout ) * 1000 );
17             if ( resumeParticipant != null ) ct.addSubTxAwareParticipant ( resumeParticipant );
18             if ( ct.isRoot () && getDefaultSerial () )
19                 ct.getTransactionControl ().setSerial ();
20             ct.setProperty ( JTA_PROPERTY_NAME , "true" );
21         } catch ( SysException se ) {
22             String msg = "Error in begin()";
23             LOGGER.logWarning( msg , se );
24             throw new ExtendedSystemException ( msg , se
25                     .getErrors () );
26         }
27         recreateCompositeTransactionAsJtaTransaction(ct);
28     }

createCompositeTransaction建立混合事務

 1 public CompositeTransaction createCompositeTransaction ( long timeout ) throws SysException
 2     {
 3         Stack errors = new Stack();
 4         CompositeTransaction ct = null , ret = null;
 5         // 獲取當前執行緒繫結的事務
 6         ct = getCurrentTx ();
       // 當前執行緒不存在事務 7 if ( ct == null ) {
// 建立組合事務 8 ret = service_.createCompositeTransaction ( timeout ); 9 if(LOGGER.isInfoEnabled()){ 10 LOGGER.logInfo("createCompositeTransaction ( " + timeout + " ): " 11 + "created new ROOT transaction with id " + ret.getTid ()); 12 }
        // 當前執行緒存在事務 13 } else { 14 if(LOGGER.isInfoEnabled()) LOGGER.logInfo("createCompositeTransaction ( " + timeout + " )");
          // 建立子事務 15 ret = ct.getTransactionControl ().createSubTransaction (); 16 17 } 18 Thread thread = Thread.currentThread ();
       // 綁定當前執行緒和事務的2個對映map 19 setThreadMappings ( ret, thread ); 20 21 return ret; 22 }

如果當前執行緒不存在事務,建立組合事務。如果當前執行緒存在事務,建立子事務。

呼叫TransactionServiceImp的createCompositeTransaction建立混合事務

 1 public CompositeTransaction createCompositeTransaction ( long timeout ) throws SysException
 2     {
 3         if ( !initialized_ ) throw new IllegalStateException ( "Not initialized" );
 4 
 5         if ( maxNumberOfActiveTransactions_ >= 0 && 
 6              tidToTransactionMap_.size () >= maxNumberOfActiveTransactions_ ) {
 7             throw new IllegalStateException ( "Max number of active transactions reached:" + maxNumberOfActiveTransactions_ );
 8         }
 9         
10         String tid = tidmgr_.get ();
11         Stack lineage = new Stack ();
12         //建立協調者
15         CoordinatorImp cc = createCC ( null, tid, true, false, timeout );
       // 建立組合事務 16 CompositeTransaction ct = createCT ( tid, cc, lineage, false ); 17 return ct; 18 }

3.2 commit 提交事務

事務提交流程圖如下:

AbstractPlatformTransactionManager的commit原始碼如下:

 1 @Override
 2     public final void commit(TransactionStatus status) throws TransactionException {
 3         if (status.isCompleted()) {// 如果事務已完結,報錯無法再次提交
 4             throw new IllegalTransactionStateException(
 5                     "Transaction is already completed - do not call commit or rollback more than once per transaction");
 6         }
 7 
 8         DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
 9         if (defStatus.isLocalRollbackOnly()) {// 如果事務明確標記為回滾,
10             if (defStatus.isDebug()) {
11                 logger.debug("Transactional code has requested rollback");
12             }
13             processRollback(defStatus);//執行回滾
14             return;
15         }//如果不需要全域性回滾時提交 且 全域性回滾
16         if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
17             if (defStatus.isDebug()) {
18                 logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
19             }//執行回滾
20             processRollback(defStatus);
21             // 僅在最外層事務邊界(新事務)或顯式地請求時丟擲“未期望的回滾異常”
23             if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
24                 throw new UnexpectedRollbackException(
25                         "Transaction rolled back because it has been marked as rollback-only");
26             }
27             return;
28         }
29      // 執行提交事務
30         processCommit(defStatus);
31     }

如上圖,各種判斷:

  • 1.如果事務明確標記為本地回滾,-》執行回滾
  • 2.如果不需要全域性回滾時提交 且 全域性回滾-》執行回滾
  • 3.提交事務,核心方法processCommit()

processCommit如下:

 1 private void processCommit(DefaultTransactionStatus status) throws TransactionException {
 2         try {
 3             boolean beforeCompletionInvoked = false;
 4             try {//3個前置操作
 5                 prepareForCommit(status);
 6                 triggerBeforeCommit(status);
 7                 triggerBeforeCompletion(status);
 8                 beforeCompletionInvoked = true;//3個前置操作已呼叫
 9                 boolean globalRollbackOnly = false;//新事務 或 全域性回滾失敗
10                 if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
11                     globalRollbackOnly = status.isGlobalRollbackOnly();
12                 }//1.有儲存點,即巢狀事務
13                 if (status.hasSavepoint()) {
14                     if (status.isDebug()) {
15                         logger.debug("Releasing transaction savepoint");
16                     }//釋放儲存點
17                     status.releaseHeldSavepoint();
18                 }//2.新事務
19                 else if (status.isNewTransaction()) {
20                     if (status.isDebug()) {
21                         logger.debug("Initiating transaction commit");
22                     }//呼叫事務處理器提交事務
23                     doCommit(status);
24                 }
25                 // 3.非新事務,且全域性回滾失敗,但是提交時沒有得到異常,丟擲異常
27                 if (globalRollbackOnly) {
28                     throw new UnexpectedRollbackException(
29                             "Transaction silently rolled back because it has been marked as rollback-only");
30                 }
31             }
32             catch (UnexpectedRollbackException ex) {
33                 // 觸發完成後事務同步,狀態為回滾
34                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
35                 throw ex;
36             }// 事務異常
37             catch (TransactionException ex) {
38                 // 提交失敗回滾
39                 if (isRollbackOnCommitFailure()) {
40                     doRollbackOnCommitException(status, ex);
41                 }// 觸發完成後回撥,事務同步狀態為未知
42                 else {
43                     triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
44                 }
45                 throw ex;
46             }// 執行時異常
47             catch (RuntimeException ex) {
            // 如果3個前置步驟未完成,呼叫前置的最後一步操作 48 if (!beforeCompletionInvoked) { 49 triggerBeforeCompletion(status); 50 }// 提交異常回滾 51 doRollbackOnCommitException(status, ex); 52 throw ex; 53 }// 其它異常 54 catch (Error err) {  
            // 如果3個前置步驟未完成,呼叫前置的最後一步操作 55 if (!beforeCompletionInvoked) { 56 triggerBeforeCompletion(status); 57 }// 提交異常回滾 58 doRollbackOnCommitException(status, err); 59 throw err; 60 } 61 62 // Trigger afterCommit callbacks, with an exception thrown there 63 // propagated to callers but the transaction still considered as committed. 64 try { 65 triggerAfterCommit(status); 66 } 67 finally { 68 triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); 69 } 70 71 } 72 finally { 73 cleanupAfterCompletion(status); 74 } 75 }

如上圖,commit事務時,有6個核心操作,分別是3個前置操作,3個後置操作,如下:

1.prepareForCommit(status);原始碼是空的,沒有拓展目前。

2.triggerBeforeCommit(status); 提交前觸發操作

1 protected final void triggerBeforeCommit(DefaultTransactionStatus status) {
2         if (status.isNewSynchronization()) {
3             if (status.isDebug()) {
4                 logger.trace("Triggering beforeCommit synchronization");
5             }
6             TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
7         }
8     }

triggerBeforeCommit原始碼如下:

1 public static void triggerBeforeCommit(boolean readOnly) {
2         for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
3             synchronization.beforeCommit(readOnly);
4         }
5     }

如上圖,TransactionSynchronizationManager類定義了多個ThreadLocal(執行緒本地變數),其中一個用以儲存當前執行緒的事務同步:

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");

遍歷事務同步器,把每個事務同步器都執行“提交前”操作,比如咱們用的jdbc事務,那麼最終就是SqlSessionUtils.beforeCommit()->this.holder.getSqlSession().commit();提交會話。

3.triggerBeforeCompletion(status);完成前觸發操作,如果是jdbc事務,那麼最終就是

SqlSessionUtils.beforeCompletion->

TransactionSynchronizationManager.unbindResource(sessionFactory); 解綁當前執行緒的會話工廠

this.holder.getSqlSession().close();關閉會話。

4.triggerAfterCommit(status);提交事務後觸發操作。TransactionSynchronizationUtils.triggerAfterCommit();->TransactionSynchronizationUtils.invokeAfterCommit,如下:

1 public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
2         if (synchronizations != null) {
3             for (TransactionSynchronization synchronization : synchronizations) {
4                 synchronization.afterCommit();
5             }
6         }
7     }

好吧,一頓找,最後在TransactionSynchronizationAdapter中複寫過,並且是空的....SqlSessionSynchronization繼承了TransactionSynchronizationAdapter但是沒有複寫這個方法。

5.triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);

TransactionSynchronizationUtils.TransactionSynchronizationUtils.invokeAfterCompletion,如下:

 1 public static void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, int completionStatus) {
 2         if (synchronizations != null) {
 3             for (TransactionSynchronization synchronization : synchronizations) {
 4                 try {
 5                     synchronization.afterCompletion(completionStatus);
 6                 }
 7                 catch (Throwable tsex) {
 8                     logger.error("TransactionSynchronization.afterCompletion threw exception", tsex);
 9                 }
10             }
11         }
12     }

afterCompletion:對於JDBC事務來說,最終:

1)如果會話任然活著,關閉會話,

2)重置各種屬性:SQL會話同步器(SqlSessionSynchronization)的SQL會話持有者(SqlSessionHolder)的referenceCount引用計數、synchronizedWithTransaction同步事務、rollbackOnly只回滾、deadline超時時間點。

6.cleanupAfterCompletion(status);

1)設定事務狀態為已完成。

2) 如果是新的事務同步,解綁當前執行緒繫結的資料庫資源,重置資料庫連線

3)如果存在掛起的事務(巢狀事務),喚醒掛起的老事務的各種資源:資料庫資源、同步器。

 1     private void cleanupAfterCompletion(DefaultTransactionStatus status) {
 2         status.setCompleted();//設定事務狀態完成
       //如果是新的同步,清空當前執行緒繫結的除了資源外的全部執行緒本地變數:包括事務同步器、事務名稱、只讀屬性、隔離級別、真實的事務啟用狀態 3 if (status.isNewSynchronization()) { 4 TransactionSynchronizationManager.clear(); 5 }//如果是新的事務同步 6 if (status.isNewTransaction()) { 7 doCleanupAfterCompletion(status.getTransaction()); 8 }//如果存在掛起的資源 9 if (status.getSuspendedResources() != null) { 10 if (status.isDebug()) { 11 logger.debug("Resuming suspended transaction after completion of inner transaction"); 12 }//喚醒掛起的事務和資源(重新繫結之前掛起的資料庫資源,喚醒同步器,註冊同步器到TransactionSynchronizationManager) 13 resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources()); 14 } 15 }

對於DataSourceTransactionManager,doCleanupAfterCompletion原始碼如下:

 1     protected void doCleanupAfterCompletion(Object transaction) {
 2         DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
 3 
 4         // 如果是最新的連線持有者,解綁當前執行緒繫結的<資料庫資源,ConnectionHolder>
 5         if (txObject.isNewConnectionHolder()) {
 6             TransactionSynchronizationManager.unbindResource(this.dataSource);
 7         }
 8 
 9         // 重置資料庫連線(隔離級別、只讀)
10         Connection con = txObject.getConnectionHolder().getConnection();
11         try {
12             if (txObject.isMustRestoreAutoCommit()) {
13                 con.setAutoCommit(true);
14             }
15             DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
16         }
17         catch (Throwable ex) {
18             logger.debug("Could not reset JDBC Connection after transaction", ex);
19         }
20 
21         if (txObject.isNewConnectionHolder()) {
22             if (logger.isDebugEnabled()) {
23                 logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
24             }// 資源引用計數-1,關閉資料庫連線
25             DataSourceUtils.releaseConnection(con, this.dataSource);
26         }
27         // 重置連線持有者的全部屬性
28         txObject.getConnectionHolder().clear();
29     }

上面這6個方法是AbstractPlatformTransactionManager做的事,本地事務和分散式事務都會執行。

doCommit就是呼叫事務管理器來實現事務提交。分散式事務環境下呼叫的是:JtaTransactionManager.doCommit()。

3.2.1 JtaTransactionManager.doCommit

 1 @Override
 2     protected void doCommit(DefaultTransactionStatus status) {
 3         JtaTransactionObject txObject = (JtaTransactionObject) status.getTransaction();
 4         try {
 5             int jtaStatus = txObject.getUserTransaction().getStatus();
 6             if (jtaStatus == Status.STATUS_NO_TRANSACTION) {
 7                 // 事務狀態=已完結,非事務狀態。一般不會觸發
10                 throw new UnexpectedRollbackException("JTA transaction already completed - probably rolled back");
11             }
12             if (jtaStatus == Status.STATUS_ROLLEDBACK) {
13                 // 回滾
16                 try {
17                     txObject.getUserTransaction().rollback();
18                 }
19                 catch (IllegalStateException ex) {
20                     if (logger.isDebugEnabled()) {
21                         logger.debug("Rollback failure with transaction already marked as rolled back: " + ex);
22                     }
23                 }
24                 throw new UnexpectedRollbackException("JTA transaction already rolled back (probably due to a timeout)");
25             }//核心操作:提交事務
26             txObject.getUserTransaction().commit();
27         }
28         catch (RollbackException ex) {
29             throw new UnexpectedRollbackException(
30                     "JTA transaction unexpectedly rolled back (maybe due to a timeout)", ex);
31         }
32         catch (HeuristicMixedException ex) {
33             throw new HeuristicCompletionException(HeuristicCompletionException.STATE_MIXED, ex);
34         }
35         catch (HeuristicRollbackException ex) {
36             throw new HeuristicCompletionException(HeuristicCompletionException.STATE_ROLLED_BACK, ex);
37         }
38         catch (IllegalStateException ex) {
39             throw new TransactionSystemException("Unexpected internal transaction state", ex);
40         }
41         catch (SystemException ex) {
42             throw new TransactionSystemException("JTA failure on commit", ex);
43         }
44     }

txObject.getUserTransaction().commit()-->呼叫UserTransactionImp的commit()

1 public void commit () throws javax.transaction.RollbackException,
2             javax.transaction.HeuristicMixedException,
3             javax.transaction.HeuristicRollbackException,
4             javax.transaction.SystemException, java.lang.IllegalStateException,
5             java.lang.SecurityException
6     {
7         checkSetup ();
8         txmgr_.commit ();
9     }

TransactionManager的commit

 1 public void commit () throws javax.transaction.RollbackException,
 2             javax.transaction.HeuristicMixedException,
 3             javax.transaction.HeuristicRollbackException,
 4             javax.transaction.SystemException, java.lang.IllegalStateException,
 5             java.lang.SecurityException
 6     {
 7         Transaction tx = getTransaction();
 8         if ( tx == null ) raiseNoTransaction();
 9         tx.commit();
10     }

最終呼叫的是TransactionImp的commit()

 1 public void commit() throws javax.transaction.RollbackException,
 2             javax.transaction.HeuristicMixedException,
 3             javax.transaction.HeuristicRollbackException,
 4             javax.transaction.SystemException, java.lang.SecurityException
 5     {
 6         try {
 7             ct_.commit();
 8         } catch ( HeurHazardException hh ) {
 9             rethrowAsJtaHeuristicMixedException ( hh.getMessage () , hh );
10         } catch ( HeurRollbackException hr ) {
11             rethrowAsJtaHeuristicRollbackException ( hr.getMessage () , hr );
12         } catch ( HeurMixedException hm ) {
13             rethrowAsJtaHeuristicMixedException ( hm.getMessage () , hm );
14         } catch ( SysException se ) {
15             LOGGER.logWarning ( se.getMessage() , se );
16             throw new ExtendedSystemException ( se.getMessage (), se
17                     .getErrors () );
18         } catch ( com.atomikos.icatch.RollbackException rb ) {
19             //see case 29708: all statements have been closed
20             String msg = rb.getMessage ();
21             Throwable cause = rb.getCause();
22             if (cause == null) cause = rb;
23             rethrowAsJtaRollbackException (msg , cause);        
24         }
25     }

這裡就是呼叫CompositeTransaction介面的實現類CompositeTransactionImp的commit()

1 public void commit () throws HeurRollbackException, HeurMixedException,
2             HeurHazardException, SysException, SecurityException,
3             RollbackException
4     {
5         getTerminator().commit();
6     }

呼叫“組合命令列實現類”CompositeTerminatorImp的commit()

 1 public void commit () throws HeurRollbackException, HeurMixedException,
 2             HeurHazardException, SysException, java.lang.SecurityException,
 3             RollbackException
 4     {
 5         Stack errors = new Stack ();
 6         // 1.標記事務狀態
 7         transaction_.doCommit ();
 8         setSiblingInfoForIncoming1pcRequestFromRemoteClient();
 9         
10         if ( transaction_.isRoot () ) {
11             try {//2.提交事務,核心操作
12                 coordinator_.terminate ( true );
13             }
14 
15             catch ( RollbackException rb ) {
16                 throw rb;
17             } catch ( HeurHazardException hh ) {
18                 throw hh;
19             } catch ( HeurRollbackException hr ) {
20                 throw hr;
21             } catch ( HeurMixedException hm ) {
22                 throw hm;
23             } catch ( SysException se ) {
24                 throw se;
25             } catch ( Exception e ) {
26                 errors.push ( e );
27                 throw new SysException (
28                         "Unexpected error: " + e.getMessage (), errors );
29             }
30         }
31 
32     }

如上,1.呼叫“組合事務實現類”CompositeTransactionImp的doCommit()這裡只做標記為非活動,沒有提交事務。

2.呼叫CoordinatorImp的terminate()終結事務。

 1 protected void terminate ( boolean commit ) throws HeurRollbackException,
 2             HeurMixedException, SysException, java.lang.SecurityException,
 3             HeurCommitException, HeurHazardException, RollbackException,
 4             IllegalStateException
 5 
 6     {    
 7         synchronized ( fsm_ ) {
 8             if ( commit ) {// 如果只有一個參與者,直接一階段提交
 9                 if ( participants_.size () <= 1 ) {
10                     commit ( true );
11                 } else {//二階段提交:prepare階段
12                     int prepareResult = prepare ();
13                     // 二階段提交:commit階段。非只讀事務,才需要提交事務,
14                     if ( prepareResult != Participant.READ_ONLY )
15                         commit ( false );
16                 }
17             } else {
18                 rollback ();
19             }
20         }
21     }

如上圖是原始碼的優化精華:

1.根據參與者判斷,如果只有一個參與者,直接優化成一階段提交。

2.prepare完後,commit階段如果是隻讀事務,不用commit。

咱們是分散式事務,插入2個庫,肯定是兩階段提交。

CoordinatorImp的commit

 1 public HeuristicMessage[] commit ( boolean onePhase )
 2             throws HeurRollbackException, HeurMixedException,
 3             HeurHazardException, java.lang.IllegalStateException,
 4             RollbackException, SysException
 5     {
 6         HeuristicMessage[] ret = null;
 7         synchronized ( fsm_ ) {
 8             ret = stateHandler_.commit(onePhase);
 9         }
10         return ret;
11     }

追蹤到IndoubtStateHandler的commit,這個操作加了同步鎖。具體實現如下:

 1 protected HeuristicMessage[] commit ( boolean onePhase )
 2             throws HeurRollbackException, HeurMixedException,
 3             HeurHazardException, java.lang.IllegalStateException,
 4             RollbackException, SysException
 5     {
 6 
 7         return commitWithAfterCompletionNotification ( new CommitCallback() {
 8             public HeuristicMessage[] doCommit()
 9                     throws HeurRollbackException, HeurMixedException,
10                     HeurHazardException, IllegalStateException,
11                     RollbackException, SysException {
12                 return commitFromWithinCallback ( false, false );
13             }              
14         });
15 
16     }

如上,核心介面就是CommitCallback的doCommit方法,方法體就是commitFromWithinCallback。

  1 protected HeuristicMessage[] commitFromWithinCallback ( boolean heuristic ,
  2             boolean onePhase ) throws HeurRollbackException,
  3             HeurMixedException, HeurHazardException,
  4             java.lang.IllegalStateException, RollbackException, SysException
  5     {
  6         Stack<Exception> errors = new Stack<Exception> ();
  7         CoordinatorStateHandler nextStateHandler = null;
  8 
  9         try {
 10 
 11             Vector<Participant> participants = coordinator_.getParticipants();
 12             int count = (participants.size () - readOnlyTable_.size ());
 13             TerminationResult commitresult = new TerminationResult ( count );
 14 
 15             // cf bug 64546: avoid committed_ being null upon recovery!
 16             committed_ = new Boolean ( true );
 17             // for replaying completion: commit decision was reached
 18             // otherwise, replay requests might only see TERMINATED!
 19 
 20             try {
 21                 coordinator_.setState ( TxState.COMMITTING );
 22             } catch ( RuntimeException error ) {
 23                 //See case 23334
 24                 String msg = "Error in committing: " + error.getMessage() + " - rolling back instead";
 25                 LOGGER.logWarning ( msg , error );
 26                 try {
 27                     rollbackFromWithinCallback(getCoordinator().isRecoverableWhileActive().booleanValue(),false);
 28                     throw new RollbackException ( msg , error );
 29                 } catch ( HeurCommitException e ) {
 30                     LOGGER.logWarning ( "Illegal heuristic commit during rollback:" + e );
 31                     throw new HeurMixedException ( e.getHeuristicMessages() );
 32                 }
 33             }
 34 
 35 
 36             // start messages
 37             Enumeration<Participant> enumm = participants.elements ();
 38             while ( enumm.hasMoreElements () ) {
 39                 Participant p = enumm.nextElement ();
 40                 if ( !readOnlyTable_.containsKey ( p ) ) {
 41                     CommitMessage cm = new CommitMessage ( p, commitresult,
 42                             onePhase );
 43 
 44                     // if onephase: set cascadelist anyway, because if the
 45                     // participant is a REMOTE one, then it might have
 46                     // multiple participants that are not visible here!
 47 
 48                     if ( onePhase && cascadeList_ != null ) { // null for OTS
 49                         Integer sibnum = (Integer) cascadeList_.get ( p );
 50                         if ( sibnum != null ) // null for local participant!
 51                             p.setGlobalSiblingCount ( sibnum.intValue () );
 52                         p.setCascadeList ( cascadeList_ );
 53                     }
 54                     propagator_.submitPropagationMessage ( cm );
 55                 }
 56             } // while
 57 
 58             commitresult.waitForReplies ();
 59             int res = commitresult.getResult ();
 60 
 61             if ( res != TerminationResult.ALL_OK ) {
 62 
 63                 if ( res == TerminationResult.HEUR_MIXED ) {
 64                     Hashtable<Participant,TxState> hazards = commitresult.getPossiblyIndoubts ();
 65                     Hashtable heuristics = commitresult
 66                             .getHeuristicParticipants ();
 67                     addToHeuristicMap ( heuristics );
 68                     enumm = participants.elements ();
 69                     while ( enumm.hasMoreElements () ) {
 70                         Participant p = (Participant) enumm.nextElement ();
 71                         if ( !heuristics.containsKey ( p ) )
 72                             addToHeuristicMap ( p, TxState.TERMINATED );
 73                     }
 74                     nextStateHandler = new HeurMixedStateHandler ( this,
 75                             hazards );
 76 
 77                     coordinator_.setStateHandler ( nextStateHandler );
 78                     throw new HeurMixedException ( getHeuristicMessages () );
 79                 }
 80 
 81                 else if ( res == TerminationResult.ROLLBACK ) {
 82                     // 1PC and rolled back before commit arrived.
 83                     nextStateHandler = new TerminatedStateHandler ( this );
 84                     coordinator_.setStateHandler ( nextStateHandler );
 85                     throw new RollbackException ( "Rolled back already." );
 86                 } else if ( res == TerminationResult.HEUR_ROLLBACK ) {
 87                     nextStateHandler = new HeurAbortedStateHandler ( this );
 88                     coordinator_.setStateHandler ( nextStateHandler );
 89                     // Here, we do NOT need to add extra information, since ALL
 90                     // participants agreed to rollback. 
 91                     // Therefore, we need not worry about who aborted and who committed.
 92                     throw new HeurRollbackException ( getHeuristicMessages () );
 93 
 94                 }
 95 
 96                 else if ( res == TerminationResult.HEUR_HAZARD ) {
 97                     Hashtable hazards = commitresult.getPossiblyIndoubts ();
 98                     Hashtable heuristics = commitresult
 99                             .getHeuristicParticipants ();
100                     addToHeuristicMap ( heuristics );
101                     enumm = participants.elements ();
102                     while ( enumm.hasMoreElements () ) {
103                         Participant p = (Participant) enumm.nextElement ();
104                         if ( !heuristics.containsKey ( p ) )
105                             addToHeuristicMap ( p, TxState.TERMINATED );
106                     }
107                     nextStateHandler = new HeurHazardStateHandler ( this,
108                             hazards );
109                     coordinator_.setStateHandler ( nextStateHandler );
110                     throw new HeurHazardException ( getHeuristicMessages () );
111                 }
112 
113             } else {
114                 // all OK
115                 if ( heuristic ) {
116                     nextStateHandler = new HeurCommittedStateHandler ( this );
117                     // again, here we do NOT need to preserve extra per-participant
118                     // state mappings, since ALL participants were heur. committed.
119                 } else
120                     nextStateHandler = new TerminatedStateHandler ( this );
121 
122                 coordinator_.setStateHandler ( nextStateHandler );
123             }
124         } catch ( RuntimeException runerr ) {
125             errors.push ( runerr );
126             throw new SysException ( "Error in commit: " + runerr.getMessage (), errors );
127         }
128 
129         catch ( InterruptedException intr ) {
130             // cf bug 67457
131             InterruptedExceptionHelper.handleInterruptedException ( intr );
132             errors.push ( intr );
133             throw new SysException ( "Error in commit" + intr.getMessage (), errors );
134         }
135 
136         return getHeuristicMessages ();
137 
138     }

如上,構造了一個CommitMessage,呼叫傳播者Propagator的submitPropagationMessage()提交傳播訊息。-》CommitMessage的send()方法-》Participant的commit().

-》XAResourceTransaction的commit提交XA資源-》XAResource的commit( xid_, onePhase );饒了一大圈終於到了最最核心的程式碼了....我們這裡XAResource介面的實現類是MysqlXAConnection。

 1 public void commit(Xid xid, boolean onePhase) throws XAException {
 2         StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);
 3         commandBuf.append("XA COMMIT ");
 4         appendXid(commandBuf, xid);
 5 
 6         if (onePhase) {
 7             commandBuf.append(" ONE PHASE");
 8         }
 9 
10         try {
11             dispatchCommand(commandBuf.toString());
12         } finally {
13             this.underlyingConnection.setInGlobalTx(false);
14         }
15     }

dispatchCommand排程命令如下:

 1 private ResultSet dispatchCommand(String command) throws XAException {
 2         Statement stmt = null;
 3 
 4         try {
 5             if (this.logXaCommands) {
 6                 this.log.logDebug("Executing XA statement: " + command);
 7             }
 8 
 9             // TODO: Cache this for lifetime of XAConnection這裡還規劃要做快取 - -!
10             stmt = this.underlyingConnection.createStatement();
11        // 核心執行事務提交
12             stmt.execute(command);
13 
14             ResultSet rs = stmt.getResultSet();
15 
16             return rs;
17         } catch (SQLException sqlEx) {
18             throw mapXAExceptionFromSQLException(sqlEx);
19         } finally {
20             if (stmt != null) {
21                 try {
22                     stmt.close();
23                 } catch (SQLException sqlEx) {
24                 }
25             }
26         }
27     }

如上就是一個經典的使用jdbc執行sql語句的過程:

1.使用com.mysql.jdbc.Connection建立Statement。

2.Statement執行sql命令.

3.得到結果。

3.3 rollback回滾事務

AbstractPlatformTransactionManager中rollback原始碼如下:

1     public final void rollback(TransactionStatus status) throws TransactionException {
2         if (status.isCompleted()) {
3             throw new IllegalTransactionStateException(
4                     "Transaction is already completed - do not call commit or rollback more than once per transaction");
5         }
6 
7         DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
8         processRollback(defStatus);
9     }

processRollback原始碼如下:

 1     private void processRollback(DefaultTransactionStatus status) {
 2         try {
 3             try {// 解綁當前執行緒繫結的會話工廠,並關閉會話
 4                 triggerBeforeCompletion(status);
 5                 if (status.hasSavepoint()) {// 1.如果有儲存點,即巢狀式事務
 6                     if (status.isDebug()) {
 7                         logger.debug("Rolling back transaction to savepoint");
 8                     }//回滾到儲存點
 9                     status.rollbackToHeldSavepoint();
10                 }//2.如果就是一個簡單事務
11                 else if (status.isNewTransaction()) {
12                     if (status.isDebug()) {
13                         logger.debug("Initiating transaction rollback");
14                     }//回滾核心方法
15                     doRollback(status);
16                 }//3.當前存在事務且沒有儲存點,即加入當前事務的
17                 else if (status.hasTransaction()) {//如果已經標記為回滾 或 當加入事務失敗時全域性回滾(預設true)
18                     if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
19                         if (status.isDebug()) {//debug時會列印:加入事務失敗-標記已存在事務為回滾
20                             logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
21                         }//設定當前connectionHolder:當加入一個已存在事務時回滾
22                         doSetRollbackOnly(status);
23                     }
24                     else {
25                         if (status.isDebug()) {
26                             logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
27                         }
28                     }
29                 }
30                 else {
31                     logger.debug("Should roll back transaction but cannot - no transaction available");
32                 }
33             }
34             catch (RuntimeException ex) {//關閉會話,重置SqlSessionHolder屬性
35                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
36                 throw ex;
37             }
38             catch (Error err) {
39                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
40                 throw err;
41             }
42             triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
43         }
44         finally {、、解綁當前執行緒
45             cleanupAfterCompletion(status);
46         }
47     }

核心操作是doRollback,分散式環境下呼叫的是JtaTransactionManager的doRollback。

 1 @Override
 2     protected void doRollback(DefaultTransactionStatus status) {
 3         JtaTransactionObject txObject = (JtaTransactionObject) status.getTransaction();
 4         try {
 5             int jtaStatus = txObject.getUserTransaction().getStatus();
 6             if (jtaStatus != Status.STATUS_NO_TRANSACTION) {
 7                 try {
 8                     txObject.getUserTransaction().rollback();
 9                 }
10                 catch (IllegalStateException ex) {
11                     if (jtaStatus == Status.STATUS_ROLLEDBACK) {
12                         // Only really happens on JBoss 4.2 in case of an early timeout...
13                         if (logger.isDebugEnabled()) {
14                             logger.debug("Rollback failure with transaction already marked as rolled back: " + ex);
15                         }
16                     }
17                     else {
18                         throw new TransactionSystemException("Unexpected internal transaction state", ex);
19                     }
20                 }
21             }
22         }
23         catch (SystemException ex) {
24             throw new TransactionSystemException("JTA failure on rollback", ex);
25         }
26     }

呼叫的UserTransactionImp的rollback
1 public void rollback () throws IllegalStateException, SystemException,
2             SecurityException
3     {
4         checkSetup ();
5         txmgr_.rollback ();
6     }
TransactionManagerImp的rollback
1 public void rollback () throws IllegalStateException, SystemException,
2             SecurityException
3     {
4         Transaction tx = getTransaction();
5         if ( tx == null ) raiseNoTransaction();
6         tx.rollback();
7        
8     }
TransactionImp的rollback
 1 public void rollback() throws IllegalStateException, SystemException
 2     {
 3         try {
 4             ct_.rollback();
 5         } catch ( SysException se ) {
 6             LOGGER.logWarning ( se.getMessage() , se );
 7             throw new ExtendedSystemException ( se.getMessage (), se
 8                     .getErrors () );
 9         }
10 
11     }
CompositeTransactionImp的rollback

1  public void rollback () throws IllegalStateException, SysException
2     {
3         getTerminator().rollback();
4     }
CompositeTerminatorImp的rollback
 1 public void rollback () throws IllegalStateException, SysException
 2     {
 3         Stack errors = new Stack ();
 4 
 5         transaction_.doRollback ();
 6 
 7         if ( transaction_.isRoot () )
 8             try {
 9                 coordinator_.terminate ( false );
10             } catch ( Exception e ) {
11                 errors.push ( e );
12                 throw new SysException ( "Unexpected error in rollback: " + e.getMessage (), errors );
13             }
14     }

一直追蹤到

RollbackMessage.send

 1 protected Object send () throws PropagationException
 2     {
 3         Participant part = getParticipant ();
 4         HeuristicMessage[] msgs = null;
 5         try {
 6             msgs = part.rollback ();
 7 
 8         } catch ( HeurCommitException heurc ) {
 9             throw new PropagationException ( heurc, false );
10         } catch ( HeurMixedException heurm ) {
11             throw new PropagationException ( heurm, false );
12         }
13 
14         catch ( Exception e ) {
15             // only retry if might be indoubt. Otherwise ignore.
16             if ( indoubt_ ) {
17                 // here, participant might be indoubt!
18                 // fill in exact heuristic msgs by using buffered effect of proxies
19                 HeurHazardException heurh = new HeurHazardException ( part.getHeuristicMessages () );
20                 throw new PropagationException ( heurh, true );
21             }
22         }
23         return msgs;
24     }
XAResourceTransaction.rollback->MysqlXAConnection.rollback
 1 public void rollback(Xid xid) throws XAException {
 2         StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);
 3         commandBuf.append("XA ROLLBACK ");
 4         appendXid(commandBuf, xid);
 5 
 6         try {
 7             dispatchCommand(commandBuf.toString());
 8         } finally {
 9             this.underlyingConnection.setInGlobalTx(false);
10         }
11     }
dispatchCommand執行sql
 1 private ResultSet dispatchCommand(String command) throws XAException {
 2         Statement stmt = null;
 3 
 4         try {
 5             if (this.logXaCommands) {
 6                 this.log.logDebug("Executing XA statement: " + command);
 7             }
 8 
 9             // TODO: Cache this for lifetime of XAConnection
10             stmt = this.underlyingConnection.createStatement();
11 
12             stmt.execute(command);
13 
14             ResultSet rs = stmt.getResultSet();
15 
16             return rs;
17         } catch (SQLException sqlEx) {
18             throw mapXAExceptionFromSQLException(sqlEx);
19         } finally {
20             if (stmt != null) {
21                 try {
22                     stmt.close();
23                 } catch (SQLException sqlEx) {
24                 }
25             }
26         }
27     }

debug得到command:XA ROLLBACK 0x3139322e3136382e36302e31312e746d30303030313030303437,0x3139322e3136382e36302e31312e746d31,0x41544f4d
至此,回滾完畢。