1. 程式人生 > 程式設計 >RbbitMQ(五) -- 100%訊息投遞消費

RbbitMQ(五) -- 100%訊息投遞消費

一:前言概述

生產者生產訊息到消費者訊息消費,中間需要生產者將訊息傳送到交換器,再由交換器路由到佇列儲存,然後消費者進行訊息消費。在沒有任何設定情況下,中間可能存在以下幾種情況導致訊息丟失:

在這裡插入圖片描述

  1. 消費者將訊息傳送到交換器因為RabbitMQ內部原因丟失訊息
  2. 交換器將訊息路由到佇列,因為佇列不存在等因素導致訊息丟失
  3. 佇列中儲存的訊息在消費者未消費時RabbitMQ服務宕機導致訊息丟失
  4. 消費者消費訊息時消費者宕機未處理完訊息導致訊息丟失

針對上述情況,本文將根據每個節點講述如何操作確保訊息投遞的可靠性,同時在保障可靠性的情況下可能會引發系列如訊息重複等問題,也是本文將會涉及到的重點

二:事務TX

理解資料庫的事務就是將多次操作原子化,統一提交回滾實現資料一致性。RabbitMQ事務可能與之前資料庫等事務有一定差別,當然也是支援訊息的提交與回滾操作。事務是解決生產者傳送訊息到交換器訊息丟失問題的方案之一

2.1 相關操作

RabbitMQ中的事務實現有如下兩個個主要步驟:

在這裡插入圖片描述

  1. 將通訊的通道設定為事務模式
  2. 事務提交/事務回滾
