1. 程式人生 > 其它 >springboot整合rabbitmq高階特性配置

springboot整合rabbitmq高階特性配置

技術標籤:rabbitmqrabbitmq

一.ack模式

ack模式
在 spring boot 中提供了三種確認模式:

NONE - 使用rabbitmq的自動確認
AUTO - 使用rabbitmq的手動確認, springboot會自動傳送確認回執 (預設)
MANUAL - 使用rabbitmq的手動確認, 且必須手動執行確認操作
預設的 AUTO 模式中, 處理訊息的方法丟擲異常, 則表示訊息沒有被正確處理, 該訊息會被重新發送.

spring:
  rabbitmq:
    listener:
      simple:
        # acknowledgeMode:
NONE # rabbitmq的自動確認 acknowledgeMode: AUTO # rabbitmq的手動確認, springboot會自動傳送確認回執 (預設) # acknowledgeMode: MANUAL # rabbitmq的手動確認, springboot不傳送回執, 必須自己編碼傳送回執

手動執行確認操作
如果設定為 MANUAL 模式,必須手動執行確認操作

@RabbitListener(queues="task_queue")
	public void receive1(String s, Channel c, @Header
(name=AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { System.out.println("receiver1 - 收到: "+s); for (int i = 0; i < s.length(); i++) { if (s.charAt(i) == '.') { Thread.sleep(1000); } } // 手動傳送確認回執 c.basicAck(tag, false); }

二、消費限額

工作模式中, 為了合理地分發資料, 需要將 qos 設定成 1, 每次只接收一條訊息, 處理完成後才接收下一條訊息.

spring boot 中是通過 prefetch 屬性進行設定, 改屬性的預設值是 250.

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # qos=1, 預設250

三、當有多條訊息存入到訊息佇列中時,可實現消費者高併發接收佇列訊息問題
/**

  • testDirectRabbit 是監聽對列的名稱
  • concurrency min-max 表示併發數,表示有多少個消費者處理佇列裡的訊息 最小-最大數
    */
    @RabbitListener(queues = “testDirectQueue”,concurrency=“5-10”)
    public class DirectReceiver {
    @RabbitHandler
    public void process(Map testMessage){
    System.out.println(Thread.currentThread().getName()+testMessage.toString());
    }
    }

四、設定訊息過期時間TTL
主要有2種方式:

1、指定一條訊息的過期時間。
2、給佇列設定訊息過期時間,佇列中的所有訊息都有同樣的過期時間。

1、指定訊息的過期時間(訊息推送到佇列後,如果指定時間內沒有被消費,則會自動過期。)

@RestController
public class TTLController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/testTTL")
    public String testTTL() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("20000"); // 設定過期時間,單位:毫秒
        byte[] msgBytes = "測試訊息自動過期".getBytes();
        Message message = new Message(msgBytes, messageProperties);
        rabbitTemplate.convertAndSend("TTL_EXCHANGE", "TTL", message);
        return "ok";
    }
}

2、給佇列中的所有訊息設定過期時間(在佇列繫結的時候新增x-message-ttl引數)

@Configuration
public class TTLQueueRabbitConfig {
    @Bean
    public Queue TTLQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 30000); // 佇列中的訊息未被消費則30秒後過期
        return new Queue("TTL_QUEUE", true, false, false, map);
    }

    @Bean
    public DirectExchange TTLExchange() {
        return new DirectExchange("TTL_EXCHANGE", true, false);
    }

    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL");
    }
}

五、實現延時佇列(原理ttl+死信佇列就可以實現)
給緩衝佇列設定超時時間,而消費者並不從這個緩衝佇列中拿訊息,這樣等訊息ttl變成死信佇列後就轉發給了實際佇列,消費者只需要繫結實際佇列,從實際佇列拿訊息,這樣就可以實現延時。
在這裡插入圖片描述

在RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個引數,如果佇列內出現了dead letter,則按照這兩個引數重新路由轉發死信交換機在傳送到指定的佇列。

x-dead-letter-exchange:出現dead letter之後將dead letter重新發送到指定exchange
x-dead-letter-routing-key:出現dead letter之後將dead letter重新按照指定的routing-key傳送

@Configuration
public class DelayQueueConfig {
     public final static String DELAY_QUEUE_PER_QUEUE_TTL_NAME = "delay_queue_per_queue_ttl";
 
    public final static String DELAY_PROCESS_QUEUE_NAME = "delay_process_queue";
 
    public final static String DELAY_EXCHANGE_NAME = "delay_exchange";
 
    public final static int QUEUE_EXPIRATION = 4000;
 
    @Bean
    Queue delayQueuePerQueueTTL() {
        return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME)
            .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX
            .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key
            .withArgument("x-message-ttl", QUEUE_EXPIRATION) // 設定佇列的過期時間
            .build();
    }
 
    @Bean
    Queue delayProcessQueue() {
        return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME)
            .build();
    }
 
    @Bean
    DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }
 
    @Bean
    Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {
        return BindingBuilder.bind(delayProcessQueue)
            .to(delayExchange)
            .with(DELAY_PROCESS_QUEUE_NAME);
    }
}