fescar 原始碼分析-執行流程
阿新 • • 發佈:2019-04-24
com.alibaba.fescar.spring.annotation.GlobalTransactionalInterceptor
// Fragment: body of the global-transaction template (the method signature is not shown here).
// Flow: begin -> run business logic -> rollback on any Throwable, otherwise commit -> always clean up.
// Each phase is bracketed by trigger*() hook calls.
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
// 2. begin transaction
try {
triggerBeforeBegin();
tx.begin(business.timeout(), business.name());
triggerAfterBegin();
} catch (TransactionException txe) {
// A failure to begin is surfaced as ExecutionException with Code.BeginFailure.
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
Object rs = null;
try {
// Do Your Business
rs = business.execute();// runs the local business logic, e.g. com.alibaba.fescar.rm.datasource.StatementProxy.executeUpdate(String); row lock taken via FOR UPDATE
} catch (Throwable ex) {
// 3. any business exception, rollback.
try {
triggerBeforeRollback();
tx.rollback();
triggerAfterRollback();
// 3.1 Successfully rolled back
// The original business exception `ex` is still propagated, wrapped with Code.RollbackDone.
throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
} catch (TransactionException txe) {
// 3.2 Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);
}
}
// 4. everything is fine, commit.
try {
triggerBeforeCommit();
tx.commit();// com.alibaba.fescar.rm.datasource.ConnectionProxy -- rm.register(): the TC acquires the lock; rm.commit(): commits the transaction
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
return rs;
} finally {
//5. clear
// Runs on every exit path (normal return or any of the exceptions thrown above).
triggerAfterCompletion();
cleanUp();
}
}
com.alibaba.fescar.rm.datasource.ConnectionProxy
/**
 * Commits this connection, picking the commit path from the current context.
 *
 * <ul>
 *   <li>inside a global transaction: {@link #processGlobalTransactionCommit()}</li>
 *   <li>global lock required: {@link #processLocalCommitWithGlobalLocks()}</li>
 *   <li>otherwise: a plain commit on the wrapped target connection</li>
 * </ul>
 *
 * @throws SQLException if the selected commit path fails
 */
@Override
public void commit() throws SQLException {
    if (context.inGlobalTransaction()) {
        processGlobalTransactionCommit();
        return;
    }
    if (context.isGlobalLockRequire()) {
        processLocalCommitWithGlobalLocks();
        return;
    }
    // Neither a global transaction nor a global lock: behave like the raw connection.
    targetConnection.commit();
}
/**
 * Commits the local transaction after checking the lock keys built from the context.
 *
 * <p>The lock check runs first; the context is reset only after the target
 * connection has committed successfully.
 *
 * @throws SQLException if the lock check fails, or wrapping any {@link Throwable}
 *                      raised by the underlying commit
 */
private void processLocalCommitWithGlobalLocks() throws SQLException {
    checkLock(context.buildLockKeys());

    try {
        targetConnection.commit();
    } catch (Throwable commitFailure) {
        // Every failure of the physical commit is surfaced as a SQLException.
        throw new SQLException(commitFailure);
    }

    context.reset();
}
/**
 * Commits the local branch of a global transaction.
 *
 * <p>Steps: register the branch, flush pending undo logs (if the context has any),
 * commit on the target connection, then report the outcome and reset the context.
 *
 * <p>Any {@link Throwable} raised while flushing undo logs or committing is reported
 * as a failure via {@code report(false)} and rethrown wrapped in a
 * {@link SQLException}. Previously only {@code SQLException}s were rethrown; other
 * failures were swallowed and execution fell through to {@code report(true)},
 * reporting a failed commit as successful.
 *
 * @throws SQLException if registration handling, undo-log flushing, the commit or
 *                      the status report fails
 */
