1. 程式人生 > >分散式事務Hmily TCC原始碼跟讀記錄

分散式事務Hmily TCC原始碼跟讀記錄

一、什麼是分散式事務

分散式事務是指事務的參與者、支援事務的伺服器、資源伺服器以及事務管理器分別位於不同的分散式系統的不同節點上,

本質上來說,分散式事務是為了保證不同資料庫的資料一致性

TCC事務主要是基於AOP切面攔截實現的三階段提交事務,下面我們來跟讀原始碼

二、CAP理論

解決多個微服務之間資料可能不一致的問題,一般有個CAP理論:

  • C: 一致性.在分散式系統中的所有資料備份,在同一時刻具有同樣的值,所有節點在同一時刻讀取的資料都是最新的資料副本
  • A: 可用性,好的響應效能.完全的可用性指的是在任何故障模型下,服務都會有限的時間內處理完成並進行響應
  • P:分割槽容忍性.儘管網路有部分訊息丟失,但系統仍然可以繼續工作

CAP原理證明,任何分散式系統只可同時滿足以上兩點,無法三者兼顧.由於關係型資料庫是單節點無複製的,因此不具有分割槽容忍性,但具有一致性和可用性,而分散式的微服務系統都必須滿足分割槽容忍性,SpringCloud中的Eureka就是A P 定理的結合

三、Hmily框架原始碼跟讀

專案結構:

1.hmily-tcc-admin:用來監控事務日誌

2.hmily-tcc-annotation:提供tcc aop註解

3.hmily-tcc-common:提供一些工具類和配置類

4.hmily-tcc-core:分散式事務執行的核心部分

這個框架主要模擬一個場景,下單---->新建訂單---->扣款----->減庫存的操作,涉及到3個庫

tcc分散式事務主要是基於AOP切面原理進行註解攔截三階段提交(try-confirm-cancel)的分散式事務,下面我們進行原始碼跟讀:

  • 註解攔截器

HmilyTransactionBootstrap啟動引導類

/**
 * tcc分散式事務框架註解.
 * @author xiaoyu
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Tcc {

    /**
     * spring事務傳播.
     * @return {@linkplain PropagationEnum}
     */
    PropagationEnum propagation() default PropagationEnum.PROPAGATION_REQUIRED;

    /**
     * tcc框架確認方法 tcc中第一個c.
     *
     * @return confirm方法名稱
     */
    String confirmMethod() default "";

    /**
     * tcc框架確認方法 tcc中第二個c.
     *
     * @return cancel方法名稱
     */
    String cancelMethod() default "";

    /**
     * 模式 tcc 和cc模式.
     * tcc模式代表try中有資料庫操作,try需要回滾.
     * cc模式代表try中無資料庫操作,try不需要回滾.
     *
     * @return {@linkplain TccPatternEnum}
     */
    TccPatternEnum pattern() default TccPatternEnum.TCC;

}

該註解通過類AbstractTccTransactionAspect的子類SpringCloudHmilyTransactionAspect攔截,切點是該註解,環繞執行的是TccTransactionInterceptor實現類的interceptor方法,當方法上有tcc註解時,都會執行該切面攔截器方法,

@Aspect
public abstract class AbstractTccTransactionAspect {

    private TccTransactionInterceptor tccTransactionInterceptor;

    protected void setTccTransactionInterceptor(final TccTransactionInterceptor tccTransactionInterceptor) {
        this.tccTransactionInterceptor = tccTransactionInterceptor;
    }

    /**
     * this is point cut with {@linkplain com.hmily.tcc.annotation.Tcc }.
     */
    @Pointcut("@annotation(com.hmily.tcc.annotation.Tcc)")
    public void hmilyTccInterceptor() {
    }

    /**
     * this is around in {@linkplain com.hmily.tcc.annotation.Tcc }.
     * @param proceedingJoinPoint proceedingJoinPoint
     * @return Object
     * @throws Throwable  Throwable
     */
    @Around("hmilyTccInterceptor()")
    public Object interceptTccMethod(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        return tccTransactionInterceptor.interceptor(proceedingJoinPoint);
    }

