1. 程式人生 > >【RabbitMQ】如何進行訊息可靠投遞【上篇】

【RabbitMQ】如何進行訊息可靠投遞【上篇】

說明

前幾天,突然發生線上報警,釘釘連發了好幾條訊息,一看是RabbitMQ相關的訊息,心頭一緊,難道翻車了?

[橙色報警] 應用[xxx]在[08-15 16:36:04]發生[錯誤日誌異常],alertId=[xxx]。由[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620]觸發。
應用xxx 可能原因如下
服務名為:
 異常為:org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620
 產生原因如下:
1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
||Consumer received fatal=false exception on startup:
...
應用xxx 可能原因如下
服務名為:
 異常為:org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer:run:1160
 產生原因如下:
1.Stopping container from aborted consumer||Stopping container from aborted consumer:

定睛一看,看樣子像是消費者莫名其妙斷開了連線,正逢公司搬家之際,難道是機房又雙叒叕。。。。斷電了?於是趕緊聯絡了運維,諮詢RabbitMQ是否發生了調整。幾分鐘後,得到了運維的回覆,由於一些不可描述的原因,RabbitMQ進行了重啟,emmmm,雖然重啟只持續了10分鐘,但是導致該叢集下所有消費者都掛了,需要將專案重啟後才能正常進行消費。

專案重啟後,一切似乎又正常運轉起來,但好景不長,沒過多久,工單就找上了門來,經過排查,發現是生產者在RabbitMQ重啟期間訊息投遞失敗,導致訊息丟失,需要手動處理和恢復。

於是,我開始思考,如何才能進行RabbitMQ的訊息可靠投遞呢?特別是在這樣比較極端的情況,RabbitMQ叢集不可用的時候,無法投遞的訊息該如何處理呢?

可靠投遞

先來說明一個概念,什麼是可靠投遞呢?在RabbitMQ中,一個訊息從生產者傳送到RabbitMQ伺服器,需要經歷這麼幾個步驟:

  1. 生產者準備好需要投遞的訊息。
  2. 生產者與RabbitMQ伺服器建立連線。
  3. 生產者傳送訊息。
  4. RabbitMQ伺服器接收到訊息,並將其路由到指定佇列。
  5. RabbitMQ伺服器發起回撥,告知生產者訊息傳送成功。

所謂可靠投遞,就是確保訊息能夠百分百從生產者傳送到伺服器。

為了避免爭議,補充說明一下,如果沒有設定Mandatory引數,是不需要先路由訊息才發起回撥的,伺服器收到訊息後就會進行回撥確認。

2、3、5步都是通過TCP連線進行互動,有網路呼叫的地方就會有事故,網路波動隨時都有可能發生,不管是內部機房停電,還是外部光纜被切,網路事故無法預測,雖然這些都是小概率事件,但對於訂單等敏感資料處理來說,這些情況下導致訊息丟失都是不可接受的。

RabbitMQ中的訊息可靠投遞

預設情況下,傳送訊息的操作是不會返回任何資訊給生產者的,也就是說,預設情況下生產者是不知道訊息有沒有正確地到達伺服器。

那麼如何解決這個問題呢?

對此,RabbitMQ中有一些相關的解決方案:

  1. 使用事務機制來讓生產者感知訊息被成功投遞到伺服器。
  2. 通過生產者確認機制實現。

在RabbitMQ中,所有確保訊息可靠投遞的機制都會對效能產生一定影響,如使用不當,可能會對吞吐量造成重大影響,只有通過執行效能基準測試,才能在確定性能與可靠投遞之間的平衡。

在使用可靠投遞前,需要先思考以下問題:

  1. 訊息釋出時,保證訊息進入佇列的重要性有多高?
  2. 如果訊息無法進行路由,是否應該將該訊息返回給釋出者?
  3. 如果訊息無法被路由,是否應該將其傳送到其他地方稍後再重新進行路由?
  4. 如果RabbitMQ伺服器崩潰了,是否可以接受訊息丟失?
  5. RabbitMQ在處理新訊息時是否應該確認它已經為釋出者執行了所有請求的路由和持久化?
  6. 訊息釋出者是否可以批量投遞訊息?
  7. 在可靠投遞上是否有可以接受的平衡性?是否可以接受一部分的不可靠性來提升效能?

只考慮平衡性不考慮效能是不行的,至於這個平衡的度具體如何把握,就要具體情況具體分析了,比如像訂單資料這樣敏感的資訊,對可靠性的要求自然要比一般的業務訊息對可靠性的要求高的多,因為訂單資料是跟錢直接相關的,可能會導致直接的經濟損失。

所以不僅應該知道有哪些保證訊息可靠性的解決方案,還應該知道每種方案對效能的影響程度,以此來進行方案的選擇。

RabbitMQ的事務機制

