1. 程式人生 > >RabbitMQ之spring-rabbit使用

RabbitMQ之spring-rabbit使用

Producer

  1. 配置

    • config.properteis

          
          rabbitmq.host=jannal.mac.com
          rabbitmq.username=jannal
          rabbitmq.password=jannal
          rabbitmq.vhost=jannal-vhost
          rabbitmq.port=5672
          rabbitmq.routkey=*.#
          rabbitmq.exchange=jannal.topic.exchange
          rabbitmq.queue=jannal.topic.queue
      
    • rabbitmq-producer.xml

            <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:rabbit="http://www.springframework.org/schema/rabbit"
         xsi:schemaLocation="http://www.springframework.org/schema/rabbit
             http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
             http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans.xsd"
      >
      <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="ignoreUnresolvablePlaceholders" value="true"/> <property name="locations"> <list> <value>classpath*:conf.properties</
      value
      >
      </list> </property> </bean> <!--建立CachingConnectionFactory例項,預設快取1個Channel 1. 單擊使用host和port 2.叢集環境使用address="host1:port1,host2:port2" --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}" port="${rabbitmq.port}" virtual-host="${rabbitmq.vhost}" requested-heartbeat="20" channel-cache-size="1" connection-cache-size="1" connection-timeout="20000" publisher-returns="true" publisher-confirms="true" /> <bean id="returnCallBack" class="com.jannal.mq.publish.ReturnCallBackImpl"/> <bean id="confirmCallBack" class="com.jannal.mq.publish.ConfirmCallBackImpl"/> <!--json訊息轉換器--> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="${rabbitmq.exchange}" routing-key="${rabbitmq.routkey}" retry-template="retryTemplate" mandatory="true" confirm-callback="confirmCallBack" return-callback="returnCallBack" message-converter="jsonMessageConverter" /> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="backOffPolicy"> <!--指數退避策略--> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="500"/> <property name="multiplier" value="10.0"/> <property name="maxInterval" value="10000"/> </bean> </property> </bean> <rabbit:admin connection-factory="connectionFactory"/> <!--宣告佇列--> <rabbit:queue name="${rabbitmq.queue}" durable="true" auto-delete="false" exclusive="false"> <!--設定佇列引數--> <rabbit:queue-arguments> <!--設定佇列中訊息存活時間為10秒--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/> <!--設定佇列最大訊息數量為2--> <entry key="x-max-length" value="2" value-type="java.lang.Long"/> <entry key="x-ha-policy" value="all"/> </rabbit:queue-arguments> </rabbit:queue> <!--宣告交換器並繫結,配置topic型別exchange--> <rabbit:topic-exchange name="${rabbitmq.exchange}"> <!--以下配置註釋掉,可以測試returnCallBack--> <rabbit:bindings> <!--可以繫結多個佇列--> <rabbit:binding queue="${rabbitmq.queue}" pattern="*.#"/> </rabbit:bindings> </rabbit:topic-exchange>
    ```
  2. 程式碼

        @RunWith(SpringJUnit4ClassRunner.class)
        @ContextConfiguration({"classpath*:rabbitmq-producer.xml"})
        public class MessagePublishTest {
        
            @Autowired
            private RabbitTemplate rabbitTemplate;
        
            @Test
            public void testProducer() {
                for (int i = 0; i < 1; i++) {
                    rabbitTemplate.convertAndSend("Hello, world " + i);
                }
        
            }
        
        }
    

Consumer

  1. 配置

    • config.properteisProducer一樣

    • rabbitmq-consumer.xml

          <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:rabbit="http://www.springframework.org/schema/rabbit"
         xsi:schemaLocation="http://www.springframework.org/schema/rabbit
             http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
             http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans.xsd">
      
      <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
          <property name="ignoreUnresolvablePlaceholders" value="true" />
          <property name="locations">
              <list>
                  <value>classpath*:conf.properties</value>
              </list>
          </property>
      </bean>
      
      <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}"
          password="${rabbitmq.password}" port="${rabbitmq.port}" virtual-host="${rabbitmq.vhost}"
                                 requested-heartbeat="20" />
      <rabbit:admin connection-factory="connectionFactory" />
      
      
      <!--配置消費端監聽對應SimpleMessageListenerContainer-->
      <!--配置參考https://docs.spring.io/spring-amqp/docs/1.6.11.RELEASE/reference/html/_reference.html#containerAttributes-->
      <!--acknowledge屬性, 預設為auto,有三個可選項"auto", "manual", or "none". 即消費者成功處理訊息後必須有個應答, 如果消費者程式發生異常或者宕機, 訊息會被重新放回佇列-->
      <!--concurrency 如果設定為1 表示啟動一個執行緒消費-->
      <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"
                                 max-concurrency="1" concurrency="1"
                                 message-converter="messageConverter" prefetch="1"    >
          <rabbit:listener ref="commonMessageListener"  queue-names="${rabbitmq.queue}"   />
      
      </rabbit:listener-container>
      
      <bean id="commonMessageListener" class="com.jannal.mq.consumer.ConsumerAutoMessageListener" />
      
      <bean id="messageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
      
      
      </beans>
      
  2. 自動確認程式碼

        public class ConsumerAutoMessageListener implements MessageListener {
            private static final Logger logger = LoggerFactory.getLogger("rabbitmq-consumer");
        
            @Override
            public void onMessage(Message message) {
                logger.info(new String(message.getBody(), StandardCharsets.UTF_8));
                //模擬錯誤
                //throw new RuntimeException("出現錯誤");
            }
        }
    
  3. 如果需要手動確認,將上面配置檔案中MessageListenter修改為ChannelAwareMessageListener

        /**
         * @author jannal
         * 手動確認
         **/
        public class ConsumerManualMessageListenter implements ChannelAwareMessageListener {
        
            private static final Logger logger = LoggerFactory.getLogger("rabbitmq-consumer-manual");
        
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                logger.info(new String(message.getBody(), StandardCharsets.UTF_8));
                boolean multiple = false;
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), multiple);
            }
        }
    
    

Consumer監聽器異常的處理

  1. TODO