分散式事務 TCC-Transaction 原始碼分析 —— TCC 實現
本文主要基於 TCC-Transaction 1.2.3.3 正式版
- 1. 概述
- 2. TCC 原理
- 3. TCC-Transaction 原理
- 4. 事務與參與者
- 4.1 事務
- 4.2 參與者
- 5. 事務管理器
- 5.1 發起根事務
- 5.2 傳播發起分支事務
- 5.3 傳播獲取分支事務
- 5.4 提交事務
- 5.5 回滾事務
- 5.6 新增參與者到事務
- 6. 事務攔截器
- 6.1 Compensable
- 6.2 可補償事務攔截器
- 6.3 資源協調者攔截器
- 666. 彩蛋
1. 概述
本文分享 TCC 實現。主要涉及如下三個 Maven 專案:
-
tcc-transaction-core
-
tcc-transaction-api
:tcc-transaction 使用 API。 -
tcc-transaction-spring
:tcc-transaction Spring 支援。
你行好事會因為得到讚賞而愉悅 同理,開源專案貢獻者會因為 Star 而更加有動力 為 TCC-Transaction 點贊!傳送門
OK,開始我們的第一段 TCC 旅程吧。
ps:筆者假設你已經閱讀過《tcc-transaction 官方文件 —— 使用指南1.2.x》。
ps2:未特殊說明的情況下,本文事務指的是 TCC事務。
2. TCC 原理
FROM https://support.hwclouds.com/devg-servicestage/zh-cn_topic_0056814426.html TCC事務 為了解決在事務執行過程中大顆粒度資源鎖定的問題,業界提出一種新的事務模型,它是基於業務層面的事務定義。鎖粒度完全由業務自己控制。它本質是一種補償的思路。它把事務執行過程分成 Try、Confirm / Cancel 兩個階段。在每個階段的邏輯由業務程式碼控制。這樣就事務的鎖粒度可以完全自由控制。業務可以在犧牲隔離性的情況下,獲取更高的效能。
- Try 階段
- 完成所有業務檢查( 一致性 )
- 預留必須業務資源( 準隔離性 )
- Try :嘗試執行業務
- Confirm / Cancel 階段:
- 釋放 Try 階段預留的業務資源
- Cancel 操作滿足冪等性
- 真正執行業務
- 不做任務業務檢查
- Confirm 操作滿足冪等性
- Confirm :確認執行業務
- Cancel :取消執行業務
- Confirm 與 Cancel 互斥
整體流程如下圖:
-
紅框部分功能由
tcc-transaction-core
實現:- 啟動業務活動
- 登記業務操作
- 提交 / 回滾業務活動
-
黃框部分功能由
tcc-transaction-http-sample
實現( 官方提供的示例專案 ):- Try 操作
- Confirm 操作
- Cancel 操作
與 2PC協議 比較:
- 位於業務服務層而非自願層
- 沒有單獨的準備( Prepare )階段,Try 操作兼備自願操作與準備能力
- Try 操作可以靈活選擇業務資源的鎖定粒度
- 較高開發成本
參考資料:
- 《支付寶運營架構中柔性事務指的是什麼?》
- 《分散式事務的典型處理方式:2PC、TCC、非同步確保和最大努力型》
3. TCC-Transaction 原理
在 TCC 裡,一個業務活動可以有多個事務,每個業務操作歸屬於不同的事務,即一個事務可以包含多個業務操作。TCC-Transaction 將每個業務操作抽象成事務參與者,每個事務可以包含多個參與者。
參與者需要宣告 try / confirm / cancel 三個型別的方法,和 TCC 的操作一一對應。在程式裡,通過 @Compensable 註解標記在 try 方法上,並填寫對應的 confirm / cancel 方法,示例程式碼如下:
// try
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}
// confirm
public void confirmRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}
// cancel
public void cancelRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}
- 在示例程式碼中,我們看到 TransactionContext,事務上下文,這個是怎麼生成的呢?這裡先賣一個關子。
TCC-Transaction 有兩個攔截器,通過對 @Compensable AOP 切面( 參與者 try 方法 )進行攔截,透明化對參與者 confirm / cancel 方法呼叫,從而實現 TCC 。簡化流程如下圖:
第一個攔截器,可補償事務攔截器,實現如下功能:
- 在 Try 階段,對事務的發起、傳播。
- 在 Confirm / Cancel 階段,對事務提交或回滾。
- 為什麼會有對事務的傳播呢?在遠端呼叫服務的參與者時,會通過"引數"( 需要序列化 )的形式傳遞事務給遠端參與者。
第二個攔截器,資源協調者攔截器,實現如下功能:
- 在 Try 階段,新增參與者到事務中。當事務上下文不存在時,進行建立。
實際攔截器對事務的處理會比上圖複雜一些,在本文「6. 事務攔截器」詳細解析。
在 TCC-Transaction 程式碼實現上,元件分層如下圖:
本文按照如下順序分享:
- 「4. 事務攔截器」
- 「5. 事務管理器」
- 「6. 事務管理器」
內容是自下而上的方式分享,每個元件可以更加整體的被認識。當然這可能對你理解時產生一臉悶逼,所以推薦兩種閱讀方式:
- 簡讀 x 1 + 深讀 x 1
- 倒著讀,發現未分享的方法,全文檢索該方法。
事務儲存器在《TCC-Transaction 原始碼解析 —— 事務儲存於恢復》詳細解析。
事務恢復在《TCC-Transaction 原始碼解析 —— 事務恢復》詳細解析。
4. 事務與參與者
在 TCC 裡,一個事務( org.mengyun.tcctransaction.Transaction
) 可以有多個參與者( org.mengyun.tcctransaction.Participant
)參與業務活動。類圖關係如下( 開啟大圖 ):
4.1 事務
Transaction 實現程式碼如下:
public class Transaction implements Serializable {
private static final long serialVersionUID = 7291423944314337931L;
/**
* 事務編號
*/
private TransactionXid xid;
/**
* 事務狀態
*/
private TransactionStatus status;
/**
* 事務型別
*/
private TransactionType transactionType;
/**
* 重試次數
*/
private volatile int retriedCount = 0;
/**
* 建立時間
*/
private Date createTime = new Date();
/**
* 最後更新時間
*/
private Date lastUpdateTime = new Date();
/**
* 版本號
*/
private long version = 1;
/**
* 參與者集合
*/
private List<Participant> participants = new ArrayList<Participant>();
/**
* 附帶屬性對映
*/
private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();
/**
* 新增參與者
*
* @param participant 參與者
*/
public void enlistParticipant(Participant participant) {
participants.add(participant);
}
/**
* 提交 TCC 事務
*/
public void commit() {
for (Participant participant : participants) {
participant.commit();
}
}
/**
* 回滾 TCC 事務
*/
public void rollback() {
for (Participant participant : participants) {
participant.rollback();
}
}
}
- xid,事務編號( TransactionXid ),用於唯一標識一個事務。使用 UUID 演算法生成,保證唯一性。
org.mengyun.tcctransaction.api.TransactionXid
實現javax.transaction.xa.Xid
介面,實現程式碼如下: public class TransactionXid implements Xid, Serializable { private static final long serialVersionUID = -6817267250789142043L;/** * xid 格式識別符號 */ private int formatId = 1; /** * 全域性事務編號 */ private byte[] globalTransactionId; /** * 分支事務編號 */ private byte[] branchQualifier; }- TODO 為什麼要繼承 Xid 介面?
- status,事務狀態( TransactionStatus )。
org.mengyun.tcctransaction.api.TransactionStatus
實現程式碼如下: public enum TransactionStatus { /** * 嘗試中狀態 */ TRYING(1), /** * 確認中狀態 */ CONFIRMING(2), /** * 取消中狀態 */ CANCELLING(3);private int id; } - transactionType,事務型別( TransactionType )。
org.mengyun.tcctransaction.common.TransactionType
實現程式碼如下: public enum TransactionType {/** * 根事務 */ ROOT(1), /** * 分支事務 */ BRANCH(2); int id; }- 在「6.2 可補償事務攔截器」有詳細解析,可以看到看到這兩種事務是如何發起。
- retriedCount,重試次數。在 TCC 過程中,可能參與者異常崩潰,這個時候會進行重試直到成功或超過最大次數。在《TCC-Transaction 原始碼解析 —— 事務恢復》詳細解析。
- version,版本號,用於樂觀鎖更新事務。在《TCC-Transaction 原始碼解析 —— 事務儲存器》詳細解析。
- attachments,附帶屬性對映。在《TCC-Transaction 原始碼解析 —— Dubbo 支援》詳細解析。
- 提供
#enlistParticipant()
方法,新增事務參與者。 - 提供
#commit()
方法,呼叫參與者們提交事務。 - 提供
#rollback()
方法,呼叫參與者回滾事務。
4.2 參與者
Participant 實現程式碼如下:
public class Participant implements Serializable {
private static final long serialVersionUID = 4127729421281425247L;
/**
* 事務編號
*/
private TransactionXid xid;
/**
* 確認執行業務方法呼叫上下文
*/
private InvocationContext confirmInvocationContext;
/**
* 取消執行業務方法
*/
private InvocationContext cancelInvocationContext;
/**
* 執行器
*/
private Terminator terminator = new Terminator();
/**
* 事務上下文編輯
*/
Class<? extends TransactionContextEditor> transactionContextEditorClass;
/**
* 提交事務
*/
public void commit() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
}
/**
* 回滾事務
*/
public void rollback() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);
}
}
- xid,參與者事務編號。通過
TransactionXid.globalTransactionId
屬性,關聯上其所屬的事務。當參與者進行遠端呼叫時,遠端的分支事務的事務編號等於該參與者的事務編號。通過事務編號的關聯,TCC Confirm / Cancel 階段,使用參與者的事務編號和遠端的分支事務進行關聯,從而實現事務的提交和回滾,在「5.2 傳播發起分支事務」 + 「6.2 可補償事務攔截器」可以看到具體實現。 - confirmInvocationContext,確認執行業務方法呼叫上下文( InvocationContext )。
org.mengyun.tcctransaction.InvocationContext
實現程式碼如下: public class InvocationContext implements Serializable {private static final long serialVersionUID = -7969140711432461165L; /** * 類 */ private Class targetClass; /** * 方法名 */ private String methodName; /** * 引數型別陣列 */ private Class[] parameterTypes; /** * 引數陣列 */ private Object[] args; }- InvocationContext,執行方法呼叫上下文,記錄類、方法名、引數型別陣列、引數陣列。通過這些屬性,可以執行提交 / 回滾事務。在
org.mengyun.tcctransaction.Terminator
會看到具體的程式碼實現。本質上,TCC 通過多個參與者的 try / confirm / cancel 方法,實現事務的最終一致性。
- InvocationContext,執行方法呼叫上下文,記錄類、方法名、引數型別陣列、引數陣列。通過這些屬性,可以執行提交 / 回滾事務。在
- cancelInvocationContext,取消執行業務方法呼叫上下文( InvocationContext )。
- terminator,執行器( Terminator )。
org.mengyun.tcctransaction.Terminator
實現程式碼如下:
public class Terminator implements Serializable {
private static final long serialVersionUID = -164958655471605778L;
public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {
try { // 獲得 參與者物件
Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();
// 獲得 方法
Method method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes()); // 設定 事務上下文 到方法引數
FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs()); // 執行方法
return method.invoke(target, invocationContext.getArgs()); } catch (Exception e) {
throw new SystemException(e); } }
return null; } }
- transactionContextEditorClass,事務上下文編輯,在「6.1 Compensable」詳細解析。
- 提交
#commit()
方法,提交參與者自己的事務。 - 提交
#rollback()
方法,回滾參與者自己的事務。
5. 事務管理器
org.mengyun.tcctransaction.TransactionManager
,事務管理器,提供事務的獲取、發起、提交、回滾,參與者的新增等等方法。
5.1 發起根事務
提供 begin()
方法,發起根事務。該方法在呼叫方法型別為 MethodType.ROOT 並且 事務處於 Try 階段被呼叫。MethodType 在「6.2 可補償事務攔截器」詳細解析。
實現程式碼如下:
// TransactionManager.java
/**
* 發起根事務
*
* @return 事務
*/
public Transaction begin() {
// 建立 根事務
Transaction transaction = new Transaction(TransactionType.ROOT);
// 儲存 事務
transactionRepository.create(transaction);
// 註冊 事務
registerTransaction(transaction);
return transaction;
}
- 呼叫 Transaction 構造方法,建立根事務。實現程式碼如下:
// Transaction.java /** * 建立指定型別的事務 * * @param transactionType 事務型別 */
public Transaction(TransactionType transactionType) {
this.xid = new TransactionXid();
this.status = TransactionStatus.TRYING; // 嘗試中狀態
this.transactionType = transactionType; }
- 呼叫
TransactionRepository#crete()
方法,儲存事務。 - 呼叫
#registerTransaction(...)
方法,註冊事務到當前執行緒事務佇列。實現程式碼如下: // TransactionManager.java /** * 當前執行緒事務佇列 */ private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>(); /** * 註冊事務到當前執行緒事務佇列 * * @param transaction 事務 */ private void registerTransaction(Transaction transaction) { if (CURRENT.get() == null) { CURRENT.set(new LinkedList<Transaction>()); } CURRENT.get().push(transaction); // 新增到頭部 }-
可能有同學會比較好奇,為什麼使用佇列儲存當前執行緒事務?TCC-Transaction 支援多個的事務獨立存在,後建立的事務先提交,類似 Spring 的
org.springframework.transaction.annotation.Propagation.REQUIRES_NEW
。在下文,很快我們就會看到 TCC-Transaction 自己的org.mengyun.tcctransaction.api.Propagation
。
-
可能有同學會比較好奇,為什麼使用佇列儲存當前執行緒事務?TCC-Transaction 支援多個的事務獨立存在,後建立的事務先提交,類似 Spring 的
5.2 傳播發起分支事務
呼叫 #propagationNewBegin(...)
方法,傳播發起分支事務。該方法在呼叫方法型別為 MethodType.PROVIDER 並且 事務處於 Try 階段被呼叫。MethodType 在「6.2 可補償事務攔截器」詳細解析。
實現程式碼如下:
/**
* 傳播發起分支事務
*
* @param transactionContext 事務上下文
* @return 分支事務
*/
public Transaction propagationNewBegin(TransactionContext transactionContext) {
// 建立 分支事務
Transaction transaction = new Transaction(transactionContext);
// 儲存 事務
transactionRepository.create(transaction);
// 註冊 事務
registerTransaction(transaction);
return transaction;
}
- 呼叫 Transaction 構造方法,建立分支事務。實現程式碼如下:
/** * 建立分支事務 * * @param transactionContext 事務上下文 */ public Transaction(TransactionContext transactionContext) { this.xid = transactionContext.getXid(); // 事務上下文的 xid this.status = TransactionStatus.TRYING; // 嘗試中狀態 this.transactionType = TransactionType.BRANCH; // 分支事務 }
- 分支事務使用傳播的事務上下文的事務編號。
- 呼叫
TransactionRepository#crete()
方法,儲存事務。為什麼要儲存分支事務,在「6.3 資源協調者攔截器」詳細解析。 - 呼叫
#registerTransaction(...)
方法,註冊事務到當前執行緒事務佇列。
5.3 傳播獲取分支事務
呼叫 #propagationExistBegin(...)
方法,傳播發起分支事務。該方法在呼叫方法型別為 MethodType.PROVIDER 並且 事務處於 Confirm / Cancel 階段被呼叫。MethodType 在「6.2 可補償事務攔截器」詳細解析。
實現程式碼如下:
/**
* 傳播獲取分支事務
*
* @param transactionContext 事務上下文
* @return 分支事務
* @throws NoExistedTransactionException 當事務不存在時
*/
public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {
// 查詢 事務
Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());
if (transaction != null) {
// 設定 事務 狀態
transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus()));
// 註冊 事務
registerTransaction(transaction);
return transaction;
} else {
throw new NoExistedTransactionException();
}
}
- 呼叫
TransactionRepository#findByXid()
方法,查詢事務。 - 呼叫
Transaction#changeStatus(...)
方法,設定事務狀態為 CONFIRMING 或 CANCELLING。 - 呼叫
#registerTransaction(...)
方法,註冊事務到當前執行緒事務佇列。 - 為什麼此處是分支事務呢?結合
#propagationNewBegin(...)
思考下。
5.4 提交事務
呼叫 #commit(...)
方法,提交事務。該方法在事務處於 Confirm / Cancel 階段被呼叫。
實現程式碼如下:
/**
* 提交事務
*/
public void commit() {
// 獲取 事務
Transaction transaction = getCurrentTransaction();
// 設定 事務狀態 為 CONFIRMING
transaction.changeStatus(TransactionStatus.CONFIRMING);
// 更新 事務
transactionRepository.update(transaction);
try {
// 提交 事務
transaction.commit();
// 刪除 事務
transactionRepository.delete(transaction);
} catch (Throwable commitException) {
logger.error("compensable transaction confirm failed.", commitException);
throw new ConfirmingException(commitException);
}
}
- 呼叫
#getCurrentTransaction()
方法, 獲取事務。實現程式碼如下:
public Transaction getCurrentTransaction() {
if (isTransactionActive()) {
return CURRENT.get().peek(); // 獲得頭部元素 }
return null; }
public boolean isTransactionActive() {
Deque<Transaction> transactions = CURRENT.get();
return transactions != null && !transactions.isEmpty(); }
- 呼叫
Transaction#changeStatus(...)
方法, 設定事務狀態為 CONFIRMING。 - 呼叫
TransactionRepository#update(...)
方法, 更新事務。 - 呼叫
Transaction#commit(...)
方法, 提交事務。 - 呼叫
TransactionRepository#delete(...)
方法,刪除事務。
5.5 回滾事務
呼叫 #rollback(...)
方法,取消事務,和 #commit()
方法基本類似。該方法在事務處於 Confirm / Cancel 階段被呼叫。
實現程式碼如下:
/**
* 回滾事務
*/
public void rollback() {
// 獲取 事務
Transaction transaction = getCurrentTransaction();
// 設定 事務狀態 為 CANCELLING
transaction.changeStatus(TransactionStatus.CANCELLING);
// 更新 事務
transactionRepository.update(transaction);
try {
// 提交 事務
transaction.rollback();
// 刪除 事務
transactionRepository.delete(transaction);
} catch (Throwable rollbackException) {
logger.error("compensable transaction rollback failed.", rollbackException);
throw new CancellingException(rollbackException);
}
}
- 呼叫
#getCurrentTransaction()
方法,獲取事務。 - 呼叫
Transaction#changeStatus(...)
方法, 設定事務狀態為 CANCELLING。 - 呼叫
TransactionRepository#update(...)
方法, 更新事務。 - 呼叫
Transaction#rollback(...)
方法, 回滾事務。 - 呼叫
TransactionRepository#delete(...)
方法,刪除事務。
5.6 新增參與者到事務
呼叫 #enlistParticipant(...)
方法,新增參與者到事務。該方法在事務處於 Try 階段被呼叫,在「6.3 資源協調者攔截器」有詳細解析。
實現程式碼如下:
/**
* 新增參與者到事務
*
* @param participant 參與者
*/
public void enlistParticipant(Participant participant) {
// 獲取 事務
Transaction transaction = this.getCurrentTransaction();
// 新增參與者
transaction.enlistParticipant(participant);
// 更新 事務
transactionRepository.update(transaction);
}
- 呼叫
#getCurrentTransaction()
方法,獲取事務。 - 呼叫
Transaction#enlistParticipant(...)
方法, 新增參與者到事務。 - 呼叫
TransactionRepository#update(...)
方法, 更新事務。
6. 事務攔截器
TCC-Transaction 基於 org.mengyun.tcctransaction.api.@Compensable
+ org.aspectj.lang.annotation.@Aspect
註解 AOP 切面實現業務方法的 TCC 事務宣告攔截,同 Spring 的 org.springframework.transaction.annotation.@Transactional
的實現。
TCC-Transaction 有兩個攔截器:
-
org.mengyun.tcctransaction.interceptor.CompensableTransactionInterceptor
,可補償事務攔截器。 -
org.mengyun.tcctransaction.interceptor.ResourceCoordinatorInterceptor
,資源協調者攔截器。
在分享攔截器的實現之前,我們先一起看看 @Compensable 註解。
6.1 Compensable
@Compensable,標記可補償的方法註解。實現程式碼如下:
public @interface Compensable {
/**
* 傳播級別
*/
Propagation propagation() default Propagation.REQUIRED;
/**
* 確認執行業務方法
*/
String confirmMethod() default "";
/**
* 取消執行業務方法
*/
String cancelMethod() default "";
/**
* 事務上下文編輯
*/
Class<? extends TransactionContextEditor> transactionContextEditor() default DefaultTransactionContextEditor.class;
}
- propagation,傳播級別( Propagation ),預設 Propagation.REQUIRED。和 Spring 的 Propagation 除了缺少幾個屬性,基本一致。實現程式碼如下: public enum Propagation {/** * 支援當前事務,如果當前沒有事務,就新建一個事務。 */ REQUIRED(0), /** * 支援當前事務,如果當前沒有事務,就以非事務方式執行。 */ SUPPORTS(1), /** * 支援當前事務,如果當前沒有事務,就丟擲異常。 */ MANDATORY(2), /** * 新建事務,如果當前存在事務,把當前事務掛起。 */ REQUIRES_NEW(3); private final int value; }
- confirmMethod,確認執行業務方法名。
- cancelMethod,取消執行業務方法名。
- TransactionContextEditor,事務上下文編輯器( TransactionContextEditor ),用於設定和獲得事務上下文( TransactionContext ),在「6.3 資源協調者攔截器」可以看到被呼叫,此處只看它的程式碼實現。
org.mengyun.tcctransaction.api.TransactionContextEditor
介面程式碼如下: public interface TransactionContextEditor {/** * 從引數中獲得事務上下文 * * @param target 物件 * @param method 方法 * @param args 引數 * @return 事務上下文 */ TransactionContext get(Object target, Method method, Object[] args); /** * 設定事務上下文到引數中 * * @param transactionContext 事務上下文 * @param target 物件 * @param method 方法 * @param args 引數 */ void set(TransactionContext transactionContext, Object target, Method method, Object[] args); }- x
- DefaultTransactionContextEditor,預設事務上下文編輯器實現。實現程式碼如下:
class DefaultTransactionContextEditor implements TransactionContextEditor {
@Override public TransactionContext get(Object target, Method method, Object[] args) {
int position = getTransactionContextParamPosition(method.getParameterTypes());
if (position >= 0) {
return (TransactionContext) args[position]; }
return null; }
@Override public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) {
int position = getTransactionContextParamPosition(method.getParameterTypes());
if (position >= 0) {
args[position] = transactionContext; // 設定方法引數 } } /** * 獲得事務上下文在方法引數裡的位置 * * @param parameterTypes 引數型別集合 * @return 位置 */
public static int getTransactionContextParamPosition(Class<?>[] parameterTypes) {
int position = -1;
for (int i = 0; i < parameterTypes.length; i++) {
if (parameterTypes[i].equals(org.mengyun.tcctransaction.api.TransactionContext.class)) {
position = i;
break; } }
return position; } }
- NullableTransactionContextEditor,無事務上下文編輯器實現。實現程式碼如下: ```Java class NullableTransactionContextEditor implements TransactionContextEditor { @Override public TransactionContext get(Object target, Method method, Object[] args) { return nul l; } @Overr ide public void set(TransactionContext transactionContext, Object target, Method method, Object[] arg s ) { } } ```
- DubboTransactionContextEditor,Dubbo 事務上下文編輯器實現,通過 Dubbo 隱式傳參方式獲得事務上下文,在《TCC-Transaction 原始碼解析 —— Dubbo 支援》詳細解析。
6.2 可補償事務攔截器
先一起來看下可補償事務攔截器對應的切面 org.mengyun.tcctransaction.interceptor.CompensableTransactionAspect
,實現程式碼如下:
@Aspect
public abstract class CompensableTransactionAspect {
private CompensableTransactionInterceptor compensableTransactionInterceptor;
public void setCompensableTransactionInterceptor(CompensableTransactionInterceptor compensableTransactionInterceptor) {
this.compensableTransactionInterceptor = compensableTransactionInterceptor;
}
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void compensableService() {
}
@Around("compensableService()")
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
}
public abstract int getOrder();
}
- 通過
org.aspectj.lang.annotation.@Pointcut
+org.aspectj.lang.annotation.@Around
註解,配置對 @Compensable 註解的方法進行攔截,呼叫CompensableTransactionInterceptor#interceptCompensableMethod(...)
方法進行處理。
CompensableTransactionInterceptor 實現程式碼如下:
public class CompensableTransactionInterceptor {
private TransactionManager transactionManager;
private Set<Class<? extends Exception>> delayCancelExceptions;
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
// 獲得帶 @Compensable 註解的方法
Method method = CompensableMethodUtils.getCompensableMethod(pjp);
//
Compensable compensable = method.getAnnotation(Compensable.class);
Propagation propagation = compensable.propagation();
// 獲得 事務上下文
TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
// 當前執行緒是否在事務中
boolean isTransactionActive = transactionManager.isTransactionActive();
// 判斷事務上下文是否合法
if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {
throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());
}
// 計算方法型別
MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);
// 處理
switch (methodType) {
case ROOT:
return rootMethodProceed(pjp);
case PROVIDER:
return providerMethodProceed(pjp, transactionContext);
default:
return pjp.proceed();
}
}
}
- 呼叫 CompensableMethodUtils#getCompensableMethod(...) 方法,獲得帶 @Compensable 註解的方法。實現程式碼如下:
// CompensableMethodUtils.java /** * 獲得帶 @Compensable 註解的方法 * * @param pjp 切面點 * @return 方法 */
public static Method getCompensableMethod(ProceedingJoinPoint pjp) {
Method method = ((MethodSignature) (pjp.getSignature())).getMethod(); // 代理方法物件
if (method.getAnnotation(Compensable.class) == null) {
try {
method = pjp.getTarget().getClass().getMethod(method.getName(), method.getParameterTypes()); // 實際方法物件 }
catch (NoSuchMethodException e) {
return null; } }
return method; }
- 呼叫
TransactionContextEditor#get(...)
方法,從引數中獲得事務上下文。為什麼從引數中可以獲得事務上下文呢?在「6.3 資源協調者攔截器」揭曉答案。 - 呼叫
TransactionManager#isTransactionActive()
方法,當前執行緒是否在事務中。實現程式碼如下:
// TransactionManager.java
private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();
public boolean isTransactionActive() {
Deque<Transaction> transactions = CURRENT.get();
return transactions != null && !transactions.isEmpty(); }
- 呼叫
TransactionUtils#isLegalTransactionContext(...)
方法,判斷事務上下文是否合法。實現程式碼如下:
// TransactionUtils.java
/** * 判斷事務上下文是否合法 * 在 Propagation.MANDATORY 必須有在事務內 * * @param isTransactionActive 是否 * @param propagation 傳播級別 * @param transactionContext 事務上下文 * @return 是否合法 */
public static boolean isLegalTransactionContext(boolean isTransactionActive, Propagation propagation, TransactionContext transactionContext) {
if (propagation.equals(Propagation.MANDATORY) && !isTransactionActive && transactionContext == null) {
return false; }
return true; }
- 呼叫
CompensableMethodUtils#calculateMethodType(...)
方法,計算方法型別。實現程式碼如下:
/** * 計算方法型別 * * @param propagation 傳播級別 * @param isTransactionActive 是否事務開啟 * @param transactionContext 事務上下文 * @return 方法型別 */
public static MethodType calculateMethodType(Propagation propagation, boolean isTransactionActive, TransactionContext transactionContext) {
if ((propagation.equals(Propagation.REQUIRED) && !isTransactionActive && transactionContext == null) // Propagation.REQUIRED:支援當前事務,當前沒有事務,就新建一個事務。
|| propagation.equals(Propagation.REQUIRES_NEW)) {
// Propagation.REQUIRES_NEW:新建事務,如果當前存在事務,把當前事務掛起。
return MethodType.ROOT; }
else if ((propagation.equals(Propagation.REQUIRED) // Propagation.REQUIRED:支援當前事務
|| propagation.equals(Propagation.MANDATORY)) // Propagation.MANDATORY:支援當前事務
&& !isTransactionActive && transactionContext != null) {
return MethodType.PROVIDER; }
else {
return MethodType.NORMAL; } }
- 方法型別為 MethodType.ROOT 時,發起分支事務,判斷條件如下二選一: * 事務傳播級別為 Propagation.REQUIRED,並且當前不存在事務,並且方法引數傳遞了事務上下文。 * 事務傳播級別為 Propagation.PROVIDER,並且當前不存在事務,並且方法引數傳遞了事務上下文。 * 當前不存在事務,方法引數傳遞了事務上下文是什麼意思?當跨服務遠端呼叫時,被呼叫服務本身( 服務提供者 )不在事務中,通過傳遞事務上下文引數,融入當前事務。
- 方法型別為 MethodType.Normal 時,不進行事務處理。
- MethodType.CONSUMER 專案已經不再使用,猜測已廢棄。
- 當方法型別為 MethodType.ROOT 時,呼叫
#rootMethodProceed(...)
方法,發起 TCC 整體流程。實現程式碼如下:
private Object rootMethodProceed(ProceedingJoinPoint pjp) throws Throwable {
Object returnValue;
Transaction transaction = null;
try { // 發起根事務
transaction = transactionManager.begin(); // 執行方法原邏輯
try {
returnValue = pjp.proceed(); } catch (Throwable tryingException) {
if (isDelayCancelException(tryingException)) { // 是否延遲迴滾 }
else {
logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);
// 回滾事務
transactionManager.rollback(); }
throw tryingException; } // 提交事務
transactionManager.commit(); }
finally { // 將事務從當前執行緒事務佇列移除
transactionManager.cleanAfterCompletion(transaction); }
return returnValue; }
// TransactionManager.java
public void cleanAfterCompletion(Transaction transaction) {
if (isTransactionActive() && transaction != null) {
Transaction currentTransaction = getCurrentTransaction();
if (currentTransaction == transaction) {
CURRENT.get().pop(); }
else {
throw new SystemException("Illegal transaction when clean after completion");
} } }
- 當方法型別為 Propagation.PROVIDER 時,服務提供者參與 TCC 整體流程。實現程式碼如下:
private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext) throws Throwable {
Transaction transaction = null; try { switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
case TRYING: // 傳播發起分支事務 transaction = transactionManager.propagationNewBegin(transactionContext);
return pjp.proceed(); case CONFIRMING: try { // 傳播獲取分支事務
transaction = transactionManager.propagationExistBegin(transactionContext); // 提交事務
transactionManager.commit(); } catch (NoExistedTransactionException excepton) { //the transaction has been commit,ignore it. }
break; case CANCELLING: try { // 傳播獲取分支事務
transaction = transactionManager.propagationExistBegin(transactionContext); // 回滾事務
transactionManager.rollback(); } catch (NoExistedTransactionException exception) { //the transaction has been rollback,ignore it. }
break; } } finally { // 將事務從當前執行緒事務佇列移除 transactionManager.cleanAfterCompletion(transaction); } // 返回空值
Method method = ((MethodSignature) (pjp.getSignature())).getMethod(); return ReflectionUtils.getNullValue(method.getReturnType()); }
public static Object getNullValue(Class type) { // 處理基本型別
if (boolean.class.equals(type)) {
return false; }
else if (byte.class.equals(type)) {
return 0; }
else if (short.class.equals(type)) {
return 0; }
else if (int.class.equals(type)) {
return 0; }
else if (long.class.equals(type)) {
return 0; }
else if (float.class.equals(type)) {
return 0; }
else if (double.class.equals(type)) {
return 0; } // 處理物件
return null; }
- 當方法型別為 Propagation.NORMAL 時,執行方法原邏輯,不進行事務處理。
6.3 資源協調者攔截器
先一起來看下資源協調者攔截器 對應的切面 org.mengyun.tcctransaction.interceptor.CompensableTransactionAspect
,實現程式碼如下:
@Aspect
public abstract class ResourceCoordinatorAspect {
private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor;
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void transactionContextCall() {
}
@Around("transactionContextCall()")
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
}
public void setResourceCoordinatorInterceptor(ResourceCoordinatorInterceptor resourceCoordinatorInterceptor) {
this.resourceCoordinatorInterceptor = resourceCoordinatorInterceptor;
}
public abstract int getOrder();
}
- 通過
org.aspectj.lang.annotation.@Pointcut
+org.aspectj.lang.annotation.@Around
註解,配置對 @Compensable 註解的方法進行攔截,呼叫ResourceCoordinatorInterceptor#interceptTransactionContextMethod(...)
方法進行處理。
ResourceCoordinatorInterceptor 實現程式碼如下:
public class ResourceCoordinatorInterceptor {
private TransactionManager transactionManager;
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
Transaction transaction = transactionManager.getCurrentTransaction();
if (transaction != null) {
switch (transaction.getStatus()) {
case TRYING:
// 新增事務參與者
enlistParticipant(pjp);
break;
case CONFIRMING:
break;
case CANCELLING:
break;
}
}
// 執行方法原邏輯
return pjp.proceed(pjp.getArgs());
}
}
- 當事務處於 TransactionStatus.TRYING 時,呼叫
#enlistParticipant(...)
方法,新增事務參與者。 - 呼叫
ProceedingJoinPoint#proceed(...)
方法,執行方法原邏輯。
ResourceCoordinatorInterceptor#enlistParticipant()
實現程式碼如下:
private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {
// 獲得 @Compensable 註解
Method method = CompensableMethodUtils.getCompensableMethod(pjp);
if (method == null) {
throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));
}
Compensable compensable = method.getAnnotation(Compensable.class);
// 獲得 確認執行業務方法 和 取消執行業務方法
String confirmMethodName = compensable.confirmMethod();
String cancelMethodName = compensable.cancelMethod();
// 獲取 當前執行緒事務第一個(頭部)元素
Transaction transaction = transactionManager.getCurrentTransaction();
// 建立 事務編號
TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());
// TODO
if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {
FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());
}
// 獲得類
Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());
// 建立 確認執行方法呼叫上下文 和 取消執行方法呼叫上下文
InvocationContext confirmInvocation = new InvocationContext(targetClass,
confirmMethodName,
method.getParameterTypes(), pjp.getArgs());
InvocationContext cancelInvocation = new InvocationContext(targetClass,
cancelMethodName,
method.getParameterTypes(), pjp.getArgs());
// 建立 事務參與者
Participant participant =
new Participant(
xid,
confirmInvocation,
cancelInvocation,
compensable.transactionContextEditor());
// 新增 事務參與者 到 事務
transactionManager.enlistParticipant(participant);
}
- 呼叫
CompensableMethodUtils#getCompensableMethod(...)
方法,獲得帶 @Compensable 註解的方法。 - 呼叫
#getCurrentTransaction()
方法, 獲取事務。 - 呼叫 TransactionXid 構造方法,建立分支事務編號。實現程式碼如下:
/** * 全域性事務編號 */ private byte[] globalTransactionId; /** * 分支事務編號 */ private byte[] branchQualifier; public TransactionXid(byte[] globalTransactionId) { this.globalTransactionId = globalTransactionId; branchQualifier = uuidToByteArray(UUID.randomUUID()); // 生成 分支事務編號 }
- 分支事務編號(
branchQualifier
) 需要生成。
- 分支事務編號(
- TODO TransactionContext 和 Participant 的關係。
- 呼叫
ReflectionUtils#getDeclaringType(...)
方法,獲得宣告 @Compensable 方法的實際類。實現程式碼如下:
public static Class getDeclaringType(Class aClass, String methodName, Class<?>[] parameterTypes) {
Method method;
Class findClass = aClass; do {
Class[] clazzes = findClass.getInterfaces();
for (Class clazz : clazzes) { try {
method = clazz.getDeclaredMethod(methodName, parameterTypes);
} catch (NoSuchMethodException e) {
method = null; }
if (method != null) {
return clazz; } }
findClass = findClass.getSuperclass(); } while (!findClass.equals(Object.class));
return aClass; }
- 呼叫 InvocationContext 構造方法,分別建立確認執行方法呼叫上下文和取消執行方法呼叫上下文。實現程式碼如下: /** * 類 */ private Class targetClass; /** * 方法名 */ private String methodName; /** * 引數型別陣列 */ private Class[] parameterTypes; /** * 引數陣列 */ private Object[] args; public InvocationContext(Class targetClass, String methodName, Class[] parameterTypes, Object... args) { this.methodName = methodName; this.parameterTypes = parameterTypes; this.targetClass = targetClass; this.args = args; }
- 呼叫 Participant 構造方法,建立事務參與者。實現程式碼如下:
public class Participant implements Serializable {
private static final long serialVersionUID = 4127729421281425247L; /** * 事務編號 */
private TransactionXid xid; /** * 確認執行業務方法呼叫上下文 */
private InvocationContext confirmInvocationContext; /** * 取消執行業務方法 */
private InvocationContext cancelInvocationContext; /** * 執行器 */
private Terminator terminator = new Terminator();
/** * 事務上下文編輯 */
Class<? extends TransactionContextEditor> transactionContextEditorClass;
public Participant() { }
public Participant(TransactionXid xid, InvocationContext confirmInvocationContext, InvocationContext cancelInvocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
this.xid = xid;
this.confirmInvocationContext = confirmInvocationContext;
this.cancelInvocationContext = cancelInvocationContext;
this.transactionContextEditorClass = transactionContextEditorClass;
} }
- 呼叫
TransactionManager#enlistParticipant(...)
方法,新增事務參與者到事務。
666. 彩蛋
受限於本人的能力,蠻多處表達不夠清晰或者易懂,非常抱歉。如果你對任何地方有任何疑問,歡迎新增本人微訊號( wangwenbin-server ),期待與你的交流。不限於 TCC,也可以是分散式事務,也可以是微服務,以及等等。
外送一本武林祕籍:帶中文註釋的 TCC-Transaction 倉庫地址,目前正在慢慢完善。傳送門:https://github.com/YunaiV/tcc-transaction。
再送一本葵花寶典:《TCC型分散式事務原理和實現》系列。
胖友,分享一個朋友圈可好?