springboot整合rabbitmq高階特性配置
一.ack模式
ack模式
在 spring boot 中提供了三種確認模式:
NONE - 使用rabbitmq的自動確認
AUTO - 使用rabbitmq的手動確認, springboot會自動傳送確認回執 (預設)
MANUAL - 使用rabbitmq的手動確認, 且必須手動執行確認操作
預設的 AUTO 模式中, 處理訊息的方法丟擲異常, 則表示訊息沒有被正確處理, 該訊息會被重新發送.
spring:
rabbitmq:
listener:
simple:
# acknowledgeMode: NONE # rabbitmq的自動確認
acknowledgeMode: AUTO # rabbitmq的手動確認, springboot會自動傳送確認回執 (預設)
# acknowledgeMode: MANUAL # rabbitmq的手動確認, springboot不傳送回執, 必須自己編碼傳送回執
手動執行確認操作
如果設定為 MANUAL 模式,必須手動執行確認操作
@RabbitListener(queues="task_queue")
public void receive1(String s, Channel c, @Header (name=AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
System.out.println("receiver1 - 收到: "+s);
for (int i = 0; i < s.length(); i++) {
if (s.charAt(i) == '.') {
Thread.sleep(1000);
}
}
// 手動傳送確認回執
c.basicAck(tag, false);
}
二、消費限額
工作模式中, 為了合理地分發資料, 需要將 qos 設定成 1, 每次只接收一條訊息, 處理完成後才接收下一條訊息.
spring boot 中是通過 prefetch 屬性進行設定, 改屬性的預設值是 250.
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # qos=1, 預設250
三、當有多條訊息存入到訊息佇列中時,可實現消費者高併發接收佇列訊息問題
/**
- testDirectRabbit 是監聽對列的名稱
- concurrency min-max 表示併發數,表示有多少個消費者處理佇列裡的訊息 最小-最大數
*/
@RabbitListener(queues = “testDirectQueue”,concurrency=“5-10”)
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage){
System.out.println(Thread.currentThread().getName()+testMessage.toString());
}
}
四、設定訊息過期時間TTL
主要有2種方式:
1、指定一條訊息的過期時間。
2、給佇列設定訊息過期時間,佇列中的所有訊息都有同樣的過期時間。
1、指定訊息的過期時間(訊息推送到佇列後,如果指定時間內沒有被消費,則會自動過期。)
@RestController
public class TTLController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/testTTL")
public String testTTL() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("20000"); // 設定過期時間,單位:毫秒
byte[] msgBytes = "測試訊息自動過期".getBytes();
Message message = new Message(msgBytes, messageProperties);
rabbitTemplate.convertAndSend("TTL_EXCHANGE", "TTL", message);
return "ok";
}
}
2、給佇列中的所有訊息設定過期時間(在佇列繫結的時候新增x-message-ttl引數)
@Configuration
public class TTLQueueRabbitConfig {
@Bean
public Queue TTLQueue() {
Map<String, Object> map = new HashMap<>();
map.put("x-message-ttl", 30000); // 佇列中的訊息未被消費則30秒後過期
return new Queue("TTL_QUEUE", true, false, false, map);
}
@Bean
public DirectExchange TTLExchange() {
return new DirectExchange("TTL_EXCHANGE", true, false);
}
@Bean
public Binding bindingDirect() {
return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL");
}
}
五、實現延時佇列(原理ttl+死信佇列就可以實現)
給緩衝佇列設定超時時間,而消費者並不從這個緩衝佇列中拿訊息,這樣等訊息ttl變成死信佇列後就轉發給了實際佇列,消費者只需要繫結實際佇列,從實際佇列拿訊息,這樣就可以實現延時。
在RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個引數,如果佇列內出現了dead letter,則按照這兩個引數重新路由轉發死信交換機在傳送到指定的佇列。
x-dead-letter-exchange:出現dead letter之後將dead letter重新發送到指定exchange
x-dead-letter-routing-key:出現dead letter之後將dead letter重新按照指定的routing-key傳送
@Configuration
public class DelayQueueConfig {
public final static String DELAY_QUEUE_PER_QUEUE_TTL_NAME = "delay_queue_per_queue_ttl";
public final static String DELAY_PROCESS_QUEUE_NAME = "delay_process_queue";
public final static String DELAY_EXCHANGE_NAME = "delay_exchange";
public final static int QUEUE_EXPIRATION = 4000;
@Bean
Queue delayQueuePerQueueTTL() {
return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME)
.withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX
.withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key
.withArgument("x-message-ttl", QUEUE_EXPIRATION) // 設定佇列的過期時間
.build();
}
@Bean
Queue delayProcessQueue() {
return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME)
.build();
}
@Bean
DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
@Bean
Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {
return BindingBuilder.bind(delayProcessQueue)
.to(delayExchange)
.with(DELAY_PROCESS_QUEUE_NAME);
}
}