RabbitMQ學習(十一)之spring整合傳送非同步訊息
阿新 • • 發佈:2019-01-23
實現使用Exchange型別為DirectExchange. routingkey的名稱預設為Queue的名稱。非同步傳送訊息。
1.配置檔案
[plain] view plain copy print?- #============== rabbitmq config ====================
- rabbit.hosts=192.168.36.102
- rabbit.username=admin
- rabbit.password=admin
- rabbit.virtualHost=/
- rabbit.queue=spring-queue-async
- rabbit.routingKey=spring-queue-async#<span style="font-family: Helvetica, Tahoma, Arial, sans-serif; font-size: 14px; line-height: 25.2000007629395px;">routingkey的名稱預設為Queue的名稱</span>
- <?xmlversion="1.0"encoding="UTF-8"?>
- <beansxmlns="http://www.springframework.org/schema/beans"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="
- http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">
- <beanclass="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"
- <propertyname="systemPropertiesModeName"value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/>
- <propertyname="ignoreResourceNotFound"value="true"/>
- <propertyname="locations">
- <list>
- <!-- 標準配置 -->
- <value>classpath*:/application.properties</value>
- </list>
- </property>
- </bean>
- <!-- 建立connectionFactory -->
- <beanid="rabbitConnectionFactory"
- class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
- <constructor-argvalue="${rabbit.hosts}"/>
- <propertyname="username"value="${rabbit.username}"/>
- <propertyname="password"value="${rabbit.password}"/>
- <propertyname="virtualHost"value="${rabbit.virtualHost}"/>
- <propertyname="channelCacheSize"value="5"/>
- </bean>
- <!-- 建立rabbitAdmin 代理類 -->
- <beanid="rabbitAdmin"
- class="org.springframework.amqp.rabbit.core.RabbitAdmin">
- <constructor-argref="rabbitConnectionFactory"/>
- </bean>
- <!-- 建立rabbitTemplate 訊息模板類
- -->
- <beanid="rabbitTemplate"
- class="org.springframework.amqp.rabbit.core.RabbitTemplate">
- <constructor-argref="rabbitConnectionFactory"></constructor-arg>
- <propertyname="routingKey"value="${rabbit.routingKey}"></property>
- </bean>
- </beans>
- package cn.slimsmart.rabbitmq.demo.spring.async;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.support.ClassPathXmlApplicationContext;
- publicclass Send {
- publicstaticvoid main(String[] args) throws InterruptedException {
- ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-rabbitmq-async-send.xml");
- AmqpTemplate amqpTemplate = context.getBean(RabbitTemplate.class);
- for(int i=0;i<1000;i++){
- amqpTemplate.convertAndSend("test spring async=>"+i);
- Thread.sleep(3000);
- }
- }
- }
4.處理訊息類ReceiveMsgHandler.Java
[java] view plain copy print?- package cn.slimsmart.rabbitmq.demo.spring.async;
- publicclass ReceiveMsgHandler {
- publicvoid handleMessage(String text) {
- System.out.println("Received: " + text);
- }
- }
- <?xmlversion="1.0"encoding="UTF-8"?>
- <beansxmlns="http://www.springframework.org/schema/beans"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="
- http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">
- <beanclass="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
- <propertyname="systemPropertiesModeName"value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/>
- <propertyname="ignoreResourceNotFound"value="true"/>
- <propertyname="locations">
- <list>
- <!-- 標準配置 -->
- <value>classpath*:/application.properties</value>
- </list>
- </property>
- </bean>
- <!-- 建立connectionFactory -->
- <beanid="rabbitConnectionFactory"
- class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
- <constructor-argvalue="${rabbit.hosts}"/>
- <propertyname="username"value="${rabbit.username}"/>
- <propertyname="password"value="${rabbit.password}"/>
- <propertyname="virtualHost"value="${rabbit.virtualHost}"/>
- <propertyname="channelCacheSize"value="5"/>
- </bean>
- <!-- 宣告訊息轉換器為SimpleMessageConverter -->
- <beanid="messageConverter"
- class="org.springframework.amqp.support.converter.SimpleMessageConverter">
- </bean>
- <!-- 監聽生產者傳送的訊息開始 -->
- <!-- 用於接收訊息的處理類 -->
- <beanid="receiveHandler"
- class="cn.slimsmart.rabbitmq.demo.spring.async.ReceiveMsgHandler">
- </bean>
- <!-- 用於訊息的監聽的代理類MessageListenerAdapter -->
- <beanid="receiveListenerAdapter"
- class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
- <constructor-argref="receiveHandler"/>
- <propertyname="defaultListenerMethod"value="handleMessage"></property>
- <propertyname="messageConverter"ref="messageConverter"></property>
- </bean>
- <!-- 用於訊息的監聽的容器類SimpleMessageListenerContainer,對於queueName的值一定要與定義的Queue的值相同 -->
- <beanid="listenerContainer"
- class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
- <propertyname="queueNames"value="${rabbit.queue}"></property>
- <propertyname="connectionFactory"ref="rabbitConnectionFactory"></property>
- <propertyname="messageListener"ref="receiveListenerAdapter"></property>
- </bean>
- <!-- 監聽生產者傳送的訊息結束 -->
- </beans>
- package cn.slimsmart.rabbitmq.demo.spring.async;
- import org.springframework.context.support.ClassPathXmlApplicationContext;
- publicclass Receive {
- publicstaticvoid main(String[] args) {
- new ClassPathXmlApplicationContext("applicationContext-rabbitmq-async-receive.xml");
- }
- }
- Received: test spring async=>0
- Received: test spring async=>1
- Received: test spring async=>2
- Received: test spring async=>3
- Received: test spring async=>4
- Received: test spring async=>5
- Received: test spring async=>6
- Received: test spring async=>7
- ......
- log4j:WARN No appenders could be found for logger (org.springframework.core.env.StandardEnvironment).
- log4j:WARN Please initialize the log4j system properly.
- Exception in thread "main" org.springframework.context.ApplicationContextException: Failed to start bean 'listenerContainer'; nested exception is org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
- at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:170)
- at org.springframework.context.support.DefaultLifecycleProcessor.access$1(DefaultLifecycleProcessor.java:154)
- at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:339)
- at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:143)
- at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:108)
- at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:931)
- at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:472)
- at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73)
- at cn.slimsmart.rabbitmq.demo.spring.async.Consumer.main(Consumer.java:7)
- Caused by: org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
- at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:333)
- at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:360)
- at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:167)
- ... 8 more
- Caused by: org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
- at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:228)
- at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:516)
- at java.lang.Thread.run(Unknown Source)
- Caused by: java.io.IOException
- at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
- at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
- at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
- at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:788)
- at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
- at java.lang.reflect.Method.invoke(Unknown Source)
- at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:348)
- at com.sun.proxy.$Proxy8.queueDeclarePassive(Unknown Source)
- at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:213)
- ... 2 more
- Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'spring-queue-async' in vhost '/', class-id=50, method-id=10), null, ""}
- at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
- at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
- at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
- at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
- at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
- ... 11 more
- Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'spring-queue-async' in vhost '/', class-id=50, method-id=10), null, ""}
- at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:473)
- at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:313)
- at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
- at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
- at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)