2.2 程式碼示例
        // 將通道設定為事務模式
        channel.txSelect();
        // 傳送訊息
        try {
            channel.basicPublish(bindingKey,bindingKey,false
,null,messgae.getBytes("UTF-8")); // 提交事務 channel.txCommit(); }catch (Exception e){ // 異常回滾事務,發生異常的時候可以嘗試重發或者記錄等操作 channel.txRollback(); } 複製程式碼
2.3 總結

事務機制的確認需要等待上一條訊息傳送完畢反饋之後才能進行第二條訊息傳送,這樣的操作將會對於使用RabbitMQ而言感覺就是暴殄天物。如果是想要傳送多條訊息只能迴圈操作,但是注意如果沒有將channel通道設定為事務模式不能進行事務操作

,不然會丟擲異常。最後一點就是通道設定為事務模式只需要操作一次即可

三:確認Confirm

事務機制對於RabbitMQ效能消耗是災難性的,針對生產者到交換器訊息丟失處理提出了全新的輕量級處理方式。傳送發確認機制confirm,該操作編碼上會有很多地方都在講什麼等待確認、批量確認、非同步確認。一切為了生產,所以本文將只會介紹生產用的非同步確認方式

3.1 相關操作

RabbitMQ的生產傳送確認實現由以下三個部分構成:

  • 將通道channel設定為確認模式
  • 增加確認監聽Listener
  • 處理監聽結果
3.2 程式碼示例
  • 每個通道傳送到RabbitMQ的Broker中都會有唯一的編碼
  • 生產端最好使用有序佇列儲存傳送的訊息,方便確認後的刪除
  • 建立Channel通道時可以指定唯一編碼,標識該通道
        // 設定confirm訊息傳送確認機制
        channel.confirmSelect();
        // 增加確認機制監聽器
        channel.addConfirmListener(new ConfirmListener() {
            /**
             * 成功確認
             * deliveryTag 表示訊息的唯一標識
             * multiple 本次確認是否為批量操作
             */
            @Override
            public void handleAck (long deliveryTag,boolean multiple) throws IOException {
                // 刪除有序集合中的訊息
                if (! multiple){
                    // 根據座標刪除訊息
                }else {
                    // 批量刪除訊息
                }
            }
    
            /**
             * 失敗確認
             * deliveryTag 表示訊息的唯一標識
             * multiple 本次確認是否為批量操作
             */
            @Override
            public void handleNack (long deliveryTag,boolean multiple) throws IOException {
                // 可以根據deliveryTag做重試操作等
            }
        });
複製程式碼
3.3 注意事項

在這裡插入圖片描述

  • 共存:事務與確認機制不能共存,不然會異常
  • 查驗:通過RabbitMQ視覺化監控介面可看到Channels欄Mod屬性T表示事務,C表示監聽確認
  • 有序:因為RabbitMQ生成的序列deliveryTag是由小到大自動遞增的,所以最好儲存訊息的時候考慮到順序性,更方便通過deliveryTag定位到訊息進行操作

四:Mandatory

交換器不儲存訊息,所有訊息都要路由到佇列儲存。如果中間過程訊息丟失,對於生產者而言不設定的情況下是無法知曉的錯誤。Mandatory實現與Confirm實現類似,通過增加監控監聽Listener實現。前面文章訊息與佇列進階詳細描述過這個引數監聽

4.1 相關操作

當訊息到達交換器,但是沒有匹配佇列路由儲存時。若通過Mandatory實現監聽處理則需要如下幾個處理過程:

  1. 傳送訊息basicPublish()時將設定mandatory引數為true
  2. 為通道channel增加MandatoryListener監聽
4.2 程式碼示例
  • 首先可以看到因為bindingKey與routingKey不一致,訊息不能路由到佇列
  • 然後可以看到傳送訊息時將mandatory引數設定為true表示增加mandatory監聽
  • 最後可以看到在通道上增加了ReturnLisrtener監聽,取得未路由訊息相關引數資訊
    在這裡插入圖片描述
4.3 引數詳解

ReturnListener中僅僅包含唯一方法handleReturn(),該方法中含有系列引數,引數含義如下表所示:

引數 描述
replyCode 表示本次返回訊息原因編碼,如312訊息未路由
replyText 表示本次返回訊息原因描述
exchange 監聽器接收到返回訊息的交換器
routingKey 本次訊息傳送的路由鍵
properties 本次監聽到返回訊息的屬性設定
body 監聽到返回訊息的訊息體

五:備用交換器

Mandatory增加的ReturnListener監聽需要在傳送訊息程式碼中增加邏輯,這對於追求功能專一性而言不是好訊息。通過RabbitMQ也根本檢測不到這段邏輯,也不利於後續程式碼維護。所以提出備用交換器,建立交換器時繫結,當交換器訊息未找到路由佇列時訊息將轉發到備用交換器

5.1 相關操作

備用交換器其原理類似於交換器與交換器繫結,需要注意以下幾點:

  1. 建立交換器時使用Map引數繫結備用交換器
  2. 備用交換器接收路由到的訊息不會更改任何屬性,包括routingKey
  3. 可以將備用交換器設定為內建交換器
5.2 程式碼示例

通過引數Map繫結備用交換器,驗證效果將訊息傳送路由鍵routingKey設定為備用交換器路由鍵。可以檢視備用交換器建立時的第五個引數,上面也提到最優設定為內建交換器,屬性internal

在這裡插入圖片描述

5.3 測試結果

最後顯示備用交換器中有一條訊息,證明結果的正確性。備用交換器可以用作訊息不能正確路由時的一種解決方案

在這裡插入圖片描述

六:佇列與訊息持久化

這個問題在前面相關佇列與佇列訊息的文章中已經詳細講解,為了整個訊息投遞可靠性的完整,這裡再次描述一下佇列與佇列訊息的持久化。注意以點:

單獨的佇列訊息持久化並不能實現訊息持久化,同理單獨的佇列持久化也不能實現訊息持久化。需要佇列與佇列訊息同時持久化方可

6.1 佇列持久化

持久化即將佇列資訊寫入磁碟持久化儲存,當RabbitMQ應用服務故障宕機重啟時可以自動進行資料恢復的操作稱之為佇列持久化。實現只需要在建立佇列時將持久化引數設定為true即可,如下所示:

在這裡插入圖片描述
進入RabbitMQ應用的WEB頁面控制檯檢視該佇列標誌D,表示持久化。如下所示:
在這裡插入圖片描述

6.2 佇列訊息持久化

將RabbitMQ服務應用重啟,發現佇列恢復,但是佇列中訊息資料並未恢復。因為佇列訊息持久化需要在傳送訊息時進行設定,不然也不會寫入磁碟儲存。程式碼如下所示:

在這裡插入圖片描述

  • 傳送訊息方法引數列表要求傳遞BasicProperties,該類使用建造者模式設計,其中deliveryMod表示訊息持久化。1 預設值不進行持久化,2 將訊息持久化寫入磁碟
  • MessageProperties類封裝常用系列BasicProperties物件,可以直接使用
6.3 總結

佇列持久化 + 佇列訊息持久化 = 完整持久化,持久化對RabbitMQ應用的效能是一種負擔,可以根據資料型別進行範圍資料持久化。如訂單資料、支付資料等等較為重要的資料可以採用持久化的操作儘量避免訊息丟失

七:消費者確認

生產者訊息已經投遞並路由到佇列儲存,當消費者消費時消費應用宕機導致消費邏輯不完整的宕機也是保證訊息百分百投遞消費的關鍵一環。RabbitMQ針對這一點提供消費者確認機制,配置該特性後,當且僅當消費者確認以後RabbitMQ應用才會刪除訊息

講解RabbitMQ消費者確認機制前需要確認預設情況下消費者將自動確認,也就是當訊息從RabbitMQ應用服務取出時將被刪除,這也是誘發訊息丟失的原因。所以為了實現後續手動控制訊息確認的邏輯,消費訊息時就需要將引數autoAck設定為false

在這裡插入圖片描述

7.1 basicAck

訊息確認,引數包含deliveryTag、multiple。作用與生產者確認Confirm一致:

  • deliveryTag:RabbitMQ應用會為每條訊息產生唯一編號,生產者亦或是消費者都需要根據編碼進行相關訊息操作
  • multiple:批量操作,即將編碼小於本次操作編碼的訊息都進行本次一致的操作
7.2 訊息拒絕

訊息拒絕有兩個API,basicReject()與basicNack(),兩者唯一的差距在於前者不能進行multiple的批量操作。兩者共同含有以下兩個引數屬性:

  • deliveryTag:RabbitMQ應用會為每條訊息產生唯一編號,生產者亦或是消費者都需要根據編碼進行相關訊息操作
  • requeue:是否重新放回佇列,這裡拋棄的訊息如果設定了死信轉發,將會被路由到配置的死信交換器