RabbitMQ進階使用-延時佇列的配置(Spring Boot)
阿新 • • 發佈:2019-01-23
依賴
MAVEN配置pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Gradle配置build.gradle
compile('org.springframework.boot:spring-boot-starter-amqp')
連線配置
得益於spring boot的約定大於配置,只需要在application.yml加入下面配置即可。
spring:
rabbitmq:
host: host
port: port
username: admin
password: passwd
簡單自定義RabbitTemplate和Queue配置
預設的配置還是略顯不足,增加序列化配置如下:
@Configuration public class QueueConfig { /** * 自動注入為SimpleRabbitListenerContainerFactory的訊息序列化轉換器 */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } /** * 持久化交換機 */ @Bean(name = "exchange") public FanoutExchange exchange() { return new FanoutExchange("exchange1", true, false); } /** * 持久化佇列 */ @Bean public Queue queue() { return new Queue("queue", true); } /** * 將佇列和exchange繫結 * * @return binding */ @Bean Binding bindingSmsExchangeSmsQueue() { return BindingBuilder.bind(queue()).to(exchange()); } }
特殊延時佇列的配置
延時佇列的用法這裡就不詳細說了,參考Spring Boot與RabbitMQ結合實現延遲佇列的示例,有些場景如未支付訂單30分鐘過期等,可通過延時佇列實現
@Bean public Queue delayQueue(){ return QueueBuilder.durable("delayQueue") //佇列名稱 .withArgument("x-message-ttl",10000) //死信時間 .withArgument("x-dead-letter-exchange", "") //死信重新投遞的交換機 .withArgument("x-dead-letter-routing-key", "queue")//路由到佇列的routingKey .build(); }
啟動應用測試一下
啟動應用,在rabbit管理web檢視所有佇列
所有佇列
檢視delayQueue詳情,框框中為延時配置
將"x-message-ttl"引數改成20000重啟發現問題,控制佇列裡面的引數也沒有修改成功
修改帶引數佇列失敗的問題
問題分析
根據日誌提示,佇列已經存在而且引數不一致導致,然後檢視原始碼在RabbitAdmin發現下面程式碼,在建立佇列失敗的時候會呼叫logOrRethrowDeclarationException方法,logOrRethrowDeclarationException方法中釋出了一個DeclarationExceptionEvent事件,到這裡解決思路有,監聽這個事件,然後刪除相應的佇列
private DeclareOk[] declareQueues(final Channel channel, final Queue... queues) throws IOException {
List<DeclareOk> declareOks = new ArrayList<DeclareOk>(queues.length);
for (int i = 0; i < queues.length; i++) {
Queue queue = queues[i];
if (!queue.getName().startsWith("amq.")) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("declaring Queue '" + queue.getName() + "'");
}
try {
try {
DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
declareOks.add(declareOk);
}
catch (IllegalArgumentException e) {
if (this.logger.isDebugEnabled()) {
this.logger.error("Exception while declaring queue: '" + queue.getName() + "'");
}
try {
if (channel instanceof ChannelProxy) {
((ChannelProxy) channel).getTargetChannel().close();
}
}
catch (TimeoutException e1) {
}
throw new IOException(e);
}
}
catch (IOException e) {
logOrRethrowDeclarationException(queue, "queue", e);
}
}
else if (this.logger.isDebugEnabled()) {
this.logger.debug(queue.getName() + ": Queue with name that starts with 'amq.' cannot be declared.");
}
}
return declareOks.toArray(new DeclareOk[declareOks.size()]);
}
private <T extends Throwable> void logOrRethrowDeclarationException(Declarable element, String elementType, T t)
throws T {
DeclarationExceptionEvent event = new DeclarationExceptionEvent(this, element, t);
this.lastDeclarationExceptionEvent = event;
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(event);
}
if (this.ignoreDeclarationExceptions || (element != null && element.isIgnoreDeclarationExceptions())) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Failed to declare " + elementType
+ ": " + (element == null ? "broker-generated" : element)
+ ", continuing...", t);
}
else if (this.logger.isWarnEnabled()) {
Throwable cause = t;
if (t instanceof IOException && t.getCause() != null) {
cause = t.getCause();
}
this.logger.warn("Failed to declare " + elementType
+ ": " + (element == null ? "broker-generated" : element)
+ ", continuing... " + cause);
}
}
else {
throw t;
}
}
解決方法
寫一個DeclarationExceptionEvent事件監聽,處理建立失敗的佇列,既刪除掉
@Component
public class DeclarationExceptionEventListener {
@Autowired
private AmqpAdmin rabbitAdmin;
@EventListener(classes = DeclarationExceptionEvent.class)
public void listen(DeclarationExceptionEvent event) {
final Declarable declarable = event.getDeclarable();
if (declarable instanceof Queue) {
final Queue queue = (Queue) declarable;
rabbitAdmin.deleteQueue(queue.getName());
}
}
}
改完重啟應用,只有一條異常日誌(原來4條),還有一條的原因是第一次建立失敗釋出事件,我們監聽了事件進行處理。檢視rabbit控制檯,引數修改成功。