Spring Transaction Source Code: Transaction Implementation and Connection Binding and Retrieval
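PlatformTransactionManager is Spring's high-level abstraction for transactions; every transaction is carried out through a PlatformTransactionManager. Its key method is shown below.
To use a transaction, we first call the following method to obtain a transaction status object: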
TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
Next, let's look at the source of AbstractPlatformTransactionManager#getTransaction:
@Override
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    // The transaction object's type is DataSourceTransactionManager.DataSourceTransactionObject.
    // Note its structure: a DataSourceTransactionObject contains a ConnectionHolder.
    Object transaction = doGetTransaction();

    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();

    if (definition == null) {
        // If the given transaction definition is null, create a default one.
        definition = new DefaultTransactionDefinition();
    }

    if (isExistingTransaction(transaction)) {
        // Handling that depends on the transaction propagation behavior.
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }

    // Check definition settings for new transaction.
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }

    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
        }
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // The most important call: it obtains the database connection, binds it to the
            // current thread, and sets auto-commit to false.
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException ex) {
            resume(null, suspendedResources);
            throw ex;
        }
        catch (Error err) {
            resume(null, suspendedResources);
            throw err;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + definition);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}
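The branch taken when no transaction exists yet can be summarized in a few lines. The following is only a minimal sketch of that decision, not Spring code: the names Propagation, Outcome and PropagationSketch are hypothetical, and IllegalStateException stands in for IllegalTransactionStateException.

```java
// Hypothetical stand-ins for the TransactionDefinition.PROPAGATION_* constants.
enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

// Hypothetical result type: either a real transaction is started or an "empty" one is returned.
enum Outcome { NEW_TRANSACTION, EMPTY_TRANSACTION }

final class PropagationSketch {

    // Mirrors only the "no existing transaction" half of getTransaction.
    static Outcome whenNoTransactionExists(Propagation propagation) {
        switch (propagation) {
            case MANDATORY:
                // MANDATORY requires a caller-supplied transaction, so this is an error.
                throw new IllegalStateException(
                        "No existing transaction found for transaction marked with propagation 'mandatory'");
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                // These three start a real transaction (the doBegin path).
                return Outcome.NEW_TRANSACTION;
            default:
                // SUPPORTS, NOT_SUPPORTED, NEVER: no actual transaction is started.
                return Outcome.EMPTY_TRANSACTION;
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            try {
                System.out.println(p + " -> " + whenNoTransactionExists(p));
            } catch (IllegalStateException ex) {
                System.out.println(p + " -> error: " + ex.getMessage());
            }
        }
    }
}
```

Only the second group reaches doBegin, which is why the rest of this article concentrates on that method.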
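doGetTransaction tries to find a Connection already bound to the current thread. If none is bound, the ConnectionHolder inside the returned transaction object is null. The source of org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction is: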
@Override
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    // Key code: ask TransactionSynchronizationManager whether the current context has a Connection.
    // TransactionSynchronizationManager looks the connection up via ThreadLocal.
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}
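If doGetTransaction finds no connection bound to the current thread, one has to be bound. The doBegin method declared in org.springframework.transaction.support.AbstractPlatformTransactionManager is the foundation of a transaction. Taking the DataSourceTransactionManager implementation as an example, its source is: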
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    // DataSourceTransactionObject holds the database connection in its ConnectionHolder member.
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        if (txObject.getConnectionHolder() == null ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // No Connection was found in the thread context, so take one from the connection
            // pool and set it on the transaction object. It is not yet bound to the thread;
            // TransactionSynchronizationManager.bindResource below takes care of that.
            Connection newCon = this.dataSource.getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);

        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }
        txObject.getConnectionHolder().setTransactionActive(true);

        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // Bind the connection to the current thread.
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
        }
    }
    catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, this.dataSource);
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}
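The auto-commit handling in doBegin is easy to overlook: the connection is switched to manual commit only if it was in auto-commit mode, and a flag records that the original mode has to be restored later. The sketch below isolates that pattern. It is an illustration under assumptions, not Spring's code: AutoCommitSketch, stubConnection, beginManualCommit and restore are hypothetical names, the Connection is an in-memory proxy that supports only the two auto-commit methods, and the restore step is an assumption about how the flag is consumed after the transaction completes (it is not part of the listing above).

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;

final class AutoCommitSketch {

    // Hypothetical in-memory stand-in for a pooled Connection.
    // Only getAutoCommit and setAutoCommit are supported.
    static Connection stubConnection(boolean initialAutoCommit) {
        boolean[] autoCommit = {initialAutoCommit};
        return (Connection) Proxy.newProxyInstance(
                AutoCommitSketch.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "getAutoCommit":
                            return autoCommit[0];
                        case "setAutoCommit":
                            autoCommit[0] = (Boolean) args[0];
                            return null;
                        default:
                            throw new UnsupportedOperationException(method.getName());
                    }
                });
    }

    // Switches to manual commit if needed and reports whether auto-commit must be
    // restored later (the role of setMustRestoreAutoCommit(true) in doBegin).
    static boolean beginManualCommit(Connection con) throws SQLException {
        if (con.getAutoCommit()) {
            con.setAutoCommit(false);
            return true;
        }
        return false;
    }

    // Assumed cleanup step: put the connection back into its original mode.
    static void restore(Connection con, boolean mustRestoreAutoCommit) throws SQLException {
        if (mustRestoreAutoCommit) {
            con.setAutoCommit(true);
        }
    }

    public static void main(String[] args) throws SQLException {
        Connection con = stubConnection(true);
        boolean mustRestore = beginManualCommit(con);
        System.out.println("in transaction, autoCommit=" + con.getAutoCommit());
        restore(con, mustRestore);
        System.out.println("after cleanup, autoCommit=" + con.getAutoCommit());
    }
}
```

Recording the flag rather than always calling setAutoCommit(true) keeps a connection that was already in manual-commit mode untouched when it goes back to the pool.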
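This completes binding the Connection to the current thread. The binding is done by calling TransactionSynchronizationManager#bindResource with two arguments: the DataSource and the ConnectionHolder. The DataSource serves as the key so that a thread working with several connection pools does not mix up their Connections: a lookup uses the current thread together with the connection pool to find the single matching Connection. TransactionSynchronizationManager is the key utility in Connection binding.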
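TransactionSynchronizationManager is an important utility class that binds resources in Spring transactions. Each thread binds its own Connection, which keeps transactions on different threads from interfering with one another. Under the hood it is implemented with ThreadLocal!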
When a thread needs its connection, doGetResource is called to look it up: org.springframework.transaction.support.TransactionSynchronizationManager#doGetResource; when doGetResource finds no connection, bindResource is called to bind one: org.springframework.transaction.support.TransactionSynchronizationManager#bindResource
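The idea of "one map per thread, keyed by DataSource" can be shown with a small stand-alone sketch. This is not the Spring class: ResourceRegistrySketch and its methods are hypothetical, plain Objects and Strings stand in for DataSource and ConnectionHolder, and details of the real implementation are left out.

```java
import java.util.HashMap;
import java.util.Map;

final class ResourceRegistrySketch {

    // One map per thread: key = resource factory (e.g. a DataSource), value = its holder.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    // Returns the value bound for this key on the current thread, or null.
    static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    // Binds a value for this key on the current thread; a second bind is rejected.
    static void bindResource(Object key, Object value) {
        Object old = RESOURCES.get().putIfAbsent(key, value);
        if (old != null) {
            throw new IllegalStateException(
                    "Already value [" + old + "] for key [" + key + "] bound to thread");
        }
    }

    // Removes and returns the value bound for this key on the current thread.
    static Object unbindResource(Object key) {
        return RESOURCES.get().remove(key);
    }

    public static void main(String[] args) throws InterruptedException {
        Object dataSourceA = new Object();
        Object dataSourceB = new Object();
        bindResource(dataSourceA, "connection-A");
        bindResource(dataSourceB, "connection-B");
        System.out.println(getResource(dataSourceA)); // prints "connection-A"
        System.out.println(getResource(dataSourceB)); // prints "connection-B"

        // A different thread has its own (empty) map and therefore sees nothing.
        Thread other = new Thread(
                () -> System.out.println("other thread sees: " + getResource(dataSourceA)));
        other.start();
        other.join(); // prints "other thread sees: null"

        unbindResource(dataSourceA);
        unbindResource(dataSourceB);
    }
}
```

Because the thread is implicit in the ThreadLocal and the DataSource is the map key, the two together identify exactly one Connection, which is the property described above.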
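Next, let's see how a query obtains the Connection bound to the thread. To make debugging easier, the example uses programmatic transaction management. The configuration is as follows: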
<?xml version="1.0" encoding="UTF-8"?>
<beans
        xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- Scan for annotated components -->
    <context:component-scan base-package="com.javartisan.jdbc.spring"/>
    <bean id="datasource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
        <property name="username" value="root"/>
        <property name="password" value="root"/>
        <property name="minIdle" value="2"/>
        <property name="maxIdle" value="5"/>
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="maxTotal" value="10"/>
        <property name="initialSize" value="5"/>
        <property name="url"
                  value="jdbc:mysql://127.0.0.1:3306/databasebook?serverTimezone=UTC&amp;characterEncoding=utf-8&amp;useSSL=true"/>
    </bean>
    <bean id="platformTransactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="datasource"/>
    </bean>
    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="datasource"/>
    </bean>
</beans>
Service source:
@Service
public class StudentService {

    @Resource
    private PlatformTransactionManager platformTransactionManager;

    @Resource
    private StudentDao studentDao;

    public List<Student> getStudents() {
        TransactionDefinition td = new DefaultTransactionDefinition();
        TransactionStatus status = platformTransactionManager.getTransaction(td);
        List<Student> res = null;
        try {
            res = studentDao.getStudents();
            studentDao.insert();
            // int intRes = 1 / 0;
            platformTransactionManager.commit(status);
        } catch (Exception e) {
            platformTransactionManager.rollback(status);
        }
        return res;
    }
}
DAO source:
@Repository
public class StudentDao {

    @Resource
    private JdbcTemplate jdbcTemplate;

    public List<Student> getStudents() {
        return jdbcTemplate.query("select * from student", new StudentMapper());
    }

    public void insert() {
        Object[] args = {System.currentTimeMillis() + "", "0907", "1", 18, "0907"};
        int[] types = {Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.VARCHAR};
        jdbcTemplate.update("INSERT INTO databasebook.STUDENT (sno, sname, ssex, sage, sdept) VALUES (?,?,?, ?, ?);", args, types);
    }
}
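Taking the getStudents method as an example, let's trace how the currently bound Connection is obtained. The call flow is as follows:
jdbcTemplate.query calls JdbcTemplate.execute, so the source of JdbcTemplate.execute is the part to focus on: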
@Override
public <T> T execute(StatementCallback<T> action) throws DataAccessException {
    Assert.notNull(action, "Callback object must not be null");

    // Key code: use the DataSourceUtils helper to obtain the bound resource.
    Connection con = DataSourceUtils.getConnection(getDataSource());
    Statement stmt = null;
    try {
        Connection conToUse = con;
        if (this.nativeJdbcExtractor != null &&
                this.nativeJdbcExtractor.isNativeConnectionNecessaryForNativeStatements()) {
            conToUse = this.nativeJdbcExtractor.getNativeConnection(con);
        }
        stmt = conToUse.createStatement();
        applyStatementSettings(stmt);
        Statement stmtToUse = stmt;
        if (this.nativeJdbcExtractor != null) {
            stmtToUse = this.nativeJdbcExtractor.getNativeStatement(stmt);
        }
        T result = action.doInStatement(stmtToUse);
        handleWarnings(stmt);
        return result;
    }
    catch (SQLException ex) {
        // Release Connection early, to avoid potential connection pool deadlock
        // in the case when the exception translator hasn't been initialized yet.
        JdbcUtils.closeStatement(stmt);
        stmt = null;
        DataSourceUtils.releaseConnection(con, getDataSource());
        con = null;
        throw getExceptionTranslator().translate("StatementCallback", getSql(action), ex);
    }
    finally {
        JdbcUtils.closeStatement(stmt);
        DataSourceUtils.releaseConnection(con, getDataSource());
    }
}
The source of org.springframework.jdbc.datasource.DataSourceUtils#getConnection is:
public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
    try {
        return doGetConnection(dataSource);
    }
    catch (SQLException ex) {
        throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex);
    }
}
The source of org.springframework.jdbc.datasource.DataSourceUtils#doGetConnection is:
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    Assert.notNull(dataSource, "No DataSource specified");

    // Key point: again, TransactionSynchronizationManager supplies the bound resource.
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
        conHolder.requested();
        if (!conHolder.hasConnection()) {
            logger.debug("Fetching resumed JDBC Connection from DataSource");
            conHolder.setConnection(dataSource.getConnection());
        }
        // Return the bound resource.
        return conHolder.getConnection();
    }
    // Else we either got no holder or an empty thread-bound holder here.
    // Nothing usable was bound: fetch a new Connection, bind it if synchronization
    // is active, and return it.

    logger.debug("Fetching JDBC Connection from DataSource");
    Connection con = dataSource.getConnection();

    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        logger.debug("Registering transaction synchronization for JDBC Connection");
        // Use same Connection for further JDBC actions within the transaction.
        // Thread-bound object will get removed by synchronization at transaction completion.
        ConnectionHolder holderToUse = conHolder;
        if (holderToUse == null) {
            holderToUse = new ConnectionHolder(con);
        }
        else {
            holderToUse.setConnection(con);
        }
        holderToUse.requested();
        TransactionSynchronizationManager.registerSynchronization(
                new ConnectionSynchronization(holderToUse, dataSource));
        holderToUse.setSynchronizedWithTransaction(true);
        if (holderToUse != conHolder) {
            TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
        }
    }

    return con;
}
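The practical effect of this lookup is that every JDBC call made inside a transaction sees the same Connection, while calls outside a transaction get a fresh one each time. The sketch below imitates only that behaviour, as a simplification under assumptions: ConnectionLookupSketch, begin and complete are hypothetical names, a Supplier plays the part of the DataSource, Strings play the part of Connections, and the synchronization branch of doGetConnection is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class ConnectionLookupSketch {

    // Per-thread bindings, keyed by the pool (the stand-in for a DataSource).
    private static final ThreadLocal<Map<Object, Object>> BOUND =
            ThreadLocal.withInitial(HashMap::new);

    // Simplified lookup: prefer the thread-bound connection, else ask the pool.
    static Object getConnection(Supplier<Object> pool) {
        Object bound = BOUND.get().get(pool);
        return (bound != null) ? bound : pool.get();
    }

    // Roughly what doBegin contributes: fetch one connection and bind it under the pool key.
    static void begin(Supplier<Object> pool) {
        BOUND.get().put(pool, pool.get());
    }

    // Assumed completion step: drop the binding for this pool.
    static void complete(Supplier<Object> pool) {
        BOUND.get().remove(pool);
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        Supplier<Object> pool = () -> "conn-" + counter.incrementAndGet();

        System.out.println(getConnection(pool)); // prints "conn-1": no transaction, fresh connection
        begin(pool);
        System.out.println(getConnection(pool)); // prints "conn-2": the bound connection
        System.out.println(getConnection(pool)); // prints "conn-2": same bound connection again
        complete(pool);
        System.out.println(getConnection(pool)); // prints "conn-3": fresh connection again
    }
}
```

In the StudentService example this is why studentDao.getStudents() and studentDao.insert() run on the same Connection and can be committed or rolled back together.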
At this point the bound Connection has been obtained!
Key takeaway:
TransactionSynchronizationManager is the core utility for binding resources and for retrieving bound resources!