噓!非同步事件這樣用真的好麼?
阿新 • • 發佈:2020-06-29
## 故事背景
今年年初的時候寫了一篇文章 [《圍觀:基於事件機制的內部解耦之心路歷程》](https://mp.weixin.qq.com/s/kfrTWkIVMSDjEpMR54pMlQ)。這篇文章主要講的是用 ES 資料異構的場景。程式訂閱 Mysql Binlog 的變更,然後程式內部使用 Spring Event 來分發具體的事件,因為一個表的資料變更可能會需要更新多個 ES 索引。
為了方便大家理解我把之前方案的圖片複製過來了,如下:
![](https://img2020.cnblogs.com/blog/1618095/202006/1618095-20200629130844531-1729158862.png)
上圖的方案存在一個問題,就是我們今天文章要聊的內容。
這個問題就是當 MQ Consumer 收到訊息後,就直接釋出 Event 了,如果是同步的,沒有問題。如果某個 EventListener 中處理失敗了,那麼這條訊息將不會 ACK。
如果是非同步釋出 Event 的場景,釋出完訊息馬上就 ACK 了。就算某個 EventListener 中處理失敗了,MQ 也感知不到,不會進行訊息的重新投遞,這就是存在的問題。
![](https://img2020.cnblogs.com/blog/1618095/202006/1618095-20200629130853577-1904402026.png)
## 解決方案
### 方案一
既然訊息已經 ACK 了,那就不利用 MQ 的重試功能了,使用方自己重試是不是也可以呢?
可肯定是可以的,內部處理是否成功肯定是可以知道的,如果處理失敗了可以預設重試,或者有一定策略的重試。實在不行還可以落庫,儲存記錄。
這樣的問題在於太煩了呀,每個使用的地方都要去做這件事情,而且對於未來接手你程式碼的程式小哥哥來說,這很有可能讓小哥哥頭髮慢慢脫落啊。。。。
脫落不要緊,關鍵他還不知道要做這個處理,說不定哪天就背鍋了,慘兮兮。。。。
### 方案二
要保證訊息和業務處理的一致性,就不能立馬進行 ACK 操作。而是要等業務處理完成後再決定是否要 ACK。
如果有處理失敗的就不應該 ACK,這樣就能複用 MQ 的重試機制了。
分析下來,這就是一個典型的非同步轉同步的場景。像 Dubbo 中也有這個場景,所以我們可以借鑑 Dubbo 中的實現思路。
建立一個 DefaultFuture 用於同步等待獲取任務執行結果。然後在 MQ 消費的地方使用 DefaultFuture。
```plain
@Service
@RocketMQMessageListener(topic = "${rocketmq.topic.data_change}", consumerGroup = "${rocketmq.group.data_change_consumer}")
public class DataChangeConsume implements RocketMQ