1. 程式人生 > 程式設計 >【RabbitMQ】如何進行訊息可靠投遞【上篇】

【RabbitMQ】如何進行訊息可靠投遞【上篇】

說明

前幾天,突然發生線上報警,釘釘連發了好幾條訊息,一看是RabbitMQ相關的訊息,心頭一緊,難道翻車了?

u=1091165172,1855706818&fm=26&gp=0.jpg

[橙色報警] 應用[xxx]在[08-15 16:36:04]發生[錯誤日誌異常],alertId=[xxx]。由[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620]觸發。
應用xxx 可能原因如下
服務名為:
 異常為:org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620
 產生原因如下:
1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
||Consumer received fatal=false exception on startup:
...
應用xxx 可能原因如下
服務名為:
 異常為:org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer:run:1160
 產生原因如下:
1.Stopping container from aborted consumer||Stopping container from aborted consumer:
複製程式碼

定睛一看,看樣子像是消費者莫名其妙斷開了連線,正逢公司搬家之際,難道是機房又雙叒叕。。。。斷電了?於是趕緊聯絡了運維,諮詢RabbitMQ是否發生了調整。幾分鐘後,得到了運維的回覆,由於一些不可描述的原因,RabbitMQ進行了重啟,emmmm,雖然重啟只持續了10分鐘,但是導致該叢集下所有消費者都掛了,需要將專案重啟後才能正常進行消費。

專案重啟後,一切似乎又正常運轉起來,但好景不長,沒過多久,工單就找上了門來,經過排查,發現是生產者在RabbitMQ重啟期間訊息投遞失敗,導致訊息丟失,需要手動處理和恢復。

於是,我開始思考,如何才能進行RabbitMQ的訊息可靠投遞呢?特別是在這樣比較極端的情況,RabbitMQ叢集不可用的時候,無法投遞的訊息該如何處理呢?

可靠投遞

先來說明一個概念,什麼是可靠投遞呢?在RabbitMQ中,一個訊息從生產者傳送到RabbitMQ伺服器,需要經歷這麼幾個步驟:

  1. 生產者準備好需要投遞的訊息。
  2. 生產者與RabbitMQ伺服器建立連線。
  3. 生產者傳送訊息。
  4. RabbitMQ伺服器接收到訊息,並將其路由到指定佇列。
  5. RabbitMQ伺服器發起回撥,告知生產者訊息傳送成功。

所謂可靠投遞,就是確保訊息能夠百分百從生產者傳送到伺服器。

{6582FAF9-A46E-4239-810B-E1D6883ED070}.png.jpg

為了避免爭議,補充說明一下,如果沒有設定Mandatory引數,是不需要先路由訊息才發起回撥的,伺服器收到訊息後就會進行回撥確認。

2、3、5步都是通過TCP連線進行互動,有網路呼叫的地方就會有事故,網路波動隨時都有可能發生,不管是內部機房停電,還是外部光纜被切,網路事故無法預測,雖然這些都是小概率事件,但對於訂單等敏感資料處理來說,這些情況下導致訊息丟失都是不可接受的。

20170716034945131.jpg

RabbitMQ中的訊息可靠投遞

預設情況下,傳送訊息的操作是不會返回任何資訊給生產者的,也就是說,預設情況下生產者是不知道訊息有沒有正確地到達伺服器。

那麼如何解決這個問題呢?

對此,RabbitMQ中有一些相關的解決方案:

  1. 使用事務機制來讓生產者感知訊息被成功投遞到伺服器。
  2. 通過生產者確認機制實現。

在RabbitMQ中,所有確保訊息可靠投遞的機制都會對效能產生一定影響,如使用不當,可能會對吞吐量造成重大影響,只有通過執行效能基準測試,才能在確定效能與可靠投遞之間的平衡。