private void processGlobalTransactionCommit() throws SQLException {
    try {
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e);
    }

    try {
        if (context.hasUndoLog()) {
            UndoLogManager.flushUndoLogs(this);
        }
        targetConnection.commit();
    } catch (Throwable ex) {
        report(false);
        // Always propagate: a failed commit must never reach report(true) below.
        throw new SQLException(ex);
    }

    report(true);
    context.reset();
}
com.alibaba.fescar.rm.datasource.exec.ExecuteTemplate.execute(SQLRecognizer, StatementProxy<S>, StatementCallback<T, S>, Object...)
/**
 * Executes a statement through the executor that matches its SQL type.
 *
 * <p>When the call is neither inside a global transaction nor requires a global
 * lock, the callback is invoked directly on the target statement. Otherwise the
 * SQL is recognized (if no recognizer was supplied) and dispatched to the
 * INSERT / UPDATE / DELETE / SELECT_FOR_UPDATE executor, falling back to
 * {@code PlainExecutor} for anything else or when no recognizer is available.
 *
 * @param sqlRecognizer     recognizer for the SQL, or {@code null} to look one up
 * @param statementProxy    proxy around the target statement
 * @param statementCallback callback that performs the actual execution
 * @param args              arguments forwarded to the callback / executor
 * @param <T>               result type
 * @param <S>               statement type
 * @return the result produced by the executor or callback
 * @throws SQLException the original {@code SQLException}, or any other failure
 *                      wrapped in a {@code SQLException}
 */
public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {
    if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
        // Just work as original statement
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }

    if (sqlRecognizer == null) {
        sqlRecognizer = SQLVisitorFactory.get(
            statementProxy.getTargetSQL(),
            statementProxy.getConnectionProxy().getDbType());
    }

    Executor<T> executor = null;
    if (sqlRecognizer == null) {
        executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
    } else {
        switch (sqlRecognizer.getSQLType()) {
            case INSERT:
                executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case UPDATE:
                executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case DELETE:
                executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case SELECT_FOR_UPDATE:
                executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                break;
            default:
                executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                break;
        }
    }

    T rs = null;
    try {
        rs = executor.execute(args);
    } catch (Throwable ex) {
        if (ex instanceof SQLException) {
            throw (SQLException)ex;
        } else {
            // Turn everything into SQLException.
            // The original built the exception without throwing it, which swallowed
            // the failure and returned null to the caller.
            throw new SQLException(ex);
        }
    }
    return rs;
}
com.alibaba.fescar.rm.datasource.exec.AbstractDMLBaseExecutor.executeAutoCommitFalse(Object[])
/**
 * Runs the statement while capturing row images for the undo log.
 *
 * <p>The order is significant: the before image is taken first, then the
 * statement is executed, then the after image is derived from the before image,
 * and finally both images are handed to {@code prepareUndoLog}.
 *
 * @param args the arguments forwarded to the statement callback
 * @return the result of the statement callback
 * @throws Throwable any failure raised while building images or executing
 */
protected T executeAutoCommitFalse(Object[] args) throws Throwable {
    final TableRecords imageBefore = beforeImage();

    final T executionResult = statementCallback.execute(statementProxy.getTargetStatement(), args);

    final TableRecords imageAfter = afterImage(imageBefore);
    prepareUndoLog(imageBefore, imageAfter);

    return executionResult;
}
com.alibaba.fescar.rm.datasource.exec.UpdateExecutor.beforeImage() //加for update
/**
 * Builds the before image of the rows an UPDATE is about to modify.
 *
 * <p>Issues {@code SELECT <pk, updated columns> FROM <table> [WHERE <original condition>] FOR UPDATE}
 * on the statement's connection and converts the result set into {@link TableRecords}.
 * The primary-key column is prepended when the update columns do not already contain it.
 *
 * <p>JDBC resources are managed with try-with-resources so the statement is still
 * closed when closing the result set fails, and a close failure no longer masks
 * the exception raised by the query itself.
 *
 * @return the records selected by the generated query
 * @throws SQLException if the query cannot be prepared, bound or executed
 */