    /**
     * spring Order.
     *
     * @return int
     */
    public abstract int getOrder();
}

子類會重寫setTccTransactionInterceptor方法,程式啟動就注入了SpringCloudHmilyTransactionInterceptor

接而進入到SpringCloudHmilyTransactionInterceptor的interceptor方法,該方法內定義TccTransactionContext物件,該物件定義了事務上下文,用來儲存本地事務,包含事務Id,事務階段,事務參與者的角色

  • try階段

try 階段,也就是本地事務階段,還未發起遠端呼叫,AOP攔截到@TCC註解,此時TccTransactionContext事務上下文為空

 @Override
    public Object interceptor(final ProceedingJoinPoint pjp) throws Throwable {
        TccTransactionContext tccTransactionContext;
        //如果不是本地反射呼叫補償
        RequestAttributes requestAttributes = null;
        try {
            requestAttributes = RequestContextHolder.currentRequestAttributes();
        } catch (Throwable ex) {
            LogUtil.warn(LOGGER, () -> "can not acquire request info:" + ex.getLocalizedMessage());
        }

        HttpServletRequest request = requestAttributes == null ? null : ((ServletRequestAttributes) requestAttributes).getRequest();
        String context = request == null ? null : request.getHeader(CommonConstant.TCC_TRANSACTION_CONTEXT);
        if (StringUtils.isNoneBlank(context)) {
            tccTransactionContext = GsonUtils.getInstance().fromJson(context, TccTransactionContext.class);
        } else {
            tccTransactionContext = TransactionContextLocal.getInstance().get();
            if (Objects.nonNull(tccTransactionContext)) {
                tccTransactionContext.setRole(TccRoleEnum.SPRING_CLOUD.getCode());
            }
        }
        return hmilyTransactionAspectService.invoke(tccTransactionContext, pjp);
    }

事務攔截器首先會獲取當前請求,從請求頭裡獲取事務上下文後和切面引數一起進入事務service切面的invoke方法,關於事務上下文header請看HmilyRestTemplateInterceptor類

 @Override
    public Object invoke(final TccTransactionContext tccTransactionContext, final ProceedingJoinPoint point) throws Throwable {
        final Class clazz = hmilyTransactionFactoryService.factoryOf(tccTransactionContext);
        final HmilyTransactionHandler txTransactionHandler = (HmilyTransactionHandler) SpringBeanUtils.getInstance().getBean(clazz);
        return txTransactionHandler.handler(point, tccTransactionContext);
    }

該方法會根據事務上下文獲取當前事務處於發起者還是參與者階段,不同階段返回不同的HmilyTransactionHandler子類,它由3實現類分別是發起者事務處理,參與者事務處理和本地事務處理,一般最常用的是發起者和參與者

由上述debug可知,在發起階段由SpringCloudHmilyTransactionInterceptor的invoke方法傳遞過來的事務上下文是null,所以是發起者,begin是初始化事務,當point proceed執行完後,更新事務狀態為try階段,並更新事務日誌.

該方法維護了一個TransactionContextLocal,用來儲存TccTransactionContext事務上下文,內部是threadlocal,可以將事務上下文繫結到當前執行緒裡.

如果出了異常,則在StarterHmilyTransactionHandler#handler方法catch塊執行cancel方法,如果沒有異常,就執行confirm方法確認tcc事務完成