在使用可靠投遞前,需要先思考以下問題:

  1. 訊息釋出時,保證訊息進入佇列的重要性有多高?
  2. 如果訊息無法進行路由,是否應該將該訊息返回給釋出者?
  3. 如果訊息無法被路由,是否應該將其傳送到其他地方稍後再重新進行路由?
  4. 如果RabbitMQ伺服器崩潰了,是否可以接受訊息丟失?
  5. RabbitMQ在處理新訊息時是否應該確認它已經為釋出者執行了所有請求的路由和持久化?
  6. 訊息釋出者是否可以批量投遞訊息?
  7. 在可靠投遞上是否有可以接受的平衡性?是否可以接受一部分的不可靠性來提升效能?

只考慮平衡性不考慮效能是不行的,至於這個平衡的度具體如何把握,就要具體情況具體分析了,比如像訂單資料這樣敏感的資訊,對可靠性的要求自然要比一般的業務訊息對可靠性的要求高的多,因為訂單資料是跟錢直接相關的,可能會導致直接的經濟損失。

所以不僅應該知道有哪些保證訊息可靠性的解決方案,還應該知道每種方案對效能的影響程度,以此來進行方案的選擇。

RabbitMQ的事務機制

RabbitMQ是支援AMQP事務機制的,在生產者確認機制之前,事務是確保訊息被成功投遞的唯一方法。

在SpringBoot專案中,使用RabbitMQ事務其實很簡單,只需要宣告一個事務管理的Bean,並將RabbitTemplate的事務設定為true即可。

配置檔案如下:

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
複製程式碼

先來配置一下交換機和佇列,以及事務管理器。

@Configuration
public class RabbitMQConfig {

    public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue";

    // 宣告業務Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    // 宣告業務佇列
    @Bean("businessQueue")
    public Queue businessQueue(){
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build();
    }

