Using spring-rabbit with RabbitMQ
阿新 • Published: 2018-12-14
Producer
- Configuration
- config.properties
rabbitmq.host=jannal.mac.com
rabbitmq.username=jannal
rabbitmq.password=jannal
rabbitmq.vhost=jannal-vhost
rabbitmq.port=5672
rabbitmq.routkey=*.#
rabbitmq.exchange=jannal.topic.exchange
rabbitmq.queue=jannal.topic.queue
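To see which routing keys the pattern `*.#` lets through, here is a minimal sketch of topic matching in plain Java. It assumes that `rabbitmq.routkey` is used as the binding pattern between `jannal.topic.exchange` and `jannal.topic.queue`. The class `TopicPatternSketch` and its `matches` method are hypothetical names made up for illustration and are not part of spring-rabbit. The rule it follows is the AMQP topic rule: words are separated by dots, `*` stands for exactly one word and `#` for zero or more words.

```java
final class TopicPatternSketch {

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String bindingKey, String routingKey) {
        // Assumption of this sketch: an empty key consists of zero words.
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.", -1);
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: it matches only if every word was consumed too.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            // '*' or a literal word needs one word, but none is left.
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String pattern = "*.#"; // the value of rabbitmq.routkey
        for (String key : new String[] {"order", "order.created.eu", ""}) {
            System.out.println("'" + key + "' -> " + matches(pattern, key));
        }
    }
}
```

Under these rules `*.#` accepts any routing key with at least one word, such as `order` or `order.created.eu`. In this sketch an empty routing key has no words, so it is not matched.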
- rabbitmq-producer.xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd"
- Code
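import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath*:rabbitmq-producer.xml"})
public class MessagePublishTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testProducer() {
        // Raise the loop bound to publish more than one message.
        for (int i = 0; i < 1; i++) {
            // No exchange or routing key is passed, so the template's defaults are used.
            rabbitTemplate.convertAndSend("Hello, world " + i);
        }
    }
}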
Consumer
- Configuration
- config.properties: same as for the Producer
- rabbitmq-consumer.xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- The location must match the name of the properties file listed above -->
    <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="ignoreUnresolvablePlaceholders" value="true" />
        <property name="locations">
            <list>
                <value>classpath*:config.properties</value>
            </list>
        </property>
    </bean>

    <rabbit:connection-factory id="connectionFactory"
                               host="${rabbitmq.host}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               port="${rabbitmq.port}"
                               virtual-host="${rabbitmq.vhost}"
                               requested-heartbeat="20" />

    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Consumer-side listener configuration; the element is backed by a SimpleMessageListenerContainer -->
    <!-- Configuration reference: https://docs.spring.io/spring-amqp/docs/1.6.11.RELEASE/reference/html/_reference.html#containerAttributes -->
    <!-- The acknowledge attribute defaults to "auto"; the three options are "auto", "manual" and "none".
         With "auto" the message is acknowledged once the consumer has processed it successfully;
         if the consumer throws an exception or the process goes down, the message is put back on the queue -->
    <!-- concurrency="1" starts a single consumer thread -->
    <rabbit:listener-container connection-factory="connectionFactory"
                               acknowledge="auto"
                               max-concurrency="1"
                               concurrency="1"
                               message-converter="messageConverter"
                               prefetch="1">
        <rabbit:listener ref="commonMessageListener" queue-names="${rabbitmq.queue}" />
    </rabbit:listener-container>

    <bean id="commonMessageListener" class="com.jannal.mq.consumer.ConsumerAutoMessageListener" />

    <bean id="messageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
</beans>
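The three `acknowledge` values mentioned in the configuration comments can be summarised as a small decision table. What follows is a simplified model in plain Java, not Spring's implementation: `AckMode`, `Outcome` and `AckModeSketch` are hypothetical names, and the `AUTO` branch assumes the container's default behaviour of requeueing a message whose listener threw an exception.

```java
/** The three values accepted by the acknowledge attribute. */
enum AckMode { AUTO, MANUAL, NONE }

/** What happens to one delivered message in this simplified model. */
enum Outcome { REMOVED, REQUEUED, LOST, STILL_UNACKED }

final class AckModeSketch {

    /**
     * @param mode          acknowledge mode of the listener container
     * @param listenerThrew whether the listener threw an exception
     * @param listenerAcked whether the listener itself called basicAck
     */
    static Outcome outcome(AckMode mode, boolean listenerThrew, boolean listenerAcked) {
        switch (mode) {
            case NONE:
                // The broker treats the message as done as soon as it is delivered,
                // so a failure in the listener means the message is gone.
                return listenerThrew ? Outcome.LOST : Outcome.REMOVED;
            case AUTO:
                // The container acknowledges after a normal return and
                // rejects with requeue (assumed default) after an exception.
                return listenerThrew ? Outcome.REQUEUED : Outcome.REMOVED;
            case MANUAL:
                // Only an explicit acknowledgement by the listener settles the message.
                return listenerAcked ? Outcome.REMOVED : Outcome.STILL_UNACKED;
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }
}
```

The table makes the trade-off visible: `NONE` is the only mode in which a failing listener loses the message, and `MANUAL` is the only one in which forgetting to acknowledge leaves the message outstanding.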
- Auto-acknowledgement code
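package com.jannal.mq.consumer;

import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class ConsumerAutoMessageListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger("rabbitmq-consumer");

    @Override
    public void onMessage(Message message) {
        // Log the raw message body as UTF-8 text.
        logger.info(new String(message.getBody(), StandardCharsets.UTF_8));
        // Simulate an error
        // throw new RuntimeException("an error occurred");
    }
}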
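- For manual acknowledgement, change the MessageListener in the configuration file above to a ChannelAwareMessageListener and set acknowledge="manual" on the listener-container; otherwise the container also acknowledges each message itself after the listener returns.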
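package com.jannal.mq.consumer;

import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
// Package as of Spring AMQP 1.x; in 2.x the interface lives in org.springframework.amqp.rabbit.listener.api
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

/**
 * Manual acknowledgement.
 *
 * @author jannal
 **/
public class ConsumerManualMessageListenter implements ChannelAwareMessageListener {

    private static final Logger logger = LoggerFactory.getLogger("rabbitmq-consumer-manual");

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        logger.info(new String(message.getBody(), StandardCharsets.UTF_8));
        // false: acknowledge only this delivery, not all earlier unacknowledged ones
        boolean multiple = false;
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), multiple);
    }
}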
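If processing fails in manual mode, the listener can reject the message instead of acknowledging it. A rough sketch of that pattern follows. To keep it runnable without a broker, `AckChannel` is a hypothetical stand-in that mirrors only `basicAck(long, boolean)` and `basicNack(long, boolean, boolean)` from `com.rabbitmq.client.Channel` (the real methods throw `IOException`). `ManualAckSketch` and `RecordingChannel` are likewise invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the two Channel methods used in this sketch. */
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple);
    void basicNack(long deliveryTag, boolean multiple, boolean requeue);
}

final class ManualAckSketch {

    /** Runs the handler, then acknowledges on success or rejects on failure. */
    static void process(AckChannel channel, long deliveryTag, String body, Consumer<String> handler) {
        try {
            handler.accept(body);
            // Success: acknowledge this single delivery.
            channel.basicAck(deliveryTag, false);
        } catch (RuntimeException e) {
            // Failure: reject without requeueing so a poison message does not loop forever.
            // Passing requeue=true instead would put it back on the queue.
            channel.basicNack(deliveryTag, false, false);
        }
    }
}

/** Test double that records the calls instead of talking to a broker. */
final class RecordingChannel implements AckChannel {
    final List<String> calls = new ArrayList<>();

    @Override
    public void basicAck(long deliveryTag, boolean multiple) {
        calls.add("ack:" + deliveryTag);
    }

    @Override
    public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        calls.add("nack:" + deliveryTag + ":requeue=" + requeue);
    }
}
```

Whether to requeue on failure is a design choice: requeueing suits transient errors, while rejecting without requeue avoids endless redelivery of a message that can never be processed.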
Handling exceptions in the Consumer listener
- TODO