如何優雅的解決,事務中傳送MQ接收處理導致的一致性問題
背景描述:工作中經常發現,有同事(初中高階都有出現)在事務中傳送MQ,然後消費進行處理
有的人是自己定義了一個方法,沒有加事務,當時被別人寫的方法呼叫了,所以事務傳播了(Spring的事務傳播特性,預設下是會傳播當前事務的)
首先暫且不考慮傳送MQ失敗和MQ成功,當前事務歸滾導致的不一致性問題 (這個大家都能理解,也會去考慮)
這次主要講解成功提交事務所帶來的不一致的場景
首先,我們參考圖例回顧一下場景
可以看出來 MethodC 最終查出來會概率性的為空,原因就不用介紹了,除非你連資料庫的隔離級別都不清楚
如果避免這個問題了
-
靠人力水平來避免,Review
首先我們能想到的是,不應該在事務裡面發MQ,因為他會帶來異常回滾的不一致性,但是不是每一個業務都那麼重要,人員的水平也是參差不齊,如果想這樣去做,需要提高編碼人員的技術水平,及CodeReview的級別,這確實是一個解決問題的辦法,但是不夠好,成本太高
-
靠系統架構設計來避免
一開始就把架構設計好,詳細設計各種由架構師來定好,包括資料的各種扭轉,各種的異常,重要的專案可以這樣幹,但是一個公司,90%的專案都是不那麼重要的,多大的廠都是如此,根本不可能有這麼高的成本投入。
-
利用技術手段來避免
怎麼利用技術手段來避免這個問題
引入TransactionSynchronizationManager,增加事務後置處理。
例如:
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { // message queue send todo } });
當然使用TransactionSynchronizationManager.registerSynchronization之前別忘記了增加isActive的判斷,很重要,你怎麼避免內部方法呼叫沒有加@Transaction的問題呢
所以:
boolean isActive = TransactionSynchronizationManager.isSynchronizationActive(); if (isActive) { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { // message queue send todo } }); } else { // message queue send todo }
這樣實現很明顯,如果你對原有程式碼進行重構,你的工作量是非常大的,而且這些程式碼看起來重複量很大。你每發個MQ都得這樣寫,所以我們為什麼不使用AOP無侵入的增加這樣一段程式碼在每次傳送MQ之前呢???
優雅的實現
使用Spring AOP 對MQ傳送進行切面處理,在傳送的前後增加事務的後置處理,那麼已經加了後置處理的會出現問題嗎,也就是二層Post會有問題嗎?並不會
# 如果這裡在事務中 isActive=true
boolean isActive = TransactionSynchronizationManager.isSynchronizationActive();
if (isActive) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
# 當前已經沒有在事務了,所以isActive=false
boolean isActive = TransactionSynchronizationManager.isSynchronizationActive();
if (isActive) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
// message queue send todo
}
});
} else {
// message queue send todo
}
}
});
} else {
// message queue send todo
}
以下是AOP怎麼實現切面攔截的具體程式碼,RabbitMQ為例
/**
* 支援的方法,必須是無返回值的
*/
private static final List<String> SUPPORT_METHODS = Lists.newArrayList("convertAndSend", "send");
@Around("execution(* org.springframework.amqp.rabbit.core.RabbitTemplate.*(..))")
public Object sendAround(ProceedingJoinPoint joinPoint) throws Throwable {
String name = joinPoint.getSignature().getName();
if (SUPPORT_METHODS.contains(name)) {
boolean isActive = TransactionSynchronizationManager.isSynchronizationActive();
if (isActive) {
final ProceedingJoinPoint joinPoint1 = joinPoint;
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
try {
joinPoint1.proceed();
} catch (Throwable e) {
log.error("傳送MQ出現異常: {}", e.getMessage(), e);
}
}
});
return null;
}
return joinPoint.proceed();
}
return joinPoint.proceed();
}
謝謝閱讀!