    // 宣告業務佇列繫結關係
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,@Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }


    /**
     * 配置啟用rabbitmq事務
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}
複製程式碼

然後建立一個消費者,來監聽訊息,用以判斷訊息是否成功傳送。

@Slf4j
@Component
public class BusinessMsgConsumer {


    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void receiveMsg(Message message,Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到業務訊息:{}",msg);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    }
}
複製程式碼

然後是訊息生產者:

@Slf4j
@Component
public class BusinessMsgProducer{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        rabbitTemplate.setChannelTransacted(true);
    }

    @Transactional
    public void sendMsg(String msg) {
        rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME,"key",msg);
        log.info("msg:{}",msg);
        if (msg != null && msg.contains("exception"))
            throw new RuntimeException("surprise!");
        log.info("訊息已傳送 {}",msg);
    }
}
複製程式碼

這裡有兩個注意的地方:

  1. 在初始化方法裡,通過使用rabbitTemplate.setChannelTransacted(true); 來開啟事務。
  2. 在傳送訊息的方法上加上 @Transactional 註解,這樣在該方法中發生異常時,訊息將不會傳送。

在controller中加一個介面來生產訊息:

@RestController
public class BusinessController {

    @Autowired
    private BusinessMsgProducer producer;

    @RequestMapping("send")
    public void sendMsg(String msg){
        producer.sendMsg(msg);
    }
}
複製程式碼

來驗證一下:

msg:1
訊息已傳送 1
收到業務訊息:1
msg:2
訊息已傳送 2
收到業務訊息:2
msg:3
訊息已傳送 3
收到業務訊息:3
msg:exception

Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: surprise!] with root cause

java.lang.RuntimeException: surprise!
	at com.mfrank.rabbitmqdemo.producer.BusinessMsgProducer.sendMsg(BusinessMsgProducer.java:30)
    ...
複製程式碼

msg 的值為 exception 時, 在呼叫rabbitTemplate.convertAndSend 方法之後,程式丟擲了異常,訊息並沒有傳送出去,而是被當前事務回滾了。

當然,你可以將事務管理器註釋掉,或者將初始化方法的開啟事務註釋掉,這樣事務就不會生效,即使在呼叫了傳送訊息方法之後,程式發生了異常,訊息也會被正常傳送和消費。

RabbitMQ中的事務使用起來雖然簡單,但是對效能的影響是不可忽視的,因為每次事務的提交都是阻塞式的等待伺服器處理返回結果,而預設模式下,客戶端是不需要等待的,直接傳送就完事了,除此之外,事務訊息需要比普通訊息多4次與伺服器的互動,這就意味著會佔用更多的處理時間,所以如果對訊息處理速度有較高要求時,儘量不要採用事務機制。

RabbitMQ的生產者確認機制

RabbitMQ中的生產者確認功能是AMQP規範的增強功能,當生產者釋出給所有佇列的已路由訊息被消費者應用程式直接消費時,或者訊息被放入佇列並根據需要進行持久化時,一個Basic.Ack請求會被髮送到生產者,如果訊息無法路由,代理伺服器將傳送一個Basic.Nack RPC請求用於表示失敗。然後由生產者決定該如何處理該訊息。

也就是說,通過生產者確認機制,生產者可以在訊息被伺服器成功接收時得到反饋,並有機會處理未被成功接收的訊息。

在Springboot中開啟RabbitMQ的生產者確認模式也很簡單,只多了一行配置:

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
    publisher-confirms: true
複製程式碼

publisher-confirms: true 即表示開啟生產者確認模式。

然後將訊息生產者的代表進行部分修改:

@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
//        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setConfirmCallback(this);
    }

    public void sendCustomMsg(String exchange,String msg) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        log.info("訊息id:{},msg:{}",correlationData.getId(),msg);

        rabbitTemplate.convertAndSend(exchange,msg,correlationData);
    }
    
    @Override
    public void confirm(CorrelationData correlationData,boolean b,String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("訊息確認成功,id:{}",id);
        } else {
            log.error("訊息未成功投遞,id:{},cause:{}",id,s);
        }
    }
}
複製程式碼

讓生產者繼承自RabbitTemplate.ConfirmCallback 類,然後實現其confirm 方法,即可用其接收伺服器回撥。

需要注意的是,在傳送訊息時,程式碼也進行了調整:

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

rabbitTemplate.convertAndSend(exchange,correlationData);
複製程式碼

這裡我們為訊息設定了訊息ID,以便在回撥時通過該ID來判斷是對哪個訊息的回撥,因為在回撥函式中,我們是無法直接獲取到訊息內容的,所以需要將訊息先暫存起來,根據訊息的重要程度,可以考慮使用本地快取,或者存入Redis中,或者Mysql中,然後在回撥時更新其狀態或者從快取中移除,最後使用定時任務對一段時間內未傳送的訊息進行重新投遞。

以下是我盜來的圖,原諒我偷懶不想畫了[手動狗頭]:

5b65729e0001439305000294.jpg

另外,還需要注意的是,如果將訊息釋出到不存在的交換機上,那麼釋出用的通道將會被RabbitMQ關閉。

此外,生產者確認機制跟事務是不能一起工作的,是事務的輕量級替代方案。因為事務和釋出者確認模式都是需要先跟伺服器協商,對通道啟用的一種模式,不能對同一個通道同時使用兩種模式。

在生產者確認模式中,訊息的確認可以是非同步和批量的,所以相比使用事務,效能會更好。

使用事務機制和生產者確認機制都能確保訊息被正確的傳送至RabbitMQ,這裡的“正確傳送至RabbitMQ”說的是訊息成功被交換機接收,但如果找不到能接收該訊息的佇列,這條訊息也會丟失。至於如何處理那些無法被投遞到佇列的訊息,將會在下篇進行說明。

結題

所以當公司機房“斷電”時,如何處理那些需要傳送的訊息呢?相信看完上文之後,你的心中已經有了答案。

一般來說,這種“斷電”不會持續較長時間,一般幾分鐘到半小時之間,很快能夠恢復,所以如果是重要訊息,可以儲存到資料庫中,如果是非重要訊息,可以使用redis進行儲存,當然,還要根據訊息的數量級來進行判斷。

如果訊息量比較大,可以考慮將訊息傳送到另一個叢集的死信佇列中,事實上,所在公司就有兩個RabbitMQ叢集,所以當一個叢集不可用時,可以往另一個叢集發訊息,emmm,如果兩個機房都停電了的話,當我沒說。

111.png.jpg