1. 程式人生 > 實用技巧 >RabbitMQ 結合業務實現訊息確認

RabbitMQ 結合業務實現訊息確認

@

目錄

嘮個嗑

網路上搜羅了多次想知道 RabbitMQ 現實業務種怎麼實現訊息的可靠性的,但是大多都不太理想,站在各位大佬巨人的肩膀上研究了一段時間,我也整理了一套簡單可行性的方案,包括訊息異常處理。這篇文章想主要講一些業務處理方案,專案中加入 RabbitMQ 中介軟體很簡單,但是根據具體業務實現訊息的可靠性,這個需要多加考慮。當然下面也會通過測試程式碼來分析,文末也會附上原始碼地址。

1、準備

1.1、環境準備

之前部落格上寫過一篇編譯安裝的方法

地址,大家可以參考,因為 RabbitMQ 底層語言的原因可能稍微麻煩點,那就沒有辦法了嗎?如果你是先搞測試,再在專案中使用的話,那可以使用 docker 安裝,2 行程式碼,如下

docker pull rabbitmq

docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

1.2、理論準備

RabbitMQ 的型別包括:direct、topic、fanout、headers、system(翻原始碼看到的)

這裡主要通過 topic 來分析,bindingkey 可以通過萬用字元 # 和 * 來匹配多個 路由鍵 (routingKey),
bindingkey 是繫結交換機(exchange)和佇列(queue)的, 生產者(publisher)發訊息的時候會攜帶 routingKey、exchange 和 訊息傳送給 RabbitMQ,
連線成功後實際是元件 exchange 接收了生產者的訊息,然後通過 bindingkey 匹配 routingKey,決定送給哪個 queue,每個消費者都會有 queue,所以 queue接收到訊息 後就可以確保消費者接可以收到訊息了,最後消費者再消費。

再詳細的內容可以檢視大佬 erlie 的總結 地址

2、訊息確認

RabbitMQ 基礎配置

pom

<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>2.2.5.RELEASE</version>
	<relativePath/> <!-- lookup parent from repository -->
</parent>
	
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

spring:
  rabbitmq:
    host: 192.168.1.105
    port: 5672
    username: guest
    password: guest
    virtual-host: /

# 交換機、佇列和繫結鍵宣告
test:
  exchange: test.exchange
  one:
    queue: one.test.queue
    binding-key: one.test.key

consumer

@RabbitListener(bindings=@QueueBinding(
                //配置交換器
                exchange=@Exchange(value="${test.exchange}",type= ExchangeTypes.TOPIC),
                //配置路由鍵
                key="${test.one.binding-key}",
                //配置佇列名稱
                value=@Queue(value="${test.one.queue}",autoDelete="true")
))
public void test(String msg) {
    log.info("test 收到的訊息為:[{}]", msg);
    //業務程式碼...
}

publisher

@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${test.exchange}")
private String exchange;
@Value("${test.one.binding-key}")
private String routingKey;

public void test() {
    rabbitTemplate.convertAndSend(exchange, routingKey, "test msg");
}

通過上面預設的配置基本上就可以使用 RabbitMQ 了,但是這不是本篇的重點.我們要知道訊息傳送和到消費的過程中出現問題怎麼辦?這就需要我們分段確認訊息是否接收成功,如果失敗了該如何處理,先想想可以怎麼做。
我們先捋一下思路,訊息傳送給 RabbitMQ ,如果連線 RabbitMQ 失敗,則記錄該訊息,如果連線成功但是 exchange 接收失敗則記錄下該訊息,如果 exchange 接收成功但是 queue 接收 exchange 的訊息失敗則記錄下該訊息,訊息從生產者到 queue 有 3 個位置可能因網路抖動或其他原因出現問題,那我們在這三個位置記錄下問題後,統一通過計劃定時獲取記錄的訊息並且重新發送,如果重發三次還沒有成功則標記該記錄為異常訊息。

2.1、生產者訊息確認和回撥

這裡分兩步:

  1. 訊息確認指的是 RabbitMQ(exchange) 確認接收到了消費者傳送的訊息
  2. 訊息回撥指的是 queue 接收 exchange 的訊息失敗,則回撥告訴 RabbitMQ 失敗的訊息

2.1.1、訊息確認

開啟配置

spring:
  rabbitmq:
    #publisher-confirms: true #已過時
    publisher-confirm-type: correlated #開啟生產者訊息確認

還有另外 2 種模式:

  1. none 值是禁用釋出確認模式,是預設值;
  2. simple 值經測試有兩種效果,其一效果和 correlated 值一樣會觸發回撥方法,其二在釋出訊息成功後使用 rabbitTemplate 呼叫 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 節點返回傳送結果,根據返回結果來判定下一步的邏輯,要注意的點是 waitForConfirmsOrDie 方法如果返回false則會關閉 channel,則接下來無法傳送訊息到 broker。

