spring5 原始碼深度解析----- 事務增強器(100%理解事務)
上一篇文章我們講解了事務的Advisor是如何註冊進Spring容器的,也講解了Spring是如何將有配置事務的類配置上事務的,實際上也就是用了AOP那一套,也講解了Advisor,pointcut驗證流程,至此,事務的初始化工作都已經完成了,在之後的呼叫過程,如果代理類的方法被呼叫,都會呼叫BeanFactoryTransactionAttributeSourceAdvisor這個Advisor的增強方法,也就是我們還未提到的那個Advisor裡面的advise,還記得嗎,在自定義標籤的時候我們將TransactionInterceptor這個Advice作為bean註冊進IOC容器,並且將其注入進Advisor中,這個Advice在代理類的invoke方法中會被封裝到攔截器鏈中,最終事務的功能都在advise中體現,所以我們先來關注一下TransactionInterceptor這個類吧。
TransactionInterceptor類繼承自MethodInterceptor,所以呼叫該類是從其invoke方法開始的,首先預覽下這個方法:
@Override @Nullable public Object invoke(final MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }
重點來了,進入invokeWithinTransaction方法:
@Nullable protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); // 獲取對應事務屬性 final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); // 獲取beanFactory中的transactionManager final PlatformTransactionManager tm = determineTransactionManager(txAttr); // 構造方法唯一標識(類.方法,如:service.UserServiceImpl.save) final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); // 宣告式事務處理 if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // 建立TransactionInfo TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // 執行原方法 // 繼續呼叫方法攔截器鏈,這裡一般將會呼叫目標類的方法,如:AccountServiceImpl.save方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 異常回滾 completeTransactionAfterThrowing(txInfo, ex); // 手動向上丟擲異常,則下面的提交事務不會執行 // 如果子事務出異常,則外層事務程式碼需catch住子事務程式碼,不然外層事務也會回滾 throw ex; } finally { // 消除資訊 cleanupTransactionInfo(txInfo); } // 提交事務 commitTransactionAfterReturning(txInfo); return retVal; } else { final ThrowableHolder throwableHolder = new ThrowableHolder(); try { // 程式設計式事務處理 Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> { TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. throwableHolder.throwable = ex; return null; } } finally { cleanupTransactionInfo(txInfo); } }); // Check result state: It might indicate a Throwable to rethrow. if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; } } }
建立事務Info物件
我們先分析事務建立的過程。
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // If no name specified, apply method identification as transaction name. // 如果沒有名稱指定則使用方法唯一標識,並使用DelegatingTransactionAttribute封裝txAttr if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // 獲取TransactionStatus status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } // 根據指定的屬性與status準備一個TransactionInfo return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
對於createTransactionlfNecessary函式主要做了這樣幾件事情。
(1)使用 DelegatingTransactionAttribute 封裝傳入的 TransactionAttribute 例項。
對於傳入的TransactionAttribute型別的引數txAttr,當前的實際型別是RuleBasedTransactionAttribute,是由獲取事務屬性時生成,主要用於資料承載,而這裡之所以使用DelegatingTransactionAttribute進行封裝,當然是提供了更多的功能。
(2)獲取事務。
事務處理當然是以事務為核心,那麼獲取事務就是最重要的事情。
(3)構建事務資訊。
根據之前幾個步驟獲取的資訊構建Transactionlnfo並返回。
獲取事務
其中核心是在getTransaction方法中:
@Override public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // 獲取一個transaction Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { definition = new DefaultTransactionDefinition(); } // 如果在這之前已經存在事務了,就進入存在事務的方法中 if (isExistingTransaction(transaction)) { return handleExistingTransaction(definition, transaction, debugEnabled); } // 事務超時設定驗證 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // 走到這裡說明此時沒有存在事務,如果傳播特性是MANDATORY時丟擲異常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } // 如果此時不存在事務,當傳播特性是REQUIRED或NEW或NESTED都會進入if語句塊 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED都需要新建事務 // 因為此時不存在事務,將null掛起 SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // new一個status,存放剛剛建立的transaction,然後將其標記為新事務! // 這裡transaction後面一個引數決定是否是新事務! DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 新開一個連線的地方,非常重要 doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } // 其他的傳播特性一律返回一個空事務,transaction = null //當前不存在事務,且傳播機制=PROPAGATION_SUPPORTS/PROPAGATION_NOT_SUPPORTED/PROPAGATION_NEVER,這三種情況,建立“空”事務 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
先來看看transaction是如何被創建出來的:
@Override protected Object doGetTransaction() { // 這裡DataSourceTransactionObject是事務管理器的一個內部類 // DataSourceTransactionObject就是一個transaction,這裡new了一個出來 DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); // 解綁與繫結的作用在此時體現,如果當前執行緒有繫結的話,將會取出holder // 第一次conHolder肯定是null ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource()); // 此時的holder被標記成一箇舊holder txObject.setConnectionHolder(conHolder, false); return txObject; }
建立transaction過程很簡單,接著就會判斷當前是否存在事務:
@Override protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); } public boolean hasConnectionHolder() { return (this.connectionHolder != null); }
這裡判斷是否存在事務的依據主要是獲取holder中的transactionActive變數是否為true,如果是第一次進入事務,holder直接為null判斷不存在了,如果是第二次進入事務transactionActive變數是為true的(後面會提到是在哪裡把它變成true的),由此來判斷當前是否已經存在事務了。
至此,原始碼分成了2條處理線
1.當前已存在事務:isExistingTransaction()判斷是否存在事務,存在事務handleExistingTransaction()根據不同傳播機制不同處理
2.當前不存在事務: 不同傳播機制不同處理
當前不存在事務
如果不存在事務,傳播特性又是REQUIRED或NEW或NESTED,將會先掛起null,這個掛起方法我們後面再講,然後建立一個DefaultTransactionStatus ,並將其標記為新事務,然後執行doBegin(transaction, definition);這個方法也是一個關鍵方法
神祕又關鍵的status物件
TransactionStatus介面
public interface TransactionStatus extends SavepointManager, Flushable { // 返回當前事務是否為新事務(否則將參與到現有事務中,或者可能一開始就不在實際事務中執行) boolean isNewTransaction(); // 返回該事務是否在內部攜帶儲存點,也就是說,已經建立為基於儲存點的巢狀事務。 boolean hasSavepoint(); // 設定事務僅回滾。 void setRollbackOnly(); // 返回事務是否已標記為僅回滾 boolean isRollbackOnly(); // 將會話重新整理到資料儲存區 @Override void flush(); // 返回事物是否已經完成,無論提交或者回滾。 boolean isCompleted(); }
再來看看實現類DefaultTransactionStatus
DefaultTransactionStatus
public class DefaultTransactionStatus extends AbstractTransactionStatus { //事務物件 @Nullable private final Object transaction; //事務物件 private final boolean newTransaction; private final boolean newSynchronization; private final boolean readOnly; private final boolean debug; //事務物件 @Nullable private final Object suspendedResources; public DefaultTransactionStatus( @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean readOnly, boolean debug, @Nullable Object suspendedResources) { this.transaction = transaction; this.newTransaction = newTransaction; this.newSynchronization = newSynchronization; this.readOnly = readOnly; this.debug = debug; this.suspendedResources = suspendedResources; } //略... }
我們看看這行程式碼 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 這裡是構造一個status物件的方法 protected DefaultTransactionStatus newTransactionStatus( TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive(); return new DefaultTransactionStatus( transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources); }
實際上就是封裝了事務屬性definition,新建立的transaction,並且將事務狀態屬性設定為新事務,最後一個引數為被掛起的事務。
簡單瞭解一下關鍵引數即可:
第二個引數transaction:事務物件,在一開頭就有建立,其就是事務管理器的一個內部類。
第三個引數newTransaction:布林值,一個標識,用於判斷是否是新的事務,用於提交或者回滾方法中,是新的才會提交或者回滾。
最後一個引數suspendedResources:被掛起的物件資源,掛起操作會返回舊的holder,將其與一些事務屬性一起封裝成一個物件,就是這個suspendedResources這個物件了,它會放在status中,在最後的清理工作方法中判斷status中是否有這個掛起物件,如果有會恢復它
接著我們來看看關鍵程式碼 doBegin(transaction, definition);
1 @Override 2 protected void doBegin(Object transaction, TransactionDefinition definition) { 3 DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; 4 Connection con = null; 5 6 try { 7 // 判斷如果transaction沒有holder的話,才去從dataSource中獲取一個新連線 8 if (!txObject.hasConnectionHolder() || 9 txObject.getConnectionHolder().isSynchronizedWithTransaction()) { 10 //通過dataSource獲取連線 11 Connection newCon = this.dataSource.getConnection(); 12 if (logger.isDebugEnabled()) { 13 logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); 14 } 15 // 所以,只有transaction中的holder為空時,才會設定為新holder 16 // 將獲取的連線封裝進ConnectionHolder,然後封裝進transaction的connectionHolder屬性 17 txObject.setConnectionHolder(new ConnectionHolder(newCon), true); 18 } 19 //設定新的連線為事務同步中 20 txObject.getConnectionHolder().setSynchronizedWithTransaction(true); 21 con = txObject.getConnectionHolder().getConnection(); 22 //conn設定事務隔離級別,只讀 23 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); 24 txObject.setPreviousIsolationLevel(previousIsolationLevel);//DataSourceTransactionObject設定事務隔離級別 25 26 // 如果是自動提交切換到手動提交 27 if (con.getAutoCommit()) { 28 txObject.setMustRestoreAutoCommit(true); 29 if (logger.isDebugEnabled()) { 30 logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); 31 } 32 con.setAutoCommit(false); 33 } 34 // 如果只讀,執行sql設定事務只讀 35 prepareTransactionalConnection(con, definition); 36 // 設定connection持有者的事務開啟狀態 37 txObject.getConnectionHolder().setTransactionActive(true); 38 39 int timeout = determineTimeout(definition); 40 if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { 41 // 設定超時秒數 42 txObject.getConnectionHolder().setTimeoutInSeconds(timeout); 43 } 44 45 // 將當前獲取到的連線繫結到當前執行緒 46 if (txObject.isNewConnectionHolder()) { 47 TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); 48 } 49 }catch (Throwable ex) { 50 if (txObject.isNewConnectionHolder()) { 51 DataSourceUtils.releaseConnection(con, this.dataSource); 52 txObject.setConnectionHolder(null, false); 53 } 54 throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); 55 } 56 }
conn設定事務隔離級別,只讀
@Nullable public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition) throws SQLException { Assert.notNull(con, "No Connection specified"); // Set read-only flag. // 設定資料連線的只讀標識 if (definition != null && definition.isReadOnly()) { try { if (logger.isDebugEnabled()) { logger.debug("Setting JDBC Connection [" + con + "] read-only"); } con.setReadOnly(true); } catch (SQLException | RuntimeException ex) { Throwable exToCheck = ex; while (exToCheck != null) { if (exToCheck.getClass().getSimpleName().contains("Timeout")) { // Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0 throw ex; } exToCheck = exToCheck.getCause(); } // "read-only not supported" SQLException -> ignore, it's just a hint anyway logger.debug("Could not set JDBC Connection read-only", ex); } } // Apply specific isolation level, if any. // 設定資料庫連線的隔離級別 Integer previousIsolationLevel = null; if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { if (logger.isDebugEnabled()) { logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " + definition.getIsolationLevel()); } int currentIsolation = con.getTransactionIsolation(); if (currentIsolation != definition.getIsolationLevel()) { previousIsolationLevel = currentIsolation; con.setTransactionIsolation(definition.getIsolationLevel()); } } return previousIsolationLevel; }
我們看到都是通過 Connection 去設定
執行緒變數的繫結
我們看 doBegin 方法的47行,將當前獲取到的連線繫結到當前執行緒,繫結與解綁圍繞一個執行緒變數,此變數在TransactionSynchronizationManager
類中:
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
這是一個 static final 修飾的 執行緒變數,儲存的是一個Map,我們來看看47行的靜態方法,bindResource
public static void bindResource(Object key, Object value) throws IllegalStateException { // 從上面可知,執行緒變數是一個Map,而這個Key就是dataSource // 這個value就是holder Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.notNull(value, "Value must not be null"); // 獲取這個執行緒變數Map Map<Object, Object> map = resources.get(); // set ThreadLocal Map if none found if (map == null) { map = new HashMap<>(); resources.set(map); } // 將新的holder作為value,dataSource作為key放入當前執行緒Map中 Object oldValue = map.put(actualKey, value); // Transparently suppress a ResourceHolder that was marked as void... if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) { oldValue = null; } if (oldValue != null) { throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } Thread.currentThread().getName() + "]"); } // 略... }
擴充知識點
這裡再擴充一點,mybatis中獲取的資料庫連線,就是根據 dataSource 從ThreadLocal中獲取的
以查詢舉例,會呼叫Executor#doQuery方法:
最終會呼叫DataSourceUtils#doGetConnection獲取,真正的資料庫連線,其中TransactionSynchronizationManager中儲存的就是方法呼叫前,spring增強方法中繫結到執行緒的connection,從而保證整個事務過程中connection的一致性
我們看看TransactionSynchronizationManager.getResource(Object key)這個方法
@Nullable public static Object getResource(Object key) { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doGetResource(actualKey); if (value != null && logger.isTraceEnabled()) { logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } return value; } @Nullable private static Object doGetResource(Object actualKey) { Map<Object, Object> map = resources.get(); if (map == null) { return null; } Object value = map.get(actualKey); // Transparently remove ResourceHolder that was marked as void... if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { map.remove(actualKey); // Remove entire ThreadLocal if empty... if (map.isEmpty()) { resources.remove(); } value = null; } return value; }
就是從執行緒變數的Map中根據 DataSource獲取 ConnectionHolder
已經存在的事務
前面已經提到,第一次事務開始時必會新創一個holder然後做繫結操作,此時執行緒變數是有holder的且avtive為true,如果第二個事務進來,去new一個transaction之後去執行緒變數中取holder,holder是不為空的且active是為true的,所以會進入handleExistingTransaction方法:
1 private TransactionStatus handleExistingTransaction( 2 TransactionDefinition definition, Object transaction, boolean debugEnabled) 3 throws TransactionException { 4 // 1.NERVER(不支援當前事務;如果當前事務存在,丟擲異常)報錯 5 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { 6 throw new IllegalTransactionStateException( 7 "Existing transaction found for transaction marked with propagation 'never'"); 8 } 9 // 2.NOT_SUPPORTED(不支援當前事務,現有同步將被掛起)掛起當前事務,返回一個空事務 10 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { 11 if (debugEnabled) { 12 logger.debug("Suspending current transaction"); 13 } 14 // 這裡會將原來的事務掛起,並返回被掛起的物件 15 Object suspendedResources = suspend(transaction); 16 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); 17 // 這裡可以看到,第二個引數transaction傳了一個空事務,第三個引數false為舊標記 18 // 最後一個引數就是將前面掛起的物件封裝進新的Status中,當前事務執行完後,就恢復suspendedResources 19 return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources); 20 } 21 // 3.REQUIRES_NEW掛起當前事務,建立新事務 22 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { 23 if (debugEnabled) { 24 logger.debug("Suspending current transaction, creating new transaction with name [" + 25 definition.getName() + "]"); 26 } 27 // 將原事務掛起,此時新建事務,不與原事務有關係 28 // 會將transaction中的holder設定為null,然後解綁! 29 SuspendedResourcesHolder suspendedResources = suspend(transaction); 30 try { 31 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); 32 // new一個status出來,傳入transaction,並且為新事務標記,然後傳入掛起事務 33 DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); 34 // 這裡也做了一次doBegin,此時的transaction中holer是為空的,因為之前的事務被掛起了 35 // 所以這裡會取一次新的連線,並且繫結! 36 doBegin(transaction, definition); 37 prepareSynchronization(status, definition); 38 return status; 39 } 40 catch (RuntimeException beginEx) { 41 resumeAfterBeginException(transaction, suspendedResources, beginEx); 42 throw beginEx; 43 } 44 catch (Error beginErr) { 45 resumeAfterBeginException(transaction, suspendedResources, beginErr); 46 throw beginErr; 47 } 48 } 49 // 如果此時的傳播特性是NESTED,不會掛起事務 50 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { 51 if (!isNestedTransactionAllowed()) { 52 throw new NestedTransactionNotSupportedException( 53 "Transaction manager does not allow nested transactions by default - " + 54 "specify 'nestedTransactionAllowed' property with value 'true'"); 55 } 56 if (debugEnabled) { 57 logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); 58 } 59 // 這裡如果是JTA事務管理器,就不可以用savePoint了,將不會進入此方法 60 if (useSavepointForNestedTransaction()) { 61 // 這裡不會掛起事務,說明NESTED的特性是原事務的子事務而已 62 // new一個status,傳入transaction,傳入舊事務標記,傳入掛起物件=null 63 DefaultTransactionStatus status =prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); 64 // 這裡是NESTED特性特殊的地方,在先前存在事務的情況下會建立一個savePoint 65 status.createAndHoldSavepoint(); 66 return status; 67 } 68 else { 69 // JTA事務走這個分支,建立新事務 70 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); 71 DefaultTransactionStatus status = newTransactionStatus( 72 definition, transaction, true, newSynchronization, debugEnabled, null); 73 doBegin(transaction, definition); 74 prepareSynchronization(status, definition); 75 return status; 76 } 77 } 78 79 // 到這裡PROPAGATION_SUPPORTS 或 PROPAGATION_REQUIRED或PROPAGATION_MANDATORY,存在事務加入事務即可,標記為舊事務,空掛起 80 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); 81 return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); 82 }
對於已經存在事務的處理過程中,我們看到了很多熟悉的操作,但是,也有些不同的地方,函式中對已經存在的事務處理考慮兩種情況。
(1)PROPAGATION_REQUIRES_NEW表示當前方法必須在它自己的事務裡執行,一個新的事務將被啟動,而如果有一個事務正在執行的話,則在這個方法執行期間被掛起。而Spring中對於此種傳播方式的處理與新事務建立最大的不同點在於使用suspend方法將原事務掛起。 將資訊掛起的目的當然是為了在當前事務執行完畢後在將原事務還原。
(2)PROPAGATION_NESTED表示如果當前正有一個事務在執行中,則該方法應該執行在一個巢狀的事務中,被巢狀的事務可以獨立於封裝事務進行提交或者回滾,如果封裝事務不存在,行為就像PROPAGATION_REQUIRES_NEW。對於嵌入式事務的處理,Spring中主要考慮了兩種方式的處理。
- Spring中允許嵌入事務的時候,則首選設定儲存點的方式作為異常處理的回滾。
- 對於其他方式,比如JTA無法使用儲存點的方式,那麼處理方式與PROPAGATION_ REQUIRES_NEW相同,而一旦出現異常,則由Spring的事務異常處理機制去完成後續操作。
對於掛起操作的主要目的是記錄原有事務的狀態,以便於後續操作對事務的恢復
小結
到這裡我們可以知道,在當前存在事務的情況下,根據傳播特性去決定是否為新事務,是否掛起當前事務。
NOT_SUPPORTED :會掛起事務,不執行doBegin方法傳空transaction
,標記為舊事務。封裝status
物件:
return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources)
REQUIRES_NEW :將會掛起事務且執行doBegin方法,標記為新事務。封裝status
物件:
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
NESTED :不會掛起事務且不會執行doBegin方法,標記為舊事務,但會建立savePoint
。封裝status
物件:
DefaultTransactionStatus status =prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
其他事務例如REQUIRED :不會掛起事務,封裝原有的transaction不會執行doBegin方法,標記舊事務,封裝status
物件:
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
掛起
對於掛起操作的主要目的是記錄原有事務的狀態,以便於後續操作對事務的恢復:
@Nullable protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException { if (TransactionSynchronizationManager.isSynchronizationActive()) { List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null; if (transaction != null) { // 這裡是真正做掛起的方法,這裡返回的是一個holder suspendedResources = doSuspend(transaction); } // 這裡將名稱、隔離級別等資訊從執行緒變數中取出並設定對應屬性為null到執行緒變數 String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); // 將事務各個屬性與掛起的holder一併封裝進SuspendedResourcesHolder物件中 return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException | Error ex) { // doSuspend failed - original transaction is still active... doResumeSynchronization(suspendedSynchronizations); throw ex; } } else if (transaction != null) { // Transaction active but no synchronization active. Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { // Neither transaction nor synchronization active. return null; } }
@Override protected Object doSuspend(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // 將transaction中的holder屬性設定為空 txObject.setConnectionHolder(null); // ConnnectionHolder從執行緒變數中解綁! return TransactionSynchronizationManager.unbindResource(obtainDataSource()); }
我們來看看 unbindResource
private static Object doUnbindResource(Object actualKey) { // 取得當前執行緒的執行緒變數Map Map<Object, Object> map = resources.get(); if (map == null) { return null; } // 將key為dataSourece的value移除出Map,然後將舊的Holder返回 Object value = map.remove(actualKey); // Remove entire ThreadLocal if empty... // 如果此時map為空,直接清除執行緒變數 if (map.isEmpty()) { resources.remove(); } // Transparently suppress a ResourceHolder that was marked as void... if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { value = null; } if (value != null && logger.isTraceEnabled()) { logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" + Thread.currentThread().getName() + "]"); } // 將舊Holder返回 return value; }
可以回頭看一下解綁操作的介紹。這裡掛起主要乾了三件事:
- 將transaction中的holder屬性設定為空
- 解綁(會返回執行緒中的那個舊的holder出來,從而封裝到SuspendedResourcesHolder物件中)
- 將SuspendedResourcesHolder放入status中,方便後期子事務完成後,恢復外層事務
&n