@Override
    public Object handler(final ProceedingJoinPoint point, TccTransactionContext context)
            throws Throwable {
        Object returnValue = null;
        try {
            TccTransaction tccTransaction;
            context = TransactionContextLocal.getInstance().get();
            if (context == null) {
                tccTransaction = hmilyTransactionExecutor.begin(point);
                try {
                    //execute try
                    returnValue = point.proceed();
                    tccTransaction.setStatus(TccActionEnum.TRYING.getCode());
                    hmilyTransactionExecutor.updateStatus(tccTransaction);
                } catch (Throwable throwable) {
                    //if exception ,execute cancel
                    hmilyTransactionExecutor
                            .cancel(hmilyTransactionExecutor.getCurrentTransaction());
                    throw throwable;
                }
                //execute confirm
                hmilyTransactionExecutor.confirm(hmilyTransactionExecutor.getCurrentTransaction());
            } else if (context.getAction() == TccActionEnum.CONFIRMING.getCode()) {
                //execute confirm
                hmilyTransactionExecutor.confirm(hmilyTransactionExecutor.getCurrentTransaction());
            }

        } finally {
            hmilyTransactionExecutor.remove();
        }
        return returnValue;
    }

當發起者執行ponit.proceed完畢意味著包括了多個參與者的tcc方法也一起執行完畢了.

接下來是消費者的方法,一樣攔截tcc註解和上面程式碼一樣走

在這裡返回的是消費者的處理器

接著放行執行程式碼feign,進入feign的攔截器,由於SpringCloudHmilyTransactionAspect切面設定了最高階的排序所以會比Feign攔截器切面優先執行,下面來看HmilyRestTemplateConfiguration自定義的feign攔截器

第60行程式碼就是我們上面說的從請求通過"事務上下文header"獲取事務的來原始碼,會進入HmilyRestTemplateInterceptor#apply方法.

當在try階段參與者出現超時等異常會進入70行程式碼進行回滾操作

接下里是參與者的方法,首先會判斷當前事務處於哪個階段,如果在try階段,那麼開始把參與者加入到當前事務上下文裡,並更新事務狀態,如果當前參與者事務try階段異常,就刪除事務,如果是confirm階段,就執行confirm,如果是cancel階段就執行cancel

 @Override
    public Object handler(final ProceedingJoinPoint point, final TccTransactionContext context) throws Throwable {
        TccTransaction tccTransaction = null;
        TccTransaction currentTransaction;
        switch (TccActionEnum.acquireByCode(context.getAction())) {
            case TRYING:
                try {
                    tccTransaction = hmilyTransactionExecutor.beginParticipant(context, point);
                    final Object proceed = point.proceed();
                    tccTransaction.setStatus(TccActionEnum.TRYING.getCode());
                    //update log status to try
                    hmilyTransactionExecutor.updateStatus(tccTransaction);
                    return proceed;
                } catch (Throwable throwable) {
                    //if exception ,delete log.
                    hmilyTransactionExecutor.deleteTransaction(tccTransaction);
                    assert tccTransaction != null;
                    TccTransactionCacheManager.getInstance().removeByKey(tccTransaction.getTransId());
                    throw throwable;
                }
            case CONFIRMING:
                currentTransaction = TccTransactionCacheManager.getInstance().getTccTransaction(context.getTransId());
                hmilyTransactionExecutor.confirm(currentTransaction);
                break;
            case CANCELING:
                currentTransaction = TccTransactionCacheManager.getInstance().getTccTransaction(context.getTransId());
                hmilyTransactionExecutor.cancel(currentTransaction);
                break;
            default:
                break;
        }
        Method method = ((MethodSignature) (point.getSignature())).getMethod();
        return DefaultValueUtils.getDefaultValue(method.getReturnType());
    }

總結:

  1. 框架的執行核心就是AOP,所有TCC操作都是經過AOP實現的,發起者的AOP環繞前結束時,所有的發起者對應的tcc方法都已經執行完畢
  2. 每個TCC註解的方法,都要實現一個對應的confirm方法和cancel方法
  3. 不管是事務的發起者還是參與者,都是一個階段都執行完畢了才會進行到下一個階段,也就是一起pretry,一起try,一起confirm或一起cancel
  4. 對於我們專案裡,一般來說service方法執行完畢了,整個業務流程就結束了,但是這個框架不一樣,service方法執行完畢只能程式碼try階段執行完畢,真正的業務流程結束在confirm裡,所以一定要定義好這兩個方法
  5. tcc框架有個缺點,就是每個涉及到事務的方法都要實現它的confirm和cancel方法