當然還需新建一個類實現 RabbitTemplate.ConfirmCallback,重寫方法 confirm,該方法有三個引數 correlationData、ack、cause,主要說下 ack,值為 true 表示 exchange 成功接收到訊息,false 表示 exchange 接收訊息失敗,這裡 2 種結果可以分別處理,比如 false 可以把接收失敗的訊息入庫,然後通過定時器來處理,比較懂的同學現在可能就有疑問了,correlationData 只能得到 msgId,根本沒有具體的訊息,這裡可以發揮你出色的想象力,可以通過物件封裝得到,也可以通過存記憶體或者磁碟儲存得到,方法總比困難多。

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
   String msgId = correlationData.getId();
   if (ack) {
       log.info("成功傳送給 mq, msgId:[{}]", msgId);
   } else {
       log.error("傳送給 mq 失敗, msgId:[{}], 失敗理由cause:[{}]", msgId, cause);
       //訊息從生產者沒有到 exchange,那存庫
       saveToDB(msgId,...);
   }
}

這裡還差一步,就是原生的 rabbitTemplate 怎麼知道訊息確認時使用剛建的類呢,注入即可
rabbitTemplate.setConfirmCallback(剛建的類);如有疑問可看文末原始碼。

2.1.2、訊息回撥

spring:
  rabbitmq:
    publisher-returns: true #開啟生產者訊息回撥

同上,需要新建類並實現 RabbitTemplate.ReturnCallback,並且重寫方法 returnedMessage, 最後需要注入如下內容

rabbitTemplate.setReturnCallback(剛建的類);
// 要想使 returnCallback 生效,必須設定為true
rabbitTemplate.setMandatory(true);
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
    String msg = new String(message.getBody());
    log.error("訊息回撥 msgId:[{}], msg:[{}] 不能被正確路由,replyCode:[{}], replyText:[{}], exchange:[{}], routingKey:[{}]", msgId, msg, replyCode, replyText, exchange, routingKey);
    //訊息從 exchange 沒有到 queue, 那存庫
    saveToDB(msgId, exchange, routingKey, msg);
}

如果訊息傳送時走到了回撥方法 returnedMessage 中,說明目前的訊息有問題是需要處理的,同上,入庫。定時器來處理。

當然訊息的傳送方法 rabbitTemplate.convertAndSend() 會多一個引數 correlationData

具體處理方法可以參考原始碼,這裡只提供思路。

2.2、消費者訊息確認

消費者訊息確認是確認消費了佇列中的訊息,如果出現問題 RabbitMQ 會有重試機制,長時間失敗則需要人工干預,這個和生產者的確認是先後關係,實際是沒有關聯關係的,說到這,說下我之前轉的牛角尖,一直想尋找 exchange 如何確認消費者成功消費訊息,但是無果,後來細想,RabbitMQ 應該設計的就是消費者和 queue 互動,沒必要和 exchange 互動。如有大佬知道 exchange 如何 ack 消費者消費訊息可以告訴小弟,不勝感激。
圓規正轉,上消費者程式碼

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #開啟消費者訊息確認; none:自動確認、auto:根據情況確認
@RabbitListener(bindings=@QueueBinding(
                    //配置交換器
                    exchange=@Exchange(value="${test.exchange}",type= ExchangeTypes.TOPIC),
                    //配置路由鍵
                    key="${test.one.binding-key}",
                    //配置佇列名稱
                    value=@Queue(value="${test.one.queue}",autoDelete="true")
))
    public void test(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("test 收到的訊息為:[{}], msgId:[{}]", msg, message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));
        try {
            //業務處理
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            e.printStackTrace();
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }

主要說 3 個方法:

1、basicAck 是確認訊息,需要傳遞兩個引數

  1. deliveryTag(唯一標識 ID):當一個消費者向 RabbitMQ 註冊後,會建立起一個 Channel ,RabbitMQ 會用 basic.deliver 方法向消費者推送訊息,這個方法攜帶了一個 delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條訊息的唯一標識 ID,是一個單調遞增的正整數,delivery tag 的範圍僅限於 Channel
  2. multiple:為了減少網路流量,手動確認可以被批處理,當該引數為 true 時,則可以一次性確認 delivery_tag 小於等於傳入值的所有訊息,false則只確認傳入值等於 delivery_tag 的訊息

2、basicNack 是拒絕訊息,可以拒絕多條,比 basicAck 多一個布林值的引數,如果為 true,被 nack 後重新入佇列然後重新消費消費;如果為 false 被 nack 就丟了。

3、basicReject 只能拒絕一條訊息,reject 後訊息直接丟了。

總結

這裡簡單實現 RabbitMQ 訊息可靠的方式是通過把訊息傳送時出現問題後直接入庫,然後通過計劃定時查詢再重新發送給 RabbitMQ,如果 exchange 成功 ack 後則標記為重發成功,如果重發 3 次還是失敗則標記異常,需要人工處理。

討論

這種處理其實不算是最優的方案,技術上還可以有如下方案

  1. 生產者發訊息時記錄該條訊息,並設該記錄 1 分鐘後生效,留 1 分鐘給exchange 確認並直接標記該訊息記錄為成功,然後計劃任務定時掃有效且未確認的訊息,再發送給 RabbitMQ ,如果確認後則標記為成功,否則 3 次後標記為失敗。
  2. 還和小編寫的方案類似,記錄失敗的訊息,但是定時任務獲取到失敗的訊息後,直接呼叫消費者的服務處理,不通過 RabbitMQ, 但是這就需要維護訊息和消費者服務的關係了,稍微複雜些。

最後希望可以幫到看官,如果記錄的不對煩請評論指出,一起討論

https://github.com/charmsongo/springboot-samples/tree/master/springboot-rabbitmq-songo