
Spring Boot + RabbitMQ: Consumer-Side Configuration

Chapter1

Straight to the code:

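import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class UserSettlementConsumer {

    // Listens on the queue whose name is read from the spring.rabbitmq.mq-name property.
    @RabbitHandler
    @RabbitListener(queues = "${spring.rabbitmq.mq-name}")
    public void testListener(String msg) {
        log.info("Message dequeued: {}", msg);
    }
}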

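As the code shows, Spring AMQP consumes from a RabbitMQ queue through the @RabbitListener and @RabbitHandler annotations. With @RabbitListener placed on the method itself, as here, that annotation alone is enough; per its Javadoc, @RabbitHandler is intended for methods of a class that carries @RabbitListener at class level.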

Javadoc of @RabbitHandler: Annotation that marks a method to be the target of a Rabbit message listener within a class that is annotated with RabbitListener.
Javadoc of @RabbitListener: Annotation that marks a method to be the target of a Rabbit message listener on the specified queues() (or bindings()).
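
The @RabbitHandler Javadoc quoted above describes the class-level arrangement, which can be sketched roughly as follows. This is a fragment for a Spring Boot project with spring-boot-starter-amqp and Lombok on the classpath, not a standalone program; the class name UserSettlementMultiConsumer and the handler method names are made up for illustration, and the payload types assume the default message converter.

```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Hypothetical consumer: @RabbitListener sits on the class,
// and each @RabbitHandler method handles one payload type.
@Slf4j
@Component
@RabbitListener(queues = "${spring.rabbitmq.mq-name}")
public class UserSettlementMultiConsumer {

    // Chosen when the converted payload is a String.
    @RabbitHandler
    public void onText(String msg) {
        log.info("Text message dequeued: {}", msg);
    }

    // Chosen when the converted payload is a byte array.
    @RabbitHandler
    public void onBytes(byte[] body) {
        log.info("Binary message dequeued, {} bytes", body.length);
    }

    // Fallback for payload types that no other handler matches.
    @RabbitHandler(isDefault = true)
    public void onOther(Object payload) {
        log.info("Unhandled payload type: {}", payload.getClass().getName());
    }
}
```

In this layout the handler is selected by the type of the converted payload, which is the case @RabbitHandler was designed for.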

Chapter2

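Note that if the specified queue does not exist on the broker when the application starts, the listener container fails to initialize and the service cannot start. The error output looks like this: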

2021-06-15 15:13:14.997 [][] [main] INFO  o.s.a.rabbit.connection.CachingConnectionFactory:497 - Created new connection: connectionFactory#53ea380b:0/SimpleConnection@5b1cedfd [delegate=amqp://[email protected]:5672/, localPort= 1142]
2021-06-15 15:13:15.155 [][] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#3-1] WARN o.s.amqp.rabbit.listener.BlockingQueueConsumer:707 - Failed to declare queue: test.queue
2021-06-15 15:13:15.155 [][] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#3-1] WARN o.s.amqp.rabbit.listener.BlockingQueueConsumer:641 - Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[test.queue]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:713)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:597)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:584)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1338)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1183)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1006)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:52)
    at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.queueDeclarePassive(PublisherCallbackChannelImpl.java:363)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1190)
    at com.sun.proxy.$Proxy252.queueDeclarePassive(Unknown Source)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:692)
    ... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'test.queue' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
    ... 15 common frames omitted

The error is clear: the specified queue must already exist.
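
The stack trace goes through attemptPassiveDeclarations and queueDeclarePassive, that is, the listener only checks that the queue exists and never creates it. The difference between a passive and an active declaration can be illustrated with a toy in-memory model. ToyBroker and PassiveDeclareDemo are hypothetical names invented for this sketch; they only mimic the behaviour and are not the RabbitMQ client API.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

class PassiveDeclareDemo {

    // Returns true if a listener could attach, i.e. the passive check passes.
    static boolean canListen(ToyBroker broker, String queue) {
        try {
            broker.queueDeclarePassive(queue);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        System.out.println(canListen(broker, "test.queue")); // prints false: queue is missing
        broker.queueDeclare("test.queue");                   // active declaration creates it
        System.out.println(canListen(broker, "test.queue")); // prints true: queue now exists
    }
}

// Toy stand-in for a broker (assumption for illustration only).
class ToyBroker {
    private final Set<String> queues = new HashSet<>();

    // Active declaration: creates the queue if it is missing (idempotent).
    void queueDeclare(String name) {
        queues.add(name);
    }

    // Passive declaration: only checks existence, never creates anything.
    void queueDeclarePassive(String name) throws IOException {
        if (!queues.contains(name)) {
            throw new IOException("NOT_FOUND - no queue '" + name + "' in vhost '/'");
        }
    }
}
```

Any fix therefore has to make sure an active declaration happens before the listener performs its passive check.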

There are two ways to fix this:

  • Declare the queue with a @Bean method.
  • Use the queuesToDeclare attribute of @RabbitListener.

The @Bean declaration looks like this:

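import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {

    @Value("${spring.rabbitmq.mq-name}")
    private String mqName;

    // Durable queue (second argument); a RabbitAdmin in the context declares it on the broker.
    @Bean
    public Queue myQueue() {
        return new Queue(mqName, true);
    }
}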

org.springframework.amqp.rabbit.annotation.RabbitListener#queuesToDeclare, on the other hand, creates the queue directly when it does not exist:

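import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class UserSettlementConsumer {

    // @Queue here is the annotation, not the org.springframework.amqp.core.Queue class.
    @RabbitHandler
    @RabbitListener(queuesToDeclare = {@Queue(name = "${spring.rabbitmq.mq-name}", durable = "true")})
    public void testListener(String msg) {
        log.info("Message dequeued: {}", msg);
    }
}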

Below is the Javadoc of org.springframework.amqp.rabbit.annotation.RabbitListener#queuesToDeclare from spring-rabbit-2.2.0.RELEASE.jar:

package org.springframework.amqp.rabbit.annotation;
public @interface RabbitListener {

    /**
     * The queues for this listener.
     * If there is a {@link org.springframework.amqp.rabbit.core.RabbitAdmin} in the
     * application context, the queue will be declared on the broker with default
     * binding (default exchange with the queue name as the routing key).
     * Mutually exclusive with {@link #bindings()} and {@link #queues()}.
     * @return the queue(s) to declare.
     * @see org.springframework.amqp.rabbit.listener.MessageListenerContainer
     * @since 2.0
     */
    Queue[] queuesToDeclare() default {};

}
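
The Javadoc above notes that queuesToDeclare uses the default binding and is mutually exclusive with bindings(). For comparison, a listener that declares the queue together with an explicit exchange binding might look roughly like the fragment below (for a Spring Boot project with spring-boot-starter-amqp and Lombok, not a standalone program). The class name, the exchange name user.settlement.exchange and the routing key user.settlement are assumptions made up for this sketch.

```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Hypothetical consumer that declares queue, exchange and binding in one place.
@Slf4j
@Component
public class UserSettlementBindingConsumer {

    @RabbitListener(bindings = @QueueBinding(
            // Queue to declare; same property as in the examples above.
            value = @Queue(name = "${spring.rabbitmq.mq-name}", durable = "true"),
            // Exchange name is an assumption for illustration.
            exchange = @Exchange(name = "user.settlement.exchange", type = ExchangeTypes.DIRECT),
            // Routing key is an assumption for illustration.
            key = "user.settlement"))
    public void testListener(String msg) {
        log.info("Message dequeued: {}", msg);
    }
}
```

As with queuesToDeclare, the declaration relies on a RabbitAdmin being present in the application context.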

Chapter3

Now back to RabbitListener#queues():

package org.springframework.amqp.rabbit.annotation;
public @interface RabbitListener {

    /**
     * The queues for this listener.
     * The entries can be 'queue name', 'property-placeholder keys' or 'expressions'.
     * Expression must be resolved to the queue name or {@code Queue} object.
     * The queue(s) must exist, or be otherwise defined elsewhere as a bean(s) with
     * a {@link org.springframework.amqp.rabbit.core.RabbitAdmin} in the application
     * context.
     * Mutually exclusive with {@link #bindings()} and {@link #queuesToDeclare()}.
     * @return the queue names or expressions (SpEL) to listen to from target
     * @see org.springframework.amqp.rabbit.listener.MessageListenerContainer
     */
    String[] queues() default {};

}

The Javadoc of RabbitListener#queues() above tells us three things; the second one states that the queue must exist:

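  1. An entry in queues can be a literal queue name (for example a constant such as MessageQueueConstant.USER_QUEUE), a property placeholder (such as "${spring.rabbitmq.mq-name}"), or a SpEL expression (such as "#{userQueue.name}" or "#{configToolkitProp['zk.address']}"). An expression must resolve to a queue name or a Queue object.
  2. The queue must already exist on the broker, or be defined elsewhere as a Queue bean, with an org.springframework.amqp.rabbit.core.RabbitAdmin in the ApplicationContext to declare it.
  3. queues() is mutually exclusive with bindings() and queuesToDeclare(): once queues() is specified, neither of the other two may be used.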
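
The three entry forms from point 1 can be put side by side in a short fragment (for a Spring Boot project with spring-boot-starter-amqp and Lombok, not a standalone program). The class names UserQueueConfig and QueueEntryFormsConsumer and the bean name userQueue are assumptions for this sketch, which also assumes spring.rabbitmq.mq-name is set to test.queue so that all three entries name the same queue. A real consumer would normally pick just one form.

```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Configuration
class UserQueueConfig {

    // The bean name "userQueue" is what the SpEL expression below refers to.
    @Bean
    public Queue userQueue(@Value("${spring.rabbitmq.mq-name}") String name) {
        return new Queue(name, true);
    }
}

@Slf4j
@Component
class QueueEntryFormsConsumer {

    // 1) Literal queue name (a String constant works the same way).
    @RabbitListener(queues = "test.queue")
    public void byLiteral(String msg) {
        log.info("Literal name: {}", msg);
    }

    // 2) Property placeholder, resolved from the application properties.
    @RabbitListener(queues = "${spring.rabbitmq.mq-name}")
    public void byPlaceholder(String msg) {
        log.info("Placeholder: {}", msg);
    }

    // 3) SpEL expression, reading the name property of the userQueue bean.
    @RabbitListener(queues = "#{userQueue.name}")
    public void bySpel(String msg) {
        log.info("SpEL: {}", msg);
    }
}
```

Because the userQueue bean is declared through the RabbitAdmin, point 2 is satisfied for all three listeners under the stated assumption.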