1. 程式人生 > 其它 >如何優雅的解決,事務中傳送MQ接收處理導致的一致性問題

如何優雅的解決,事務中傳送MQ接收處理導致的一致性問題

背景描述:工作中經常發現,有同事(初中高階都有出現)在事務中傳送MQ,然後消費進行處理

有的人是自己定義了一個方法,沒有加事務,當時被別人寫的方法呼叫了,所以事務傳播了(Spring的事務傳播特性,預設下是會傳播當前事務的)

首先暫且不考慮傳送MQ失敗和MQ成功,當前事務歸滾導致的不一致性問題 (這個大家都能理解,也會去考慮)

這次主要講解成功提交事務所帶來的不一致的場景

首先,我們參考圖例回顧一下場景

可以看出來 MethodC 最終查出來會概率性的為空,原因就不用介紹了,除非你連資料庫的隔離級別都不清楚

如果避免這個問題了

  1. 靠人力水平來避免,Review

    首先我們能想到的是,不應該在事務裡面發MQ,因為他會帶來異常回滾的不一致性,但是不是每一個業務都那麼重要,人員的水平也是參差不齊,如果想這樣去做,需要提高編碼人員的技術水平,及CodeReview的級別,這確實是一個解決問題的辦法,但是不夠好,成本太高

  2. 靠系統架構設計來避免

    一開始就把架構設計好,詳細設計各種由架構師來定好,包括資料的各種扭轉,各種的異常,重要的專案可以這樣幹,但是一個公司,90%的專案都是不那麼重要的,多大的廠都是如此,根本不可能有這麼高的成本投入。

  3. 利用技術手段來避免

怎麼利用技術手段來避免這個問題

引入TransactionSynchronizationManager,增加事務後置處理。

例如:

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
    @Override
    public void afterCommit() {
        // message queue send todo
    }
});

當然使用TransactionSynchronizationManager.registerSynchronization之前別忘記了增加isActive的判斷,很重要,你怎麼避免內部方法呼叫沒有加@Transaction的問題呢

所以:

boolean isActive = TransactionSynchronizationManager.isSynchronizationActive();
 if (isActive) {
     TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
    @Override
    public void afterCommit() {
        // message queue send todo
    }
});
 } else {
        // message queue send todo
 }

這樣實現很明顯,如果你對原有程式碼進行重構,你的工作量是非常大的,而且這些程式碼看起來重複量很大。你每發個MQ都得這樣寫,所以我們為什麼不使用AOP無侵入的增加這樣一段程式碼在每次傳送MQ之前呢???

優雅的實現

使用Spring AOP 對MQ傳送進行切面處理,在傳送的前後增加事務的後置處理,那麼已經加了後置處理的會出現問題嗎,也就是二層Post會有問題嗎?並不會

# 如果這裡在事務中 isActive=true
boolean isActive = TransactionSynchronizationManager.isSynchronizationActive();
if (isActive) {
    TransactionSynchronizationManager.registerSynchronization(
        new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                # 當前已經沒有在事務了,所以isActive=false
                boolean isActive = TransactionSynchronizationManager.isSynchronizationActive();
                if (isActive) {
                    TransactionSynchronizationManager.registerSynchronization(
                        new TransactionSynchronizationAdapter() {
                            @Override
                            public void afterCommit() {
                                // message queue send todo
                            }
                        });
                } else {
                    // message queue send todo
                }
            }
        });
} else {
    // message queue send todo
}

以下是AOP怎麼實現切面攔截的具體程式碼,RabbitMQ為例

/**
 * 支援的方法,必須是無返回值的
 */
private static final List<String> SUPPORT_METHODS = Lists.newArrayList("convertAndSend", "send");

@Around("execution(* org.springframework.amqp.rabbit.core.RabbitTemplate.*(..))")
public Object sendAround(ProceedingJoinPoint joinPoint) throws Throwable {
  String name = joinPoint.getSignature().getName();
  if (SUPPORT_METHODS.contains(name)) {
    boolean isActive = TransactionSynchronizationManager.isSynchronizationActive();
    if (isActive) {
      final ProceedingJoinPoint joinPoint1 = joinPoint;
      TransactionSynchronizationManager.registerSynchronization(
          new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
              try {
                joinPoint1.proceed();
              } catch (Throwable e) {
                log.error("傳送MQ出現異常: {}", e.getMessage(), e);
              }
            }
          });
      return null;
    }
    return joinPoint.proceed();
  }
  return joinPoint.proceed();
}

謝謝閱讀!