分散式事務(五)原始碼詳解
目錄
正文
回到頂部系列目錄
引子
本節我們將會從上一節的”簡單樣例“入手:Spring Boot+Atomikos(TM)+Mybatis(ORM)+Mysql(DB),深入原始碼,看看這個分散式事務是怎麼定義、執行的。
先來回憶一下第二節講的JTA規範,如下圖。Atomikos是什麼角色?起到什麼作用?
角色:
Atomikos根本上是一個事務管理器(TM)也就是JTA模型的核心,上圖扇形的中心位置。
作用:
TM呼叫 【Resource Manager資源管理器】 的XAResource介面來實現事務操作。
TM依賴 【Application Server應用伺服器】 的TransactionManager介面當然如果伺服器不支援事務管理,自然也就只能使用第三方包,例如Atomikos。
TM依賴 【Application應用程式】 設定事務邊界、屬性,application呼叫UserTransaction介面控制事務開始、提交、回滾。
回到頂部一、bean定義
1.1JtaTransactionManager
org.springframework.transaction.jta.JtaTransactionManager類是spring提供的分散式事務管理器。
JtaTransactionManager類圖如下:
實現了介面如下:
- PlatformTransactionManager :獲取事務,提交事務,回滾事務
- TransactionFactory:建立事務
- InitializingBean:初始化bean
JtaTransactionManager實現了InitializingBean介面的afterPropertiesSet()方法,處於bean生命週期的容器初始化->例項化期->初始化中期,如下圖:
下面我們看一下JtaTransactionManager在bean初始化中期InitializingBean介面的afterPropertiesSet()做了什麼:
1 /** 2 * Initialize the UserTransaction as well as the TransactionManager handle. 3 * @see #initUserTransactionAndTransactionManager() 4 */ 5 @Override 6 public void afterPropertiesSet() throws TransactionSystemException { 7 initUserTransactionAndTransactionManager(); 8 checkUserTransactionAndTransactionManager(); 9 initTransactionSynchronizationRegistry(); 10 }
-
1.initUserTransactionAndTransactionManager:初始化UserTransaction和TransactionManager介面。主要是如果沒有定義的話,可以支援JNDI。
-
2.checkUserTransactionAndTransactionManager:校驗2個介面是否存在。UserTransaction必須定義,TransactionManager可以不定義。
原始碼如下:
對應控制檯列印:
o.s.t.jta.JtaTransactionManager : Using JTA UserTransaction: com.atomikos.icatch.jta.UserTransactionImp@614aeccc o.s.t.jta.JtaTransactionManager : Using JTA TransactionManager: com.atomikos.icatch.jta.UserTransactionManager@5116ac09
- 3.initTransactionSynchronizationRegistry:初始化事務同步註冊,這個不使用JNDI的話沒啥用。
上一節分散式事務(三)簡單樣例中我們配置了JtaTransactionManagerConfig類,如下:
1 package study.config.datasource; 2 3 import com.atomikos.icatch.jta.UserTransactionImp; 4 import com.atomikos.icatch.jta.UserTransactionManager; 5 import org.springframework.context.annotation.Bean; 6 import org.springframework.context.annotation.Configuration; 7 import org.springframework.transaction.jta.JtaTransactionManager; 8 9 import javax.transaction.UserTransaction; 10 11 /** 12 * 事務管理器配置類 13 * 14 * @author denny 15 */ 16 @Configuration 17 public class JtaTransactionManagerConfig { 18 19 @Bean(name = "atomikosTransactionManager") 20 public JtaTransactionManager regTransactionManager() { 21 UserTransactionManager userTransactionManager = new UserTransactionManager(); 22 UserTransaction userTransaction = new UserTransactionImp(); 23 return new JtaTransactionManager(userTransaction, userTransactionManager); 24 } 25 }
如上圖,我們定義了一個name = "atomikosTransactionManager"的bean,具體型別為JtaTransactionManager。其中構造了2個實現類UserTransactionImp(javax.transaction.UserTransaction介面)、UserTransactionManager(javax.transaction.TransactionManager介面)。並用這2個實現類構造了一個JtaTransactionManager。
1.UserTransaction介面
提供給使用者操控事務的:開啟,提交,回滾等等。原始碼如下:
2 TransactionManager介面
原始碼如下:
相比UserTransaction,TransactionManager介面多了介面的掛起、恢復、獲取事務3個介面。這3個方法明顯是留給系統自己呼叫的。
1.2 AtomikosDataSourceBean
Spring 為Atomikos定製了一個org.springframework.boot.jta.atomikos.AtomikosDataSourceBean,提供了bean生命週期的一些介面:
- BeanNameAware:設定bean名稱
- InitializingBean:初始化bean
- DisposableBean:銷燬bean
我們只需要定義這個bean即可輕鬆使得spring來維護。
com.atomikos.jdbc.AtomikosDataSourceBean類圖如下:
其中核心介面:
DataSource介面:getConnection獲取資料庫連線
ConnectionPoolProperties介面:用於載入連線池的屬性
回到頂部二、原始碼剖析
2.1 自動配置類
老套路哈,spring boot就這麼點花花腸子,既然使用@Transactional這種註解的方式,那麼我們就從springboot 容器啟動時的自動配置載入(spring boot容器啟動詳解)開始看。在/META-INF/spring.factories中配置檔案中查詢,如下圖:
載入2個關於事務的自動配置類:
org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration,
org.springframework.boot.autoconfigure.transaction.jta.JtaAutoConfiguration,
由於本文是分散式事務,故2個配置檔案都生效了,我們先看JtaAutoConfiguration
2.2 JtaAutoConfiguration
1 /** 2 * {@link EnableAutoConfiguration Auto-configuration} for JTA. 3 * 4 * @author Josh Long 5 * @author Phillip Webb 6 * @since 1.2.0 7 */ 8 @ConditionalOnClass(javax.transaction.Transaction.class) 9 @ConditionalOnProperty(prefix = "spring.jta", value = "enabled", matchIfMissing = true) 10 @AutoConfigureBefore({ XADataSourceAutoConfiguration.class, 11 ActiveMQAutoConfiguration.class, ArtemisAutoConfiguration.class, 12 HibernateJpaAutoConfiguration.class }) 13 @Import({ JndiJtaConfiguration.class, BitronixJtaConfiguration.class, 14 AtomikosJtaConfiguration.class, NarayanaJtaConfiguration.class }) 15 @EnableConfigurationProperties(JtaProperties.class) 16 public class JtaAutoConfiguration { 17 18 }
如上,JtaAutoConfiguration這個類竟然是個空殼,只有一堆註解,挑幾個重要的講一講:
1.@ConditionalOnClass(javax.transaction.Transaction.class):代表類路徑下存在javax.transaction.Transaction.class這個類,那麼JtaAutoConfiguration生效。
2.@ConditionalOnProperty(prefix = "spring.jta", value = "enabled", matchIfMissing =true),自動開啟spring.jta.enabled=true.
3.@Import({ JndiJtaConfiguration.class, BitronixJtaConfiguration.class,AtomikosJtaConfiguration.class, NarayanaJtaConfiguration.class }),又是spring套路哈,用來匯入類。這裡匯入了4個配置類,可見支援4種第三方事務管理器。AtomikosJtaConfiguration.class自然就是Atomikos了。
AtomikosJtaConfiguration.class這個配置類
1 @Configuration 2 @EnableConfigurationProperties({ AtomikosProperties.class, JtaProperties.class }) 3 @ConditionalOnClass({ JtaTransactionManager.class, UserTransactionManager.class }) 4 @ConditionalOnMissingBean(PlatformTransactionManager.class) 5 class AtomikosJtaConfiguration { 6 7 private final JtaProperties jtaProperties; 8 9 private final TransactionManagerCustomizers transactionManagerCustomizers; 10 11 AtomikosJtaConfiguration(JtaProperties jtaProperties, 12 ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) { 13 this.jtaProperties = jtaProperties; 14 this.transactionManagerCustomizers = transactionManagerCustomizers 15 .getIfAvailable(); 16 } 17 18 @Bean(initMethod = "init", destroyMethod = "shutdownForce") 19 @ConditionalOnMissingBean(UserTransactionService.class) 20 public UserTransactionServiceImp userTransactionService( 21 AtomikosProperties atomikosProperties) { 22 Properties properties = new Properties(); 23 if (StringUtils.hasText(this.jtaProperties.getTransactionManagerId())) { 24 properties.setProperty("com.atomikos.icatch.tm_unique_name", 25 this.jtaProperties.getTransactionManagerId()); 26 } 27 properties.setProperty("com.atomikos.icatch.log_base_dir", getLogBaseDir()); 28 properties.putAll(atomikosProperties.asProperties()); 29 return new UserTransactionServiceImp(properties); 30 } 31 32 private String getLogBaseDir() { 33 if (StringUtils.hasLength(this.jtaProperties.getLogDir())) { 34 return this.jtaProperties.getLogDir(); 35 } 36 File home = new ApplicationHome().getDir(); 37 return new File(home, "transaction-logs").getAbsolutePath(); 38 } 39 40 @Bean(initMethod = "init", destroyMethod = "close") 41 @ConditionalOnMissingBean 42 public UserTransactionManager atomikosTransactionManager( 43 UserTransactionService userTransactionService) throws Exception { 44 UserTransactionManager manager = new UserTransactionManager(); 45 manager.setStartupTransactionService(false); 46 manager.setForceShutdown(true); 47 return manager; 48 } 49 50 @Bean 51 @ConditionalOnMissingBean(XADataSourceWrapper.class) 52 public AtomikosXADataSourceWrapper xaDataSourceWrapper() { 53 return new AtomikosXADataSourceWrapper(); 54 } 55 56 @Bean 57 @ConditionalOnMissingBean 58 public static AtomikosDependsOnBeanFactoryPostProcessor atomikosDependsOnBeanFactoryPostProcessor() { 59 return new AtomikosDependsOnBeanFactoryPostProcessor(); 60 } 61 62 @Bean 63 public JtaTransactionManager transactionManager(UserTransaction userTransaction, 64 TransactionManager transactionManager) { 65 JtaTransactionManager jtaTransactionManager = new JtaTransactionManager( 66 userTransaction, transactionManager); 67 if (this.transactionManagerCustomizers != null) { 68 this.transactionManagerCustomizers.customize(jtaTransactionManager); 69 } 70 return jtaTransactionManager; 71 } 72 73 @Configuration 74 @ConditionalOnClass(Message.class) 75 static class AtomikosJtaJmsConfiguration { 76 77 @Bean 78 @ConditionalOnMissingBean(XAConnectionFactoryWrapper.class) 79 public AtomikosXAConnectionFactoryWrapper xaConnectionFactoryWrapper() { 80 return new AtomikosXAConnectionFactoryWrapper(); 81 } 82 83 } 84 85 }
2.3 TransactionAutoConfiguration
這裡和本地事務分析過程一致,就不再重複,飛機票spring事務詳解(三)原始碼詳解,一直看到第二節結束.這裡只截個圖:
最終原始碼呼叫具體事務管理器的PlatformTransactionManager介面的3個方法:
1 public interface PlatformTransactionManager { 2 // 獲取事務狀態 3 TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException; 4 // 事務提交 5 void commit(TransactionStatus status) throws TransactionException; 6 // 事務回滾 7 void rollback(TransactionStatus status) throws TransactionException; 8 }回到頂部
三、核心原始碼
核心實現類圖:
如上提所示,PlatformTransactionManager頂級介面定義了最核心的事務管理方法,下面一層是AbstractPlatformTransactionManager抽象類,實現了PlatformTransactionManager介面的方法並定義了一些抽象方法,供子類拓展。最下面一層是2個經典事務管理器:
1.DataSourceTransactionmanager: 即本地單資源事務管理器。
2.JtaTransactionManager:即多資源事務管理器(又叫做分散式事務管理器),其實現了JTA規範,使用XA協議進行兩階段提交。
我們這裡自然是JTA分散式環境,我們只需要從JtaTransactionManager這個實現類入手即可。
3.1getTransaction獲取事務
AbstractPlatformTransactionManager實現了getTransaction()方法如下:
1 @Override 2 public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { 3 Object transaction = doGetTransaction(); 4 5 // Cache debug flag to avoid repeated checks. 6 boolean debugEnabled = logger.isDebugEnabled(); 7 8 if (definition == null) { 9 // Use defaults if no transaction definition given. 10 definition = new DefaultTransactionDefinition(); 11 } 12 // 如果當前已經存在事務 13 if (isExistingTransaction(transaction)) { 14 // 根據不同傳播機制不同處理 15 return handleExistingTransaction(definition, transaction, debugEnabled); 16 } 17 18 // 超時不能小於預設值 19 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { 20 throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); 21 } 22 23 // 當前不存在事務,傳播機制=MANDATORY(支援當前事務,沒事務報錯),報錯 24 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { 25 throw new IllegalTransactionStateException( 26 "No existing transaction found for transaction marked with propagation 'mandatory'"); 27 }// 當前不存在事務,傳播機制=REQUIRED/REQUIRED_NEW/NESTED,這三種情況,需要新開啟事務,且加上事務同步 28 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || 29 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || 30 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { 31 SuspendedResourcesHolder suspendedResources = suspend(null); 32 if (debugEnabled) { 33 logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); 34 } 35 try {// 是否需要新開啟同步// 開啟// 開啟 36 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); 37 DefaultTransactionStatus status = newTransactionStatus( 38 definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); 39 doBegin(transaction, definition);// 開啟新事務 40 prepareSynchronization(status, definition);//預備同步 41 return status; 42 } 43 catch (RuntimeException ex) { 44 resume(null, suspendedResources); 45 throw ex; 46 } 47 catch (Error err) { 48 resume(null, suspendedResources); 49 throw err; 50 } 51 } 52 else { 53 // 當前不存在事務當前不存在事務,且傳播機制=PROPAGATION_SUPPORTS/PROPAGATION_NOT_SUPPORTED/PROPAGATION_NEVER,這三種情況,建立“空”事務:沒有實際事務,但可能是同步。警告:定義了隔離級別,但並沒有真實的事務初始化,隔離級別被忽略有隔離級別但是並沒有定義實際的事務初始化,有隔離級別但是並沒有定義實際的事務初始化, 54 if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { 55 logger.warn("Custom isolation level specified but no actual transaction initiated; " + 56 "isolation level will effectively be ignored: " + definition); 57 } 58 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); 59 return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); 60 } 61 }
上圖核心步驟就是:
- 1.doGetTransaction():獲取事務
- 2.doBegin:準備工作
3.1.1 JtaTransactionManager的doGetTransaction()
其實也就是把UserTransaction封裝成一個JtaTransactionObject返回。
1 @Override 2 protected Object doGetTransaction() { 3 UserTransaction ut = getUserTransaction(); 4 if (ut == null) { 5 throw new CannotCreateTransactionException("No JTA UserTransaction available - " + 6 "programmatic PlatformTransactionManager.getTransaction usage not supported"); 7 } 8 if (!this.cacheUserTransaction) { 9 ut = lookupUserTransaction( 10 this.userTransactionName != null ? this.userTransactionName : DEFAULT_USER_TRANSACTION_NAME); 11 } 12 return doGetJtaTransaction(ut); 13 } 14 15 /** 16 * Get a JTA transaction object for the given current UserTransaction. 17 * <p>Subclasses can override this to provide a JtaTransactionObject 18 * subclass, for example holding some additional JTA handle needed. 19 * @param ut the UserTransaction handle to use for the current transaction 20 * @return the JtaTransactionObject holding the UserTransaction 21 */ 22 protected JtaTransactionObject doGetJtaTransaction(UserTransaction ut) { 23 return new JtaTransactionObject(ut); 24 }
3.1.2 JtaTransactionManager.doBegin
1 @Override 2 protected void doBegin(Object transaction, TransactionDefinition definition) { 3 JtaTransactionObject txObject = (JtaTransactionObject) transaction; 4 try { 5 doJtaBegin(txObject, definition); 6 } 7 catch (NotSupportedException ex) { 8 // assume nested transaction not supported 9 throw new NestedTransactionNotSupportedException( 10 "JTA implementation does not support nested transactions", ex); 11 } 12 catch (UnsupportedOperationException ex) { 13 // assume nested transaction not supported 14 throw new NestedTransactionNotSupportedException( 15 "JTA implementation does not support nested transactions", ex); 16 } 17 catch (SystemException ex) { 18 throw new CannotCreateTransactionException("JTA failure on begin", ex); 19 } 20 }
呼叫JtaTransactionManager.doJtaBegin:
1 protected void doJtaBegin(JtaTransactionObject txObject, TransactionDefinition definition) 2 throws NotSupportedException, SystemException { 3 4 applyIsolationLevel(txObject, definition.getIsolationLevel()); 5 int timeout = determineTimeout(definition); 6 applyTimeout(txObject, timeout); 7 txObject.getUserTransaction().begin(); 8 }
UserTransactionImp.begin->TransactionManagerImp.begin
1 public void begin ( int timeout ) throws NotSupportedException, 2 SystemException 3 { 4 CompositeTransaction ct = null; 5 ResumePreviousTransactionSubTxAwareParticipant resumeParticipant = null; 6 7 ct = compositeTransactionManager.getCompositeTransaction(); 8 if ( ct != null && ct.getProperty ( JTA_PROPERTY_NAME ) == null ) { 9 LOGGER.logWarning ( "JTA: temporarily suspending incompatible transaction: " + ct.getTid() + 10 " (will be resumed after JTA transaction ends)" ); 11 ct = compositeTransactionManager.suspend(); 12 resumeParticipant = new ResumePreviousTransactionSubTxAwareParticipant ( ct ); 13 } 14 15 try { 16 ct = compositeTransactionManager.createCompositeTransaction ( ( ( long ) timeout ) * 1000 ); 17 if ( resumeParticipant != null ) ct.addSubTxAwareParticipant ( resumeParticipant ); 18 if ( ct.isRoot () && getDefaultSerial () ) 19 ct.getTransactionControl ().setSerial (); 20 ct.setProperty ( JTA_PROPERTY_NAME , "true" ); 21 } catch ( SysException se ) { 22 String msg = "Error in begin()"; 23 LOGGER.logWarning( msg , se ); 24 throw new ExtendedSystemException ( msg , se 25 .getErrors () ); 26 } 27 recreateCompositeTransactionAsJtaTransaction(ct); 28 }
createCompositeTransaction建立混合事務
1 public CompositeTransaction createCompositeTransaction ( long timeout ) throws SysException 2 { 3 Stack errors = new Stack(); 4 CompositeTransaction ct = null , ret = null; 5 // 獲取當前執行緒繫結的事務 6 ct = getCurrentTx ();
// 當前執行緒不存在事務 7 if ( ct == null ) {
// 建立組合事務 8 ret = service_.createCompositeTransaction ( timeout ); 9 if(LOGGER.isInfoEnabled()){ 10 LOGGER.logInfo("createCompositeTransaction ( " + timeout + " ): " 11 + "created new ROOT transaction with id " + ret.getTid ()); 12 }
// 當前執行緒存在事務 13 } else { 14 if(LOGGER.isInfoEnabled()) LOGGER.logInfo("createCompositeTransaction ( " + timeout + " )");
// 建立子事務 15 ret = ct.getTransactionControl ().createSubTransaction (); 16 17 } 18 Thread thread = Thread.currentThread ();
// 綁定當前執行緒和事務的2個對映map 19 setThreadMappings ( ret, thread ); 20 21 return ret; 22 }
如果當前執行緒不存在事務,建立組合事務。如果當前執行緒存在事務,建立子事務。
呼叫TransactionServiceImp的createCompositeTransaction建立混合事務
1 public CompositeTransaction createCompositeTransaction ( long timeout ) throws SysException 2 { 3 if ( !initialized_ ) throw new IllegalStateException ( "Not initialized" ); 4 5 if ( maxNumberOfActiveTransactions_ >= 0 && 6 tidToTransactionMap_.size () >= maxNumberOfActiveTransactions_ ) { 7 throw new IllegalStateException ( "Max number of active transactions reached:" + maxNumberOfActiveTransactions_ ); 8 } 9 10 String tid = tidmgr_.get (); 11 Stack lineage = new Stack (); 12 //建立協調者 15 CoordinatorImp cc = createCC ( null, tid, true, false, timeout );
// 建立組合事務 16 CompositeTransaction ct = createCT ( tid, cc, lineage, false ); 17 return ct; 18 }
3.2 commit 提交事務
事務提交流程圖如下:
AbstractPlatformTransactionManager的commit原始碼如下:
1 @Override 2 public final void commit(TransactionStatus status) throws TransactionException { 3 if (status.isCompleted()) {// 如果事務已完結,報錯無法再次提交 4 throw new IllegalTransactionStateException( 5 "Transaction is already completed - do not call commit or rollback more than once per transaction"); 6 } 7 8 DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; 9 if (defStatus.isLocalRollbackOnly()) {// 如果事務明確標記為回滾, 10 if (defStatus.isDebug()) { 11 logger.debug("Transactional code has requested rollback"); 12 } 13 processRollback(defStatus);//執行回滾 14 return; 15 }//如果不需要全域性回滾時提交 且 全域性回滾 16 if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { 17 if (defStatus.isDebug()) { 18 logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); 19 }//執行回滾 20 processRollback(defStatus); 21 // 僅在最外層事務邊界(新事務)或顯式地請求時丟擲“未期望的回滾異常” 23 if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { 24 throw new UnexpectedRollbackException( 25 "Transaction rolled back because it has been marked as rollback-only"); 26 } 27 return; 28 } 29 // 執行提交事務 30 processCommit(defStatus); 31 }
如上圖,各種判斷:
- 1.如果事務明確標記為本地回滾,-》執行回滾
- 2.如果不需要全域性回滾時提交 且 全域性回滾-》執行回滾
- 3.提交事務,核心方法processCommit()
processCommit如下:
1 private void processCommit(DefaultTransactionStatus status) throws TransactionException { 2 try { 3 boolean beforeCompletionInvoked = false; 4 try {//3個前置操作 5 prepareForCommit(status); 6 triggerBeforeCommit(status); 7 triggerBeforeCompletion(status); 8 beforeCompletionInvoked = true;//3個前置操作已呼叫 9 boolean globalRollbackOnly = false;//新事務 或 全域性回滾失敗 10 if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { 11 globalRollbackOnly = status.isGlobalRollbackOnly(); 12 }//1.有儲存點,即巢狀事務 13 if (status.hasSavepoint()) { 14 if (status.isDebug()) { 15 logger.debug("Releasing transaction savepoint"); 16 }//釋放儲存點 17 status.releaseHeldSavepoint(); 18 }//2.新事務 19 else if (status.isNewTransaction()) { 20 if (status.isDebug()) { 21 logger.debug("Initiating transaction commit"); 22 }//呼叫事務處理器提交事務 23 doCommit(status); 24 } 25 // 3.非新事務,且全域性回滾失敗,但是提交時沒有得到異常,丟擲異常 27 if (globalRollbackOnly) { 28 throw new UnexpectedRollbackException( 29 "Transaction silently rolled back because it has been marked as rollback-only"); 30 } 31 } 32 catch (UnexpectedRollbackException ex) { 33 // 觸發完成後事務同步,狀態為回滾 34 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); 35 throw ex; 36 }// 事務異常 37 catch (TransactionException ex) { 38 // 提交失敗回滾 39 if (isRollbackOnCommitFailure()) { 40 doRollbackOnCommitException(status, ex); 41 }// 觸發完成後回撥,事務同步狀態為未知 42 else { 43 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); 44 } 45 throw ex; 46 }// 執行時異常 47 catch (RuntimeException ex) {
// 如果3個前置步驟未完成,呼叫前置的最後一步操作 48 if (!beforeCompletionInvoked) { 49 triggerBeforeCompletion(status); 50 }// 提交異常回滾 51 doRollbackOnCommitException(status, ex); 52 throw ex; 53 }// 其它異常 54 catch (Error err) {
// 如果3個前置步驟未完成,呼叫前置的最後一步操作 55 if (!beforeCompletionInvoked) { 56 triggerBeforeCompletion(status); 57 }// 提交異常回滾 58 doRollbackOnCommitException(status, err); 59 throw err; 60 } 61 62 // Trigger afterCommit callbacks, with an exception thrown there 63 // propagated to callers but the transaction still considered as committed. 64 try { 65 triggerAfterCommit(status); 66 } 67 finally { 68 triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); 69 } 70 71 } 72 finally { 73 cleanupAfterCompletion(status); 74 } 75 }
如上圖,commit事務時,有6個核心操作,分別是3個前置操作,3個後置操作,如下:
1.prepareForCommit(status);原始碼是空的,沒有拓展目前。
2.triggerBeforeCommit(status); 提交前觸發操作
1 protected final void triggerBeforeCommit(DefaultTransactionStatus status) { 2 if (status.isNewSynchronization()) { 3 if (status.isDebug()) { 4 logger.trace("Triggering beforeCommit synchronization"); 5 } 6 TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly()); 7 } 8 }
triggerBeforeCommit原始碼如下:
1 public static void triggerBeforeCommit(boolean readOnly) { 2 for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) { 3 synchronization.beforeCommit(readOnly); 4 } 5 }
如上圖,TransactionSynchronizationManager類定義了多個ThreadLocal(執行緒本地變數),其中一個用以儲存當前執行緒的事務同步:
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");
遍歷事務同步器,把每個事務同步器都執行“提交前”操作,比如咱們用的jdbc事務,那麼最終就是SqlSessionUtils.beforeCommit()->this.holder.getSqlSession().commit();提交會話。
3.triggerBeforeCompletion(status);完成前觸發操作,如果是jdbc事務,那麼最終就是
SqlSessionUtils.beforeCompletion->
TransactionSynchronizationManager.unbindResource(sessionFactory); 解綁當前執行緒的會話工廠
this.holder.getSqlSession().close();關閉會話。
4.triggerAfterCommit(status);提交事務後觸發操作。TransactionSynchronizationUtils.triggerAfterCommit();->TransactionSynchronizationUtils.invokeAfterCommit,如下:
1 public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) { 2 if (synchronizations != null) { 3 for (TransactionSynchronization synchronization : synchronizations) { 4 synchronization.afterCommit(); 5 } 6 } 7 }
好吧,一頓找,最後在TransactionSynchronizationAdapter中複寫過,並且是空的....SqlSessionSynchronization繼承了TransactionSynchronizationAdapter但是沒有複寫這個方法。
5.triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
TransactionSynchronizationUtils.TransactionSynchronizationUtils.invokeAfterCompletion,如下:
1 public static void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, int completionStatus) { 2 if (synchronizations != null) { 3 for (TransactionSynchronization synchronization : synchronizations) { 4 try { 5 synchronization.afterCompletion(completionStatus); 6 } 7 catch (Throwable tsex) { 8 logger.error("TransactionSynchronization.afterCompletion threw exception", tsex); 9 } 10 } 11 } 12 }
afterCompletion:對於JDBC事務來說,最終:
1)如果會話任然活著,關閉會話,
2)重置各種屬性:SQL會話同步器(SqlSessionSynchronization)的SQL會話持有者(SqlSessionHolder)的referenceCount引用計數、synchronizedWithTransaction同步事務、rollbackOnly只回滾、deadline超時時間點。
6.cleanupAfterCompletion(status);
1)設定事務狀態為已完成。
2) 如果是新的事務同步,解綁當前執行緒繫結的資料庫資源,重置資料庫連線
3)如果存在掛起的事務(巢狀事務),喚醒掛起的老事務的各種資源:資料庫資源、同步器。
1 private void cleanupAfterCompletion(DefaultTransactionStatus status) { 2 status.setCompleted();//設定事務狀態完成
//如果是新的同步,清空當前執行緒繫結的除了資源外的全部執行緒本地變數:包括事務同步器、事務名稱、只讀屬性、隔離級別、真實的事務啟用狀態 3 if (status.isNewSynchronization()) { 4 TransactionSynchronizationManager.clear(); 5 }//如果是新的事務同步 6 if (status.isNewTransaction()) { 7 doCleanupAfterCompletion(status.getTransaction()); 8 }//如果存在掛起的資源 9 if (status.getSuspendedResources() != null) { 10 if (status.isDebug()) { 11 logger.debug("Resuming suspended transaction after completion of inner transaction"); 12 }//喚醒掛起的事務和資源(重新繫結之前掛起的資料庫資源,喚醒同步器,註冊同步器到TransactionSynchronizationManager) 13 resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources()); 14 } 15 }
對於DataSourceTransactionManager,doCleanupAfterCompletion原始碼如下:
1 protected void doCleanupAfterCompletion(Object transaction) { 2 DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; 3 4 // 如果是最新的連線持有者,解綁當前執行緒繫結的<資料庫資源,ConnectionHolder> 5 if (txObject.isNewConnectionHolder()) { 6 TransactionSynchronizationManager.unbindResource(this.dataSource); 7 } 8 9 // 重置資料庫連線(隔離級別、只讀) 10 Connection con = txObject.getConnectionHolder().getConnection(); 11 try { 12 if (txObject.isMustRestoreAutoCommit()) { 13 con.setAutoCommit(true); 14 } 15 DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel()); 16 } 17 catch (Throwable ex) { 18 logger.debug("Could not reset JDBC Connection after transaction", ex); 19 } 20 21 if (txObject.isNewConnectionHolder()) { 22 if (logger.isDebugEnabled()) { 23 logger.debug("Releasing JDBC Connection [" + con + "] after transaction"); 24 }// 資源引用計數-1,關閉資料庫連線 25 DataSourceUtils.releaseConnection(con, this.dataSource); 26 } 27 // 重置連線持有者的全部屬性 28 txObject.getConnectionHolder().clear(); 29 }
上面這6個方法是AbstractPlatformTransactionManager做的事,本地事務和分散式事務都會執行。
doCommit就是呼叫事務管理器來實現事務提交。分散式事務環境下呼叫的是:JtaTransactionManager.doCommit()。
3.2.1 JtaTransactionManager.doCommit
1 @Override 2 protected void doCommit(DefaultTransactionStatus status) { 3 JtaTransactionObject txObject = (JtaTransactionObject) status.getTransaction(); 4 try { 5 int jtaStatus = txObject.getUserTransaction().getStatus(); 6 if (jtaStatus == Status.STATUS_NO_TRANSACTION) { 7 // 事務狀態=已完結,非事務狀態。一般不會觸發 10 throw new UnexpectedRollbackException("JTA transaction already completed - probably rolled back"); 11 } 12 if (jtaStatus == Status.STATUS_ROLLEDBACK) { 13 // 回滾 16 try { 17 txObject.getUserTransaction().rollback(); 18 } 19 catch (IllegalStateException ex) { 20 if (logger.isDebugEnabled()) { 21 logger.debug("Rollback failure with transaction already marked as rolled back: " + ex); 22 } 23 } 24 throw new UnexpectedRollbackException("JTA transaction already rolled back (probably due to a timeout)"); 25 }//核心操作:提交事務 26 txObject.getUserTransaction().commit(); 27 } 28 catch (RollbackException ex) { 29 throw new UnexpectedRollbackException( 30 "JTA transaction unexpectedly rolled back (maybe due to a timeout)", ex); 31 } 32 catch (HeuristicMixedException ex) { 33 throw new HeuristicCompletionException(HeuristicCompletionException.STATE_MIXED, ex); 34 } 35 catch (HeuristicRollbackException ex) { 36 throw new HeuristicCompletionException(HeuristicCompletionException.STATE_ROLLED_BACK, ex); 37 } 38 catch (IllegalStateException ex) { 39 throw new TransactionSystemException("Unexpected internal transaction state", ex); 40 } 41 catch (SystemException ex) { 42 throw new TransactionSystemException("JTA failure on commit", ex); 43 } 44 }
txObject.getUserTransaction().commit()-->呼叫UserTransactionImp的commit()
1 public void commit () throws javax.transaction.RollbackException, 2 javax.transaction.HeuristicMixedException, 3 javax.transaction.HeuristicRollbackException, 4 javax.transaction.SystemException, java.lang.IllegalStateException, 5 java.lang.SecurityException 6 { 7 checkSetup (); 8 txmgr_.commit (); 9 }
TransactionManager的commit
1 public void commit () throws javax.transaction.RollbackException, 2 javax.transaction.HeuristicMixedException, 3 javax.transaction.HeuristicRollbackException, 4 javax.transaction.SystemException, java.lang.IllegalStateException, 5 java.lang.SecurityException 6 { 7 Transaction tx = getTransaction(); 8 if ( tx == null ) raiseNoTransaction(); 9 tx.commit(); 10 }
最終呼叫的是TransactionImp的commit()
1 public void commit() throws javax.transaction.RollbackException, 2 javax.transaction.HeuristicMixedException, 3 javax.transaction.HeuristicRollbackException, 4 javax.transaction.SystemException, java.lang.SecurityException 5 { 6 try { 7 ct_.commit(); 8 } catch ( HeurHazardException hh ) { 9 rethrowAsJtaHeuristicMixedException ( hh.getMessage () , hh ); 10 } catch ( HeurRollbackException hr ) { 11 rethrowAsJtaHeuristicRollbackException ( hr.getMessage () , hr ); 12 } catch ( HeurMixedException hm ) { 13 rethrowAsJtaHeuristicMixedException ( hm.getMessage () , hm ); 14 } catch ( SysException se ) { 15 LOGGER.logWarning ( se.getMessage() , se ); 16 throw new ExtendedSystemException ( se.getMessage (), se 17 .getErrors () ); 18 } catch ( com.atomikos.icatch.RollbackException rb ) { 19 //see case 29708: all statements have been closed 20 String msg = rb.getMessage (); 21 Throwable cause = rb.getCause(); 22 if (cause == null) cause = rb; 23 rethrowAsJtaRollbackException (msg , cause); 24 } 25 }
這裡就是呼叫CompositeTransaction介面的實現類CompositeTransactionImp的commit()
1 public void commit () throws HeurRollbackException, HeurMixedException, 2 HeurHazardException, SysException, SecurityException, 3 RollbackException 4 { 5 getTerminator().commit(); 6 }
呼叫“組合命令列實現類”CompositeTerminatorImp的commit()
1 public void commit () throws HeurRollbackException, HeurMixedException, 2 HeurHazardException, SysException, java.lang.SecurityException, 3 RollbackException 4 { 5 Stack errors = new Stack (); 6 // 1.標記事務狀態 7 transaction_.doCommit (); 8 setSiblingInfoForIncoming1pcRequestFromRemoteClient(); 9 10 if ( transaction_.isRoot () ) { 11 try {//2.提交事務,核心操作 12 coordinator_.terminate ( true ); 13 } 14 15 catch ( RollbackException rb ) { 16 throw rb; 17 } catch ( HeurHazardException hh ) { 18 throw hh; 19 } catch ( HeurRollbackException hr ) { 20 throw hr; 21 } catch ( HeurMixedException hm ) { 22 throw hm; 23 } catch ( SysException se ) { 24 throw se; 25 } catch ( Exception e ) { 26 errors.push ( e ); 27 throw new SysException ( 28 "Unexpected error: " + e.getMessage (), errors ); 29 } 30 } 31 32 }
如上,1.呼叫“組合事務實現類”CompositeTransactionImp的doCommit()這裡只做標記為非活動,沒有提交事務。
2.呼叫CoordinatorImp的terminate()終結事務。
1 protected void terminate ( boolean commit ) throws HeurRollbackException, 2 HeurMixedException, SysException, java.lang.SecurityException, 3 HeurCommitException, HeurHazardException, RollbackException, 4 IllegalStateException 5 6 { 7 synchronized ( fsm_ ) { 8 if ( commit ) {// 如果只有一個參與者,直接一階段提交 9 if ( participants_.size () <= 1 ) { 10 commit ( true ); 11 } else {//二階段提交:prepare階段 12 int prepareResult = prepare (); 13 // 二階段提交:commit階段。非只讀事務,才需要提交事務, 14 if ( prepareResult != Participant.READ_ONLY ) 15 commit ( false ); 16 } 17 } else { 18 rollback (); 19 } 20 } 21 }
如上圖是原始碼的優化精華:
1.根據參與者判斷,如果只有一個參與者,直接優化成一階段提交。
2.prepare完後,commit階段如果是隻讀事務,不用commit。
咱們是分散式事務,插入2個庫,肯定是兩階段提交。
CoordinatorImp的commit
1 public HeuristicMessage[] commit ( boolean onePhase ) 2 throws HeurRollbackException, HeurMixedException, 3 HeurHazardException, java.lang.IllegalStateException, 4 RollbackException, SysException 5 { 6 HeuristicMessage[] ret = null; 7 synchronized ( fsm_ ) { 8 ret = stateHandler_.commit(onePhase); 9 } 10 return ret; 11 }
追蹤到IndoubtStateHandler的commit,這個操作加了同步鎖。具體實現如下:
1 protected HeuristicMessage[] commit ( boolean onePhase ) 2 throws HeurRollbackException, HeurMixedException, 3 HeurHazardException, java.lang.IllegalStateException, 4 RollbackException, SysException 5 { 6 7 return commitWithAfterCompletionNotification ( new CommitCallback() { 8 public HeuristicMessage[] doCommit() 9 throws HeurRollbackException, HeurMixedException, 10 HeurHazardException, IllegalStateException, 11 RollbackException, SysException { 12 return commitFromWithinCallback ( false, false ); 13 } 14 }); 15 16 }
如上,核心介面就是CommitCallback的doCommit方法,方法體就是commitFromWithinCallback。
1 protected HeuristicMessage[] commitFromWithinCallback ( boolean heuristic , 2 boolean onePhase ) throws HeurRollbackException, 3 HeurMixedException, HeurHazardException, 4 java.lang.IllegalStateException, RollbackException, SysException 5 { 6 Stack<Exception> errors = new Stack<Exception> (); 7 CoordinatorStateHandler nextStateHandler = null; 8 9 try { 10 11 Vector<Participant> participants = coordinator_.getParticipants(); 12 int count = (participants.size () - readOnlyTable_.size ()); 13 TerminationResult commitresult = new TerminationResult ( count ); 14 15 // cf bug 64546: avoid committed_ being null upon recovery! 16 committed_ = new Boolean ( true ); 17 // for replaying completion: commit decision was reached 18 // otherwise, replay requests might only see TERMINATED! 19 20 try { 21 coordinator_.setState ( TxState.COMMITTING ); 22 } catch ( RuntimeException error ) { 23 //See case 23334 24 String msg = "Error in committing: " + error.getMessage() + " - rolling back instead"; 25 LOGGER.logWarning ( msg , error ); 26 try { 27 rollbackFromWithinCallback(getCoordinator().isRecoverableWhileActive().booleanValue(),false); 28 throw new RollbackException ( msg , error ); 29 } catch ( HeurCommitException e ) { 30 LOGGER.logWarning ( "Illegal heuristic commit during rollback:" + e ); 31 throw new HeurMixedException ( e.getHeuristicMessages() ); 32 } 33 } 34 35 36 // start messages 37 Enumeration<Participant> enumm = participants.elements (); 38 while ( enumm.hasMoreElements () ) { 39 Participant p = enumm.nextElement (); 40 if ( !readOnlyTable_.containsKey ( p ) ) { 41 CommitMessage cm = new CommitMessage ( p, commitresult, 42 onePhase ); 43 44 // if onephase: set cascadelist anyway, because if the 45 // participant is a REMOTE one, then it might have 46 // multiple participants that are not visible here! 47 48 if ( onePhase && cascadeList_ != null ) { // null for OTS 49 Integer sibnum = (Integer) cascadeList_.get ( p ); 50 if ( sibnum != null ) // null for local participant! 51 p.setGlobalSiblingCount ( sibnum.intValue () ); 52 p.setCascadeList ( cascadeList_ ); 53 } 54 propagator_.submitPropagationMessage ( cm ); 55 } 56 } // while 57 58 commitresult.waitForReplies (); 59 int res = commitresult.getResult (); 60 61 if ( res != TerminationResult.ALL_OK ) { 62 63 if ( res == TerminationResult.HEUR_MIXED ) { 64 Hashtable<Participant,TxState> hazards = commitresult.getPossiblyIndoubts (); 65 Hashtable heuristics = commitresult 66 .getHeuristicParticipants (); 67 addToHeuristicMap ( heuristics ); 68 enumm = participants.elements (); 69 while ( enumm.hasMoreElements () ) { 70 Participant p = (Participant) enumm.nextElement (); 71 if ( !heuristics.containsKey ( p ) ) 72 addToHeuristicMap ( p, TxState.TERMINATED ); 73 } 74 nextStateHandler = new HeurMixedStateHandler ( this, 75 hazards ); 76 77 coordinator_.setStateHandler ( nextStateHandler ); 78 throw new HeurMixedException ( getHeuristicMessages () ); 79 } 80 81 else if ( res == TerminationResult.ROLLBACK ) { 82 // 1PC and rolled back before commit arrived. 83 nextStateHandler = new TerminatedStateHandler ( this ); 84 coordinator_.setStateHandler ( nextStateHandler ); 85 throw new RollbackException ( "Rolled back already." ); 86 } else if ( res == TerminationResult.HEUR_ROLLBACK ) { 87 nextStateHandler = new HeurAbortedStateHandler ( this ); 88 coordinator_.setStateHandler ( nextStateHandler ); 89 // Here, we do NOT need to add extra information, since ALL 90 // participants agreed to rollback. 91 // Therefore, we need not worry about who aborted and who committed. 92 throw new HeurRollbackException ( getHeuristicMessages () ); 93 94 } 95 96 else if ( res == TerminationResult.HEUR_HAZARD ) { 97 Hashtable hazards = commitresult.getPossiblyIndoubts (); 98 Hashtable heuristics = commitresult 99 .getHeuristicParticipants (); 100 addToHeuristicMap ( heuristics ); 101 enumm = participants.elements (); 102 while ( enumm.hasMoreElements () ) { 103 Participant p = (Participant) enumm.nextElement (); 104 if ( !heuristics.containsKey ( p ) ) 105 addToHeuristicMap ( p, TxState.TERMINATED ); 106 } 107 nextStateHandler = new HeurHazardStateHandler ( this, 108 hazards ); 109 coordinator_.setStateHandler ( nextStateHandler ); 110 throw new HeurHazardException ( getHeuristicMessages () ); 111 } 112 113 } else { 114 // all OK 115 if ( heuristic ) { 116 nextStateHandler = new HeurCommittedStateHandler ( this ); 117 // again, here we do NOT need to preserve extra per-participant 118 // state mappings, since ALL participants were heur. committed. 119 } else 120 nextStateHandler = new TerminatedStateHandler ( this ); 121 122 coordinator_.setStateHandler ( nextStateHandler ); 123 } 124 } catch ( RuntimeException runerr ) { 125 errors.push ( runerr ); 126 throw new SysException ( "Error in commit: " + runerr.getMessage (), errors ); 127 } 128 129 catch ( InterruptedException intr ) { 130 // cf bug 67457 131 InterruptedExceptionHelper.handleInterruptedException ( intr ); 132 errors.push ( intr ); 133 throw new SysException ( "Error in commit" + intr.getMessage (), errors ); 134 } 135 136 return getHeuristicMessages (); 137 138 }
如上,構造了一個CommitMessage,呼叫傳播者Propagator的submitPropagationMessage()提交傳播訊息。-》CommitMessage的send()方法-》Participant的commit().
-》XAResourceTransaction的commit提交XA資源-》XAResource的commit( xid_, onePhase );饒了一大圈終於到了最最核心的程式碼了....我們這裡XAResource介面的實現類是MysqlXAConnection。
1 public void commit(Xid xid, boolean onePhase) throws XAException { 2 StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH); 3 commandBuf.append("XA COMMIT "); 4 appendXid(commandBuf, xid); 5 6 if (onePhase) { 7 commandBuf.append(" ONE PHASE"); 8 } 9 10 try { 11 dispatchCommand(commandBuf.toString()); 12 } finally { 13 this.underlyingConnection.setInGlobalTx(false); 14 } 15 }
dispatchCommand排程命令如下:
1 private ResultSet dispatchCommand(String command) throws XAException { 2 Statement stmt = null; 3 4 try { 5 if (this.logXaCommands) { 6 this.log.logDebug("Executing XA statement: " + command); 7 } 8 9 // TODO: Cache this for lifetime of XAConnection這裡還規劃要做快取 - -! 10 stmt = this.underlyingConnection.createStatement(); 11 // 核心執行事務提交 12 stmt.execute(command); 13 14 ResultSet rs = stmt.getResultSet(); 15 16 return rs; 17 } catch (SQLException sqlEx) { 18 throw mapXAExceptionFromSQLException(sqlEx); 19 } finally { 20 if (stmt != null) { 21 try { 22 stmt.close(); 23 } catch (SQLException sqlEx) { 24 } 25 } 26 } 27 }
如上就是一個經典的使用jdbc執行sql語句的過程:
1.使用com.mysql.jdbc.Connection建立Statement。
2.Statement執行sql命令.
3.得到結果。
3.3 rollback回滾事務
AbstractPlatformTransactionManager中rollback原始碼如下:
1 public final void rollback(TransactionStatus status) throws TransactionException { 2 if (status.isCompleted()) { 3 throw new IllegalTransactionStateException( 4 "Transaction is already completed - do not call commit or rollback more than once per transaction"); 5 } 6 7 DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; 8 processRollback(defStatus); 9 }
processRollback原始碼如下:
1 private void processRollback(DefaultTransactionStatus status) { 2 try { 3 try {// 解綁當前執行緒繫結的會話工廠,並關閉會話 4 triggerBeforeCompletion(status); 5 if (status.hasSavepoint()) {// 1.如果有儲存點,即巢狀式事務 6 if (status.isDebug()) { 7 logger.debug("Rolling back transaction to savepoint"); 8 }//回滾到儲存點 9 status.rollbackToHeldSavepoint(); 10 }//2.如果就是一個簡單事務 11 else if (status.isNewTransaction()) { 12 if (status.isDebug()) { 13 logger.debug("Initiating transaction rollback"); 14 }//回滾核心方法 15 doRollback(status); 16 }//3.當前存在事務且沒有儲存點,即加入當前事務的 17 else if (status.hasTransaction()) {//如果已經標記為回滾 或 當加入事務失敗時全域性回滾(預設true) 18 if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { 19 if (status.isDebug()) {//debug時會列印:加入事務失敗-標記已存在事務為回滾 20 logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); 21 }//設定當前connectionHolder:當加入一個已存在事務時回滾 22 doSetRollbackOnly(status); 23 } 24 else { 25 if (status.isDebug()) { 26 logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); 27 } 28 } 29 } 30 else { 31 logger.debug("Should roll back transaction but cannot - no transaction available"); 32 } 33 } 34 catch (RuntimeException ex) {//關閉會話,重置SqlSessionHolder屬性 35 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); 36 throw ex; 37 } 38 catch (Error err) { 39 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); 40 throw err; 41 } 42 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); 43 } 44 finally {、、解綁當前執行緒 45 cleanupAfterCompletion(status); 46 } 47 }
核心操作是doRollback,分散式環境下呼叫的是JtaTransactionManager的doRollback。
1 @Override 2 protected void doRollback(DefaultTransactionStatus status) { 3 JtaTransactionObject txObject = (JtaTransactionObject) status.getTransaction(); 4 try { 5 int jtaStatus = txObject.getUserTransaction().getStatus(); 6 if (jtaStatus != Status.STATUS_NO_TRANSACTION) { 7 try { 8 txObject.getUserTransaction().rollback(); 9 } 10 catch (IllegalStateException ex) { 11 if (jtaStatus == Status.STATUS_ROLLEDBACK) { 12 // Only really happens on JBoss 4.2 in case of an early timeout... 13 if (logger.isDebugEnabled()) { 14 logger.debug("Rollback failure with transaction already marked as rolled back: " + ex); 15 } 16 } 17 else { 18 throw new TransactionSystemException("Unexpected internal transaction state", ex); 19 } 20 } 21 } 22 } 23 catch (SystemException ex) { 24 throw new TransactionSystemException("JTA failure on rollback", ex); 25 } 26 }
呼叫的UserTransactionImp的rollback
1 public void rollback () throws IllegalStateException, SystemException, 2 SecurityException 3 { 4 checkSetup (); 5 txmgr_.rollback (); 6 }
TransactionManagerImp的rollback
1 public void rollback () throws IllegalStateException, SystemException, 2 SecurityException 3 { 4 Transaction tx = getTransaction(); 5 if ( tx == null ) raiseNoTransaction(); 6 tx.rollback(); 7 8 }
TransactionImp的rollback
1 public void rollback() throws IllegalStateException, SystemException 2 { 3 try { 4 ct_.rollback(); 5 } catch ( SysException se ) { 6 LOGGER.logWarning ( se.getMessage() , se ); 7 throw new ExtendedSystemException ( se.getMessage (), se 8 .getErrors () ); 9 } 10 11 }
CompositeTransactionImp的rollback
1 public void rollback () throws IllegalStateException, SysException 2 { 3 getTerminator().rollback(); 4 }
CompositeTerminatorImp的rollback
1 public void rollback () throws IllegalStateException, SysException 2 { 3 Stack errors = new Stack (); 4 5 transaction_.doRollback (); 6 7 if ( transaction_.isRoot () ) 8 try { 9 coordinator_.terminate ( false ); 10 } catch ( Exception e ) { 11 errors.push ( e ); 12 throw new SysException ( "Unexpected error in rollback: " + e.getMessage (), errors ); 13 } 14 }
一直追蹤到
RollbackMessage.send
1 protected Object send () throws PropagationException 2 { 3 Participant part = getParticipant (); 4 HeuristicMessage[] msgs = null; 5 try { 6 msgs = part.rollback (); 7 8 } catch ( HeurCommitException heurc ) { 9 throw new PropagationException ( heurc, false ); 10 } catch ( HeurMixedException heurm ) { 11 throw new PropagationException ( heurm, false ); 12 } 13 14 catch ( Exception e ) { 15 // only retry if might be indoubt. Otherwise ignore. 16 if ( indoubt_ ) { 17 // here, participant might be indoubt! 18 // fill in exact heuristic msgs by using buffered effect of proxies 19 HeurHazardException heurh = new HeurHazardException ( part.getHeuristicMessages () ); 20 throw new PropagationException ( heurh, true ); 21 } 22 } 23 return msgs; 24 }
XAResourceTransaction.rollback->MysqlXAConnection.rollback
1 public void rollback(Xid xid) throws XAException { 2 StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH); 3 commandBuf.append("XA ROLLBACK "); 4 appendXid(commandBuf, xid); 5 6 try { 7 dispatchCommand(commandBuf.toString()); 8 } finally { 9 this.underlyingConnection.setInGlobalTx(false); 10 } 11 }
dispatchCommand執行sql
1 private ResultSet dispatchCommand(String command) throws XAException { 2 Statement stmt = null; 3 4 try { 5 if (this.logXaCommands) { 6 this.log.logDebug("Executing XA statement: " + command); 7 } 8 9 // TODO: Cache this for lifetime of XAConnection 10 stmt = this.underlyingConnection.createStatement(); 11 12 stmt.execute(command); 13 14 ResultSet rs = stmt.getResultSet(); 15 16 return rs; 17 } catch (SQLException sqlEx) { 18 throw mapXAExceptionFromSQLException(sqlEx); 19 } finally { 20 if (stmt != null) { 21 try { 22 stmt.close(); 23 } catch (SQLException sqlEx) { 24 } 25 } 26 } 27 }
debug得到command:XA ROLLBACK 0x3139322e3136382e36302e31312e746d30303030313030303437,0x3139322e3136382e36302e31312e746d31,0x41544f4d
至此,回滾完畢。