spring boot實戰(第十二篇)整合RabbitMQ
阿新 • • 發佈:2017-10-31
this direct 還需要 添加屬性 創建 還需 topic start routing
創建AmqpConfig文件AmqpConfig.java(後期的配置都在該文件中)
[html] view plain copy
這裏需要顯示調用 [html] view plain copy
在Spring Boot中交換機繼承AbstractExchange類
下面給出完整的配置文件: [html] view plain copy
以上完成 Spring Boot與RabbitMQ的整合
後會自動創建ConnectionFactory以及RabbitTemplate對應Bean,為什麽上面我們還需要手動什麽呢? 自動創建的ConnectionFactory無法完成事件的回調,即沒有設置下面的代碼 [html] view plain copy
前言
本篇主要講述Spring Boot與RabbitMQ的整合,內容非常簡單,純API的調用操作。 操作之間需要加入依賴Jar
[html] view plain copy
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- lt;/dependency>
消息生產者
不論是創建消息消費者或生產者都需要ConnectionFactoryConnectionFactory配置
- @Configuration
- public class AmqpConfig {
- public static final String EXCHANGE = "spring-boot-exchange";
- public static final String ROUTINGKEY = "spring-boot-routingKey";
- @Bean
- public ConnectionFactory connectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setAddresses("127.0.0.1:5672");
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
- connectionFactory.setVirtualHost("/");
- connectionFactory.setPublisherConfirms(true); //必須要設置
- return connectionFactory;
- }
- }
這裏需要顯示調用 [html] view plain copy
- connectionFactory.setPublisherConfirms(true);
RabbitTemplate
通過使用RabbitTemplate來對開發者提供API操作 [html] view plain copy- @Bean
- @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- //必須是prototype類型
- public RabbitTemplate rabbitTemplate() {
- RabbitTemplate template = new RabbitTemplate(connectionFactory());
- return template;
- }
- public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)
- exchange:交換機名稱
-
routingKey:路由關鍵字
-
object:發送的消息內容
-
correlationData:消息ID
Send.java
[html] view plain copy- @Component
- public class Send {
- private RabbitTemplate rabbitTemplate;
- /**
- * 構造方法註入
- */
- @Autowired
- public Send(RabbitTemplate rabbitTemplate) {
- this.rabbitTemplate = rabbitTemplate;
- }
- public void sendMsg(String content) {
- CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
- rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
- }
- }
如果需要在生產者需要消息發送後的回調,需要對rabbitTemplate設置ConfirmCallback對象,由於不同的生產者需要對應不同的ConfirmCallback,如果rabbitTemplate設置為單例bean,則所有的rabbitTemplate
實際的ConfirmCallback為最後一次申明的ConfirmCallback。
下面給出完整的生產者代碼:
[html] view plain copy
- package com.lkl.springboot.amqp;
- import java.util.UUID;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.support.CorrelationData;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- /**
- * 消息生產者
- *
- * @author liaokailin
- * @version $Id: Send.java, v 0.1 2015年11月01日 下午4:22:25 liaokailin Exp $
- */
- @Component
- public class Send implements RabbitTemplate.ConfirmCallback {
- private RabbitTemplate rabbitTemplate;
- /**
- * 構造方法註入
- */
- @Autowired
- public Send(RabbitTemplate rabbitTemplate) {
- this.rabbitTemplate = rabbitTemplate;
- rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調就是最後設置的內容
- }
- public void sendMsg(String content) {
- CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
- rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
- }
- /**
- * 回調
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.out.println(" 回調id:" + correlationData);
- if (ack) {
- System.out.println("消息成功消費");
- } else {
- System.out.println("消息消費失敗:" + cause);
- }
- }
- }
消息消費者
消費者負責申明交換機(生產者也可以申明)、隊列、兩者的綁定操作。
交換機
[html] view plain copy- /**
- * 針對消費者配置
- FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念
- HeadersExchange :通過添加屬性key-value匹配
- DirectExchange:按照routingkey分發到指定隊列
- TopicExchange:多關鍵字匹配
- */
- @Bean
- public DirectExchange defaultExchange() {
- return new DirectExchange(EXCHANGE);
- }
在Spring Boot中交換機繼承AbstractExchange類
隊列
[html] view plain copy- @Bean
- public Queue queue() {
- return new Queue("spring-boot-queue", true); //隊列持久
- }
綁定
[html] view plain copy- @Bean
- public Binding binding() {
- return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
- }
消息消費
[html] view plain copy- @Bean
- public SimpleMessageListenerContainer messageContainer() {
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
- container.setQueues(queue());
- container.setExposeListenerChannel(true);
- container.setMaxConcurrentConsumers(1);
- container.setConcurrentConsumers(1);
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認
- container.setMessageListener(new ChannelAwareMessageListener() {
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- byte[] body = message.getBody();
- System.out.println("receive msg : " + new String(body));
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費
- }
- });
- return container;
- }
下面給出完整的配置文件: [html] view plain copy
- package com.lkl.springboot.amqp;
- import org.springframework.amqp.core.AcknowledgeMode;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
- import org.springframework.beans.factory.config.ConfigurableBeanFactory;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Scope;
- import com.rabbitmq.client.Channel;
- /**
- * Qmqp Rabbitmq
- *
- * http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/
- *
- * @author lkl
- * @version $Id: AmqpConfig.java, v 0.1 2015年11月01日 下午2:05:37 lkl Exp $
- */
- @Configuration
- public class AmqpConfig {
- public static final String EXCHANGE = "spring-boot-exchange";
- public static final String ROUTINGKEY = "spring-boot-routingKey";
- @Bean
- public ConnectionFactory connectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setAddresses("127.0.0.1:5672");
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
- connectionFactory.setVirtualHost("/");
- connectionFactory.setPublisherConfirms(true); //必須要設置
- return connectionFactory;
- }
- @Bean
- @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- //必須是prototype類型
- public RabbitTemplate rabbitTemplate() {
- RabbitTemplate template = new RabbitTemplate(connectionFactory());
- return template;
- }
- /**
- * 針對消費者配置
- * 1. 設置交換機類型
- * 2. 將隊列綁定到交換機
- *
- *
- FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念
- HeadersExchange :通過添加屬性key-value匹配
- DirectExchange:按照routingkey分發到指定隊列
- TopicExchange:多關鍵字匹配
- */
- @Bean
- public DirectExchange defaultExchange() {
- return new DirectExchange(EXCHANGE);
- }
- @Bean
- public Queue queue() {
- return new Queue("spring-boot-queue", true); //隊列持久
- }
- @Bean
- public Binding binding() {
- return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
- }
- @Bean
- public SimpleMessageListenerContainer messageContainer() {
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
- container.setQueues(queue());
- container.setExposeListenerChannel(true);
- container.setMaxConcurrentConsumers(1);
- container.setConcurrentConsumers(1);
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認
- container.setMessageListener(new ChannelAwareMessageListener() {
- @Override
- public void onMessage(Message message, Channel channel) throws Exception {
- byte[] body = message.getBody();
- System.out.println("receive msg : " + new String(body));
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費
- }
- });
- return container;
- }
- }
以上完成 Spring Boot與RabbitMQ的整合
自動配置
在Spring Boot中實現了RabbitMQ的自動配置,在配置文件中添加如下配置信息 [html] view plain copy- spring.rabbitmq.host=localhost
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=test
- spring.rabbitmq.password=test
- spring.rabbitmq.virtualHost=test
後會自動創建ConnectionFactory以及RabbitTemplate對應Bean,為什麽上面我們還需要手動什麽呢? 自動創建的ConnectionFactory無法完成事件的回調,即沒有設置下面的代碼 [html] view plain copy
- connectionFactory.setPublisherConfirms(true);
spring boot實戰(第十二篇)整合RabbitMQ