原始碼分析 spring事務處理機制
Spring在TransactionDefinition介面中定義這些屬性,以供PlatfromTransactionManager使用, PlatfromTransactionManager是Spring事務管理的核心介面。
介面程式碼如下:
public interface TransactionDefinition { int getPropagationBehavior(); //返回事務的傳播行為。 int getIsolationLevel(); //返回事務的隔離級別,事務管理器根據它來控制另外一個事務可以看到本事務內的哪些資料。 int getTimeout(); //返回事務必須在多少秒內完成。 boolean isReadOnly(); //事務是否只讀,事務管理器能夠根據這個返回值進行優化,確保事務是隻讀的。 }
事務的傳播屬性有:
PROPAGATION_REQUIRED 如果存在一個事務,則支援當前事務。如果沒有事務則開啟一個新的事務。(預設的傳播屬性)
PROPAGATION_REQUIRES_NEW 總是開啟一個新的事務。如果一個事務已經存在,則將這個存在的事務掛起。
PROPAGATION_SUPPORTS 如果存在一個事務,支援當前事務。如果沒有事務,則非事務的執行。
PROPAGATION_MANDATORY 如果已經存在一個事務,支援當前事務。如果沒有一個活動的事務,則丟擲異常。
PROPAGATION_NOT_SUPPORTED 總是非事務地執行,並掛起任何存在的事務。
PROPAGATION_NEVER
PROPAGATION_NESTED如果一個活動的事務存在,則執行在一個巢狀的事務中. 如果沒有活動事務, 則按TransactionDefinition.PROPAGATION_REQUIRED 屬性執行。
當我們的程式呼叫到 把被 @Transaction 註解修飾的方法時,會被spring的AOP切面攔截,該方法會被進行增強,其中就包含了spring對該方法進行事務管理。spring會對不同的傳播屬性進行不同的事務處理。spring 通過 AbstractPlatformTransactionManager這個類來管理事務。
方法如下:
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { //doGetTransaction()方法是抽象方法,具體的實現由具體的事務處理器提供 Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); //如果沒有配置事務屬性,則使用預設的事務屬性 if (definition == null) { definition = new DefaultTransactionDefinition(); } //檢查當前執行緒是否存在事務 if (isExistingTransaction(transaction)) { //處理已存在的事務 return handleExistingTransaction(definition, transaction, debugEnabled); } //檢查事務屬性中timeout超時屬性設定是否合理 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } //對事務屬性中配置的事務傳播特性處理 //如果事務傳播特性配置的是mandatory,當前沒有事務存在,丟擲異常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } //如果事務傳播特性為required、required_new或nested else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } //建立事務 try { //不啟用和當前執行緒繫結的事務,因為事務傳播特性配置要求建立新的事務 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); //建立一個新的事務狀態 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); //建立事務的呼叫,具體實現由具體的事務處理器提供 doBegin(transaction, definition); //初始化和同步事務狀態 prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { //建立空事務,否則就建立一個空事務,沒有實際事務,但可能同步。(實際上就是不使用事務) boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); //準備事務狀態 return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
AbstractPlatformTransactionManager處理已經存在的事務:
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
//如果事務傳播特性為:never,則丟擲異常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
//如果事務傳播特性是not_supported,同時當前執行緒存在事務,則將事務掛起
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
//掛起事務
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
//建立非事務的事務狀態,讓方法非事務地執行
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
//如果事務傳播特性是required_new,則建立新事務,同時把當前執行緒中存在的
//事務掛起
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
//掛起已存在的事務
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//將掛起的事務狀態儲存起來
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//建立新事務
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}
//如果事務傳播特性是nested巢狀事務
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//如果不允許事務巢狀,則丟擲異常
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
//如果允許使用savepoint儲存點儲存巢狀事務
if (useSavepointForNestedTransaction()) {
//為當前事務建立一個儲存點
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
//如果不允許使用savepoint儲存點儲存巢狀事務
else {
//使用JTA的巢狀commit/rollback呼叫
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
//對於事務傳播特性為supported和required的處理
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
//校驗已存在的事務,如果已有事務與事務屬性配置不一致,則丟擲異常
if (isValidateExistingTransaction()) {
//如果事務隔離級別不是預設隔離級別
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
//獲取當前事務的隔離級別
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
//如果獲取到的當前事務隔離級別為null獲取不等於事務屬性配置的隔離級別
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)"));
}
}
//如果事務不是隻讀
if (!definition.isReadOnly()) {
//如果當前已有事務是隻讀
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//返回當前事務的執行狀態
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
加個抽離無關程式碼的總結,對流程更清晰一點
下篇將會介紹spring的事務管理如何在日常的開發中更靈活的使用