RabbitMQ是支援AMQP事務機制的,在生產者確認機制之前,事務是確保訊息被成功投遞的唯一方法。

在SpringBoot專案中,使用RabbitMQ事務其實很簡單,只需要宣告一個事務管理的Bean,並將RabbitTemplate的事務設定為true即可。

配置檔案如下:

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual

先來配置一下交換機和佇列,以及事務管理器。

@Configuration
public class RabbitMQConfig {

    public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue";

    // 宣告業務Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    // 宣告業務佇列
    @Bean("businessQueue")
    public Queue businessQueue(){
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build();
    }

    // 宣告業務佇列繫結關係
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }


    /**
     * 配置啟用rabbitmq事務
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}

然後建立一個消費者,來監聽訊息,用以判斷訊息是否成功傳送。

@Slf4j
@Component
public class BusinessMsgConsumer {


    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void receiveMsg(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到業務訊息:{}", msg);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    }
}

然後是訊息生產者:

@Slf4j
@Component
public class BusinessMsgProducer{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        rabbitTemplate.setChannelTransacted(true);
    }

    @Transactional
    public void sendMsg(String msg) {
        rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME, "key", msg);
        log.info("msg:{}", msg);
        if (msg != null && msg.contains("exception"))
            throw new RuntimeException("surprise!");
        log.info("訊息已傳送 {}" ,msg);
    }
}

這裡有兩個注意的地方:

  1. 在初始化方法裡,通過使用rabbitTemplate.setChannelTransacted(true); 來開啟事務。
  2. 在傳送訊息的方法上加上 @Transactional 註解,這樣在該方法中發生異常時,訊息將不會發送。

在controller中加一個介面來生產訊息:

@RestController
public class BusinessController {

    @Autowired
    private BusinessMsgProducer producer;

    @RequestMapping("send")
    public void sendMsg(String msg){
        producer.sendMsg(msg);
    }
}

來驗證一下:

msg:1
訊息已傳送 1
收到業務訊息:1
msg:2
訊息已傳送 2
收到業務訊息:2
msg:3
訊息已傳送 3
收到業務訊息:3
msg:exception

Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: surprise!] with root cause

java.lang.RuntimeException: surprise!
    at com.mfrank.rabbitmqdemo.producer.BusinessMsgProducer.sendMsg(BusinessMsgProducer.java:30)
    ...

msg 的值為 exception 時, 在呼叫rabbitTemplate.convertAndSend 方法之後,程式丟擲了異常,訊息並沒有傳送出去,而是被當前事務回滾了。

當然,你可以將事務管理器註釋掉,或者將初始化方法的開啟事務註釋掉,這樣事務就不會生效,即使在呼叫了傳送訊息方法之後,程式發生了異常,訊息也會被正常傳送和消費。

RabbitMQ中的事務使用起來雖然簡單,但是對效能的影響是不可忽視的,因為每次事務的提交都是阻塞式的等待伺服器處理返回結果,而預設模式下,客戶端是不需要等待的,直接傳送就完事了,除此之外,事務訊息需要比普通訊息多4次與伺服器的互動,這就意味著會佔用更多的處理時間,所以如果對訊息處理速度有較高要求時,儘量不要採用事務機制。

RabbitMQ的生產者確認機制

RabbitMQ中的生產者確認功能是AMQP規範的增強功能,當生產者釋出給所有佇列的已路由訊息被消費者應用程式直接消費時,或者訊息被放入佇列並根據需要進行持久化時,一個Basic.Ack請求會被髮送到生產者,如果訊息無法路由,代理伺服器將傳送一個Basic.Nack RPC請求用於表示失敗。然後由生產者決定該如何處理該訊息。

也就是說,通過生產者確認機制,生產者可以在訊息被伺服器成功接收時得到反饋,並有機會處理未被成功接收的訊息。

在Springboot中開啟RabbitMQ的生產者確認模式也很簡單,只多了一行配置:

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
    publisher-confirms: true

publisher-confirms: true 即表示開啟生產者確認模式。

然後將訊息生產者的代表進行部分修改:

@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
//        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setConfirmCallback(this);
    }

    public void sendCustomMsg(String exchange, String msg) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        log.info("訊息id:{}, msg:{}", correlationData.getId(), msg);

        rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("訊息確認成功, id:{}", id);
        } else {
            log.error("訊息未成功投遞, id:{}, cause:{}", id, s);
        }
    }
}

讓生產者繼承自RabbitTemplate.ConfirmCallback 類,然後實現其confirm 方法,即可用其接收伺服器回撥。

需要注意的是,在傳送訊息時,程式碼也進行了調整:

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);

這裡我們為訊息設定了訊息ID,以便在回撥時通過該ID來判斷是對哪個訊息的回撥,因為在回撥函式中,我們是無法直接獲取到訊息內容的,所以需要將訊息先暫存起來,根據訊息的重要程度,可以考慮使用本地快取,或者存入Redis中,或者Mysql中,然後在回撥時更新其狀態或者從快取中移除,最後使用定時任務對一段時間內未傳送的訊息進行重新投遞。

以下是我盜來的圖,原諒我偷懶不想畫了[手動狗頭]:

另外,還需要注意的是,如果將訊息釋出到不存在的交換機上,那麼釋出用的通道將會被RabbitMQ關閉。

此外,生產者確認機制跟事務是不能一起工作的,是事務的輕量級替代方案。因為事務和釋出者確認模式都是需要先跟伺服器協商,對通道啟用的一種模式,不能對同一個通道同時使用兩種模式。

在生產者確認模式中,訊息的確認可以是非同步和批量的,所以相比使用事務,效能會更好。

使用事務機制和生產者確認機制都能確保訊息被正確的傳送至RabbitMQ,這裡的“正確傳送至RabbitMQ”說的是訊息成功被交換機接收,但如果找不到能接收該訊息的佇列,這條訊息也會丟失。至於如何處理那些無法被投遞到佇列的訊息,將會在下篇進行說明。

結題

所以當公司機房“斷電”時,如何處理那些需要傳送的訊息呢?相信看完上文之後,你的心中已經有了答案。

一般來說,這種“斷電”不會持續較長時間,一般幾分鐘到半小時之間,很快能夠恢復,所以如果是重要訊息,可以儲存到資料庫中,如果是非重要訊息,可以使用redis進行儲存,當然,還要根據訊息的數量級來進行判斷。

如果訊息量比較大,可以考慮將訊息傳送到另一個叢集的死信佇列中,事實上,所在公司就有兩個RabbitMQ叢集,所以當一個叢集不可用時,可以往另一個叢集發訊息,emmm,如果兩個機房都停電了的話,當我沒說。

相關推薦

RabbitMQ如何進行訊息可靠投遞

說明 前幾天,突然發生線上報警,釘釘連發了好幾條訊息,一看是RabbitMQ相關的訊息,心頭一緊,難道翻車了? [橙色報警] 應用[xxx]在[08-15 16:36:04]發生[錯誤日誌異常],alertId=[xxx]。由[org.springframework.amqp.rabbit.listene

RabbitMQ如何進行訊息可靠投遞下篇

說明 上一篇文章裡,我們瞭解瞭如何保證訊息被可靠投遞到RabbitMQ的交換機中,但還有一些不完美的地方,試想一下,如果向RabbitMQ伺服器傳送一條訊息,伺服器確實也接收到了這條訊息,於是給你返回了ACK確認訊息,但伺服器拿到這條訊息一看,找不到路由它的佇列,於是就把它丟進了垃圾桶,emmm,我猜應該屬於

RabbitMQ訊息可靠投遞 --

前言 上一節介紹了有關rabbitmq裡面常用了幾種命令,以及交換機的路由規則等,生產端以及消費端之間的虛擬碼來介紹了各種路由規則下的消費情況,從管控臺的設定,資料分析,瞭解到如何手動設定vhost下的交換機以及使用者,佇列等。這節開始就rabbitmq的高階特性進行詳細的介紹,比如TT

Netty自娛自樂之類Dubbo RPC 框架設計構想

哈哈 ebe cte proc 文件 num one lex round   之前在前一篇的《Netty自娛自樂之協議棧設計》,菜鳥我已經自娛自樂了設計協議棧,gitHub地址為https://github.com/vOoT/ncustomer-protocal。先這一篇中

談談Nancy中讓人又愛又恨的Diagnostics

base isa 但是 get sting erro for 就會 一次 原文:談談Nancy中讓人又愛又恨的Diagnostics【上篇】前言 在Nancy中有個十分不錯的功能-Diagnostics,可以說這個功能讓人又愛又恨。 或許我們都做過下面這樣的一些嘗試:

AC軍團週報(第四周)第一線段樹從入門到入土4(未完成)

本文章連載AC軍團週報 -> 線段樹 : 從入門到入土【4】 前言 從前有一位遠古神犇,他彙集各大資料結構之精華,經過艱苦卓絕的研究,終於煉製成了一種新的,更簡潔的,更快的線段樹——zkw線段樹 (大霧) 四、zkw線段樹基礎 我們已經學過了線段樹的基礎了,相信大家已經熟練掌握線段樹大概的樣

邢不行|量化小講堂系列33-實戰邢不行親歷:一行程式碼虧損5萬美金,量化投資的風險

引言: 邢不行的系列帖子“量化小講堂”,通過實際案例教初學者使用python進行量化投資,瞭解行業研究方向,希望能對大家有幫助。 【歷史文章彙總】請點選此處 個人微信:coinquant,有問題歡迎交流。 邢不行親歷:一行程式碼虧損5萬美金,量化投資的風險

推薦25款很棒的 HTML5 開發框架和開發工具

  HTML5 在不同的領域讓網頁設計更強大的。快速,安全,響應式,互動和美麗,這些優點吸引更多的 Web 開發人員使用 HTML5。HTML5 有許多新的特性功能,允許開發人員和設計師建立應用程式和網站,帶給使用者桌面應用程式的速度,效能和體驗。   這篇文章整理了25款優秀的 HTML5 框架和開發工具

推薦系統演算法DPMF(Dependent Probabilistic Matrix Factorization).

Adams, Ryan Prescott, George E. Dahl, and Iain Murray. “Incorporating side information in probabilistic matrix factorization

經典網頁設計:頂尖的個人作品集網站設計欣賞

作為一個網頁設計師,需要經常去關注優秀的網站作品,獲取創作靈感,掌握最新的設計趨勢。在這個競爭激烈的就業市場,個人作品集網站是最好的求職工具。因此,設計師們都竭盡所能設計一個有創造性的個人作品展示網站,期望給訪客留下深刻的印象。 在此集合中,你會看到各種各樣的創意設計和新趨勢,包括精巧的佈局,令人印象深刻的

思維導圖學習 | 第一:java學習基礎,讓java不再難懂

配套Ximnd學習導圖下載地址 寫在最後 歡迎關注、喜歡、和點贊後續將推出更多的思維導圖學習文章,敬請期待。 歡迎關注我的微信公眾號獲取更多更全

專案實站 php 實現抽獎程式碼詳解 基礎實現

基本思路:使用者生成一個隨機數,和出獎的獎品設定的隨機數比對一下。符合規則則中獎(使用者的隨機數< 獎品設定的概率值),不符則未中獎。 一 專案準備期,需求確認。和產品大哥一陣切磋後,認為需求1.0 //1 抽獎活動有起止時間 //2 獎品有限制個數的大獎,和不限次數的

重學Node.js 第1&2本地搭建Node環境並起RESTful Api服務

本地搭建Node環境並起RESTful Api服務 課程介紹看這裡:https://www.cnblogs.com/zhangran/p/11963616.html 專案github地址:https://github.com/hellozhangran/happy-egg-server 說明:本想分兩章講環境

Newtonsoft 六個超簡單又實用的特性,值得一試

## 一:講故事 看完官方文件,閱讀了一些 `Newtonsoft` 原始碼,對它有了新的認識,先總結 六個超經典又實用的特性,同大家一起分享,廢話不多說,快來一起看看吧~~~ ## 二:特性分析 ### 1. 程式碼格式化 如果你直接使用 `JsonConvert.SerializeObje

RabbitMqrabbitMq訊息確認機制

  一、提出問題 生產者將訊息傳送出去後,訊息是否到達RabbitMq伺服器呢?預設的情況下,是不知道的 二、引入訊息確認機制 兩種方式:           1.AMQP實現事務機制     &

rabbitmq訊息佇列配置

      #erlang語言支援包     #rabbitmq-server安裝支援   #新增使用者     #刪除使用者   #使用者角色   #啟動 &nbs

RabbitMQ知識盤點_訊息佇列介紹及三種訊息路由模式

最近在看訊息佇列的書籍,把一些收穫總結一下。 首先說說什麼是訊息佇列。這裡就不說那種教科書的定義了,以我的理解,訊息佇列就是通過接收和傳送訊息,使不同的應用系統連線起來。實現了業務系統的解耦,也跨越

rabbitMQ之一rabbitMQ之helloworld傳送與接受訊息-go語言

1.準備工作啟動rabbitmq_server,在瀏覽器上開啟rabbitMQ的管理器2.傳送端程式開始如果匯入"github.com/streadway/amqp" 出現錯誤,則先在goLand下面的終端,執行go get "github.com/streadway/amq

直播預告:Java Spring Boot開發實戰系列課程第11講訊息中介軟體 RabbitMQ 與api原始碼解析

內容概要:mq訊息中介軟體在高併發系統架構中扮演關鍵角色,阿里雙11高併發使用了mq技術。本次課程一起學習最新Java Spring Boot 2.0、RabbitMQ中介軟體的最新特性與實戰應用,同樣會分析核心api原始碼。主講人:徐雷(阿里雲棲特邀Java專家)直播時間:2019年1月8日 週二 今晚20

RabbitMQ訊息中介軟體12.RabbitMQ結合SSM框架-編寫倉儲系統

瞭解了RabbitMQ的基本知識和幾大佇列模式,以及Spring-Rabbit開源工程的基本原理後,我們動手來實現在實際工作開發中需要與SSM框架結合使用的工程場景。該場景模擬以下活動:貨倉管理系統用於對貨物的管理,它的每一次進貨(insert)和修改(update)、刪除(