@Override
protected TableRecords beforeImage() throws SQLException {
    SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
    TableMeta tmeta = getTableMeta();
    List<String> updateColumns = recognizer.getUpdateColumns();

    // Single-threaded local buffer: StringBuilder avoids StringBuffer's needless locking.
    StringBuilder selectSQLAppender = new StringBuilder("SELECT ");
    if (!tmeta.containsPK(updateColumns)) {
        // PK should be included.
        selectSQLAppender.append(getColumnNameInSQL(tmeta.getPkName())).append(", ");
    }
    for (int i = 0; i < updateColumns.size(); i++) {
        selectSQLAppender.append(updateColumns.get(i));
        if (i < (updateColumns.size() - 1)) {
            selectSQLAppender.append(", ");
        }
    }

    String whereCondition;
    ArrayList<Object> paramAppender = new ArrayList<>();
    if (statementProxy instanceof ParametersHolder) {
        // Placeholder values used by the WHERE clause are collected into paramAppender.
        whereCondition = recognizer.getWhereCondition((ParametersHolder)statementProxy, paramAppender);
    } else {
        whereCondition = recognizer.getWhereCondition();
    }

    selectSQLAppender.append(" FROM ").append(getFromTableInSQL());
    if (StringUtils.isNotBlank(whereCondition)) {
        selectSQLAppender.append(" WHERE ").append(whereCondition);
    }
    selectSQLAppender.append(" FOR UPDATE");
    String selectSQL = selectSQLAppender.toString();

    if (paramAppender.isEmpty()) {
        // No bind parameters: a plain Statement is enough.
        try (Statement st = statementProxy.getConnection().createStatement();
             ResultSet rs = st.executeQuery(selectSQL)) {
            return TableRecords.buildRecords(tmeta, rs);
        }
    }

    try (PreparedStatement ps = statementProxy.getConnection().prepareStatement(selectSQL)) {
        for (int i = 0; i < paramAppender.size(); i++) {
            // JDBC parameter indexes are 1-based.
            ps.setObject(i + 1, paramAppender.get(i));
        }
        try (ResultSet rs = ps.executeQuery()) {
            return TableRecords.buildRecords(tmeta, rs);
        }
    }
}
com.alibaba.fescar.rm.datasource.exec.UpdateExecutor.afterImage(TableRecords)
/**
 * Builds the after image of the rows touched by an UPDATE.
 *
 * <p>Re-selects the primary key and updated columns for exactly the rows captured
 * in {@code beforeImage}, addressed by their primary-key values, using
 * {@code ... WHERE <pk condition> FOR UPDATE}. An empty image is returned when the
 * before image is {@code null} or has no rows.
 *
 * <p>JDBC resources are managed with try-with-resources so the statement is still
 * closed when closing the result set fails, and a close failure no longer masks
 * the exception raised by the query itself.
 *
 * @param beforeImage the rows captured before the UPDATE ran; may be {@code null}
 * @return the current state of those rows, or an empty image
 * @throws SQLException if the query cannot be prepared, bound or executed
 */
@Override
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
    SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
    TableMeta tmeta = getTableMeta();
    if (beforeImage == null || beforeImage.size() == 0) {
        return TableRecords.empty(tmeta);
    }

    List<String> updateColumns = recognizer.getUpdateColumns();

    // Single-threaded local buffer: StringBuilder avoids StringBuffer's needless locking.
    StringBuilder selectSQLAppender = new StringBuilder("SELECT ");
    if (!tmeta.containsPK(updateColumns)) {
        // PK should be included.
        selectSQLAppender.append(getColumnNameInSQL(tmeta.getPkName())).append(", ");
    }
    for (int i = 0; i < updateColumns.size(); i++) {
        selectSQLAppender.append(updateColumns.get(i));
        if (i < (updateColumns.size() - 1)) {
            selectSQLAppender.append(", ");
        }
    }

    List<Field> pkRows = beforeImage.pkRows();
    selectSQLAppender.append(" FROM ").append(getFromTableInSQL())
        .append(" WHERE ").append(buildWhereConditionByPKs(pkRows))
        .append(" FOR UPDATE");
    String selectSQL = selectSQLAppender.toString();

    try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
        // Bind one primary-key value per captured row; JDBC indexes are 1-based.
        int index = 0;
        for (Field pkField : pkRows) {
            index++;
            pst.setObject(index, pkField.getValue(), pkField.getType());
        }
        try (ResultSet rs = pst.executeQuery()) {
            return TableRecords.buildRecords(tmeta, rs);
        }
    }
}