rabbitmq 延遲,死信佇列
@RequestMapping("/dead")
public ResponseEntity deadLetter(String p) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 宣告訊息處理器 這個對訊息進行處理 可以設定一些引數 對訊息進行一些定製化處理 我們這裡 來設定訊息的編碼 以及訊息的過期時間 因為在.net 以及其他版本過期時間不一致 這裡的時間毫秒值 為字串
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setContentEncoding("utf-8");
messageProperties.setExpiration("5000"); //5分鐘
return message;
}
};
// 向DL_QUEUE 傳送訊息 10*1000毫秒後過期 形成死信
rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", p, messagePostProcessor, correlationData);
return ResponseEntity.ok();
}
·········
配置
package cn.felord.message.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
* 佇列配置.
*
* @author dax.
* @version v1.0
* @since 2018 /2/23 14:28
*/
@Configuration
public class RabbitConfig {
@Resource
private RabbitTemplate rabbitTemplate;
public Exchange deadLetterExchange() {
return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
}
public Queue deadLetterQueue() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange", "DL_KEY");
args.put("x-dead-letter-routing-key", "KEY_R");
return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
}
public Queue redirectQueue() {
return QueueBuilder.durable("REDIRECT_QUEUE").build();
}
@Bean
public Binding deadLetterBinding() {
return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);
}
@Bean
public Binding redirectBinding() {
return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
}
}
························
接收
@RabbitListener(queues = "REDIRECT_QUEUE")
public void redirect(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println(new String (message.getBody()));
}
·················
application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 支援釋出確認
publisher-confirms: true
# 支援釋出返回
publisher-returns: true
listener:
simple:
# 採用手動應答
acknowledge-mode: manual
# 當前監聽容器數
concurrency: 1
# 最大數
max-concurrency: 1
# 是否支援重試
retry:
enabled: true
````````````````````````````
pom.xml
<dependencies>
<!--amqp rabbitmq 依賴必須 必須-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--springboot單元測試 選-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--springboot健康監控 選-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--web支援 選-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>