藉助訊息佇列解決分散式事務
先介紹一下RabbitMQ的基本概念
核心概念
Queue:真正儲存資料的地方
Exchange接受請求,轉存資料
Bind:收到請求後儲存到哪裡
訊息生產者:傳送資料的應用
訊息消費者:取出資料處理的應用
Bind的幾種分發規則:Direct、Topic、Fanout
Fanout:與該Exchange繫結的Queue都發一份資料
Direct:給完全匹配的Queue傳送資料,routingkey與bindkey完全一致
Topic:給模糊匹配的Queue傳送資料,binding key中可以存在兩種特殊字元“”與“#”,用於做模糊匹配,其中“”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)
工作流程
生產者將資料通過RabbitMQ-client傳送到RabbitMQ-server中的exchange,exchange根據路由配置,分發給Queue,消費者從Queue拿到資料
分散式事務的產生
多個系統相互配合工作,產生資料一致性問題。
例如外賣場景中,下單中心,運單中心兩個系統要配合工作,必須保證兩個系統資料一致性。
錯誤的解決方案:
使用API介面呼叫,下單中心插入資料,呼叫運單中心的API介面處理資料,並啟動事務回滾。
咋一看這場景沒有什麼問題,畢竟有事務回滾,一起成功一起失敗,但其實存在API呼叫超時的情況,此時下單中心以為呼叫失敗回滾,而運單中心只是超時仍會繼續執行程式,從而造成兩個系統資料不一致。
假設API呼叫成功,也有可能是在訂單中心提交事務時失敗了,此時訂單中心回滾,而API已經呼叫,下單中心的資料已經產生,資料不一致。
使用訊息佇列解決分散式事務
問題的核心就是保證可靠生產與可靠消費
可靠生產:下單中心處理資料和狀態表更改應該保證事務一致
。生產者往訊息佇列傳送資料時,在本地建立一張狀態表,看是否成功傳送給佇列。利用RabbitMQ的確認機制看是否重發還是定時掃描狀態表重發,保證可靠生產。兜底方案還是定時掃描狀態表。
程式碼
#開啟mq的訊息回執確認機制
spring.rabbitmq.publisher-confirms=true
@PostConstruct //回撥函式 public void setUp(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //ack為true 代表傳送成功 if(!ack){ //傳送失敗 System.out.println(cause); return; } System.out.println("傳送成功"); //真實應該在本地記錄表中更改為已傳送標誌 } }); }
可靠消費:消費端開啟手動ACK,給MQ佇列傳送確認資訊。對每條資料進行記錄,一旦有異常記錄可以試著去重試重新要求MQ再發資料,但不能重試太多次。可以將資料主鍵插入資料庫,這樣同一資料就不會執行兩次,或者使用redis記錄資料操作。
程式碼
#開啟消費端手動確認機制
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RabbitListener(queues = "hello")
public void process(String message, Channel channel , @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
System.out.println("Receiver1 : " + message);
//保證冪等性 防止重複資料重複處理
//回覆MQ 已經接受到資料一切正常
channel.basicAck(tag,false);
} catch (IOException e) {
//出現異常,通知MQ重發資料,但要記錄下資料的異常處理次數
//對於錯誤資料另外處理,人工干預
channel.basicNack(tag,false,false);
}
}
優缺點
- 通用性強
- 擴充套件性強
缺點
- 適合非同步場景
- 處理有延遲,業務上需要適應