1. 程式人生 > >SpringBoot消費RabbitMQ 通過死信保證無法消費的訊息不會丟失

SpringBoot消費RabbitMQ 通過死信保證無法消費的訊息不會丟失

由於最近剛剛接觸RabbitMQ  自己在測試伺服器搭建了一個RabbitMQ的服務

具體安裝過程參見連線[didi大神的部落格]

大家再用RabbitMQ 的時候經常會遇到消費Mq的訊息失敗的情況,一般情況下會根據不同的業務場景通過不同的辦法去記錄下無法消費的訊息的資料,本文簡單介紹了下springBoot整合RabbitMQ的一個示例, 並且通過程式碼簡單的配置整合死信佇列防止無法消費的資料丟失的情況,其實參考了網上許多案例,但是網上寫的太雜,大多是複製過來簡單改改~而且正確性也無法保證

Mq依賴如下

 <!--RabbitMQ 依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>1.3.7.RELEASE</version>
        </dependency>

SpringBoot的配置檔案如下 

spring:
  rabbitmq:
    host: 10.10.20.103
    port: 5672
    username: user
    password: userPwd
    listener:
      simple:
        retry:
          enabled: true
          max-attempts: 5
          initial-interval: 2000
        default-requeue-rejected: false

重點說明下配置檔案default-requeue-rejected

這個欄位一定要設定成 false 不然無法消費的資料不會進入死信佇列的

Mq的全域性配置程式碼如下

import com.xxxx.xxxxxx.contants.SysContants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 訊息佇列的全域性配置,當前每個資料最多嘗試消費五次,每次2秒 如果五次都沒有辦法消費掉
 * 那麼資料進入死信佇列中,防止無法消費的資料丟失
 * @Author xj
 */
@Configuration
public class RabbitConfig {

    // 死信的交換機名
    final String DEAD_LETTER_EXCHANGE="xxxx_xxxx_dead_exchange";

    @Bean
    public Queue maintainQueue() {
        Map<String,Object> args=new HashMap<>();
        // 設定該Queue的死信的信箱
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 設定死信routingKey
        args.put("x-dead-letter-routing-key", SysContants.DEAD_ROUTING_KEY);
        return new Queue(SysContants.ROUTING_KEY,true,false,false,args);
    }

    @Bean
    public Binding maintainBinding() {
        return BindingBuilder.bind(maintainQueue()).to(DirectExchange.DEFAULT)
            .with(SysContants.ROUTING_KEY);
    }

    @Bean
    public Queue deadLetterQueue(){
        return new Queue(SysContants.DEAD_ROUTING_KEY);
    }

    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
    }

    @Bean
    public Binding deadLetterBindding(){
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(SysContants.DEAD_ROUTING_KEY);
    }

}

Ps:其中SysContants.ROUTING_KEY和SysContants.DEAD_ROUTING_KEY是二個String常量 大家可以根據自己的需要去隨便定義

訊息的消費者程式碼如下

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = SysContants.ROUTING_KEY)
@Slf4j
public class Receiver {

    @Autowired
    XXXXXService xxxService;

    @RabbitHandler
    public void process(String workOrderData) {
        try {
            log.info("receive  data {} ", workOrderData);
            OrderRequest request = JSON.parseObject(workOrderData, OrderRequest.class);
            BaseResult baseResult=xxxService.save(request);
            log.info("save  data {},result {}", workOrderData,baseResult.toString());
        }catch (Exception e){
            log.warn("cousumers ribbitmq message {} error",workOrderData);
            throw new Exception(ErrorCode.COUSUMERS_MQ_MESSAGE_FAIL);
        }
    }
}

其中SysContants.ROUTING_KEY就是RabbitConfig裡面的那個String常量 假設我程式碼在try塊裡面出現異常的話 那麼這個訊息會嘗試再一次消費,  直到重試到spring.rabbitmq.listener.simple.retry.max-attempts設定的次數位置,  其中每次嘗試的間隔為spring.rabbitmq.listener.simple.retry.initial-interval設定的毫秒數,如果訊息仍然沒有被消費掉 那麼訊息就會進去我們設定的死信佇列中去。這樣我們就能保證我們無法消費的資料不會再加上重試機制之後丟棄了。。