1. 程式人生 > >spring boot實戰(第十二篇)整合RabbitMQ

spring boot實戰(第十二篇)整合RabbitMQ

this direct 還需要 添加屬性 創建 還需 topic start routing

前言

本篇主要講述Spring Boot與RabbitMQ的整合,內容非常簡單,純API的調用操作。 操作之間需要加入依賴Jar

[html] view plain copy
  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. lt;/dependency>

消息生產者

不論是創建消息消費者或生產者都需要ConnectionFactory

ConnectionFactory配置

創建AmqpConfig文件AmqpConfig.java(後期的配置都在該文件中) [html] view plain copy
  1. @Configuration
  2. public class AmqpConfig {
  3. public static final String EXCHANGE = "spring-boot-exchange";
  4. public static final String ROUTINGKEY = "spring-boot-routingKey";
  5. @Bean
  6. public ConnectionFactory connectionFactory() {
  7. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  8. connectionFactory.setAddresses("127.0.0.1:5672");
  9. connectionFactory.setUsername("guest");
  10. connectionFactory.setPassword("guest");
  11. connectionFactory.setVirtualHost("/");
  12. connectionFactory.setPublisherConfirms(true); //必須要設置
  13. return connectionFactory;
  14. }
  15. }

這裏需要顯示調用 [html] view plain copy
  1. connectionFactory.setPublisherConfirms(true);
才能進行消息的回調。

RabbitTemplate

通過使用RabbitTemplate來對開發者提供API操作 [html] view plain copy
  1. @Bean
  2. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  3. //必須是prototype類型
  4. public RabbitTemplate rabbitTemplate() {
  5. RabbitTemplate template = new RabbitTemplate(connectionFactory());
  6. return template;
  7. }
這裏設置為原型,具體的原因在後面會講到 在發送消息時通過調用RabbitTemplate中的如下方法 [html] view plain copy
  1. public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)
  • exchange:交換機名稱
  • routingKey:路由關鍵字

  • object:發送的消息內容

  • correlationData:消息ID

因此生產者代碼詳單簡潔

Send.java

[html] view plain copy
  1. @Component
  2. public class Send {
  3. private RabbitTemplate rabbitTemplate;
  4. /**
  5. * 構造方法註入
  6. */
  7. @Autowired
  8. public Send(RabbitTemplate rabbitTemplate) {
  9. this.rabbitTemplate = rabbitTemplate;
  10. }
  11. public void sendMsg(String content) {
  12. CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
  13. rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
  14. }
  15. }

如果需要在生產者需要消息發送後的回調,需要對rabbitTemplate設置ConfirmCallback對象,由於不同的生產者需要對應不同的ConfirmCallback,如果rabbitTemplate設置為單例bean,則所有的rabbitTemplate

實際的ConfirmCallback為最後一次申明的ConfirmCallback。

下面給出完整的生產者代碼:

[html] view plain copy
  1. package com.lkl.springboot.amqp;
  2. import java.util.UUID;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.amqp.rabbit.support.CorrelationData;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. /**
  8. * 消息生產者
  9. *
  10. * @author liaokailin
  11. * @version $Id: Send.java, v 0.1 2015年11月01日 下午4:22:25 liaokailin Exp $
  12. */
  13. @Component
  14. public class Send implements RabbitTemplate.ConfirmCallback {
  15. private RabbitTemplate rabbitTemplate;
  16. /**
  17. * 構造方法註入
  18. */
  19. @Autowired
  20. public Send(RabbitTemplate rabbitTemplate) {
  21. this.rabbitTemplate = rabbitTemplate;
  22. rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調就是最後設置的內容
  23. }
  24. public void sendMsg(String content) {
  25. CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
  26. rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
  27. }
  28. /**
  29. * 回調
  30. */
  31. @Override
  32. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  33. System.out.println(" 回調id:" + correlationData);
  34. if (ack) {
  35. System.out.println("消息成功消費");
  36. } else {
  37. System.out.println("消息消費失敗:" + cause);
  38. }
  39. }
  40. }

消息消費者

消費者負責申明交換機(生產者也可以申明)、隊列、兩者的綁定操作。

交換機

[html] view plain copy
  1. /**
  2. * 針對消費者配置
  3. FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念
  4. HeadersExchange :通過添加屬性key-value匹配
  5. DirectExchange:按照routingkey分發到指定隊列
  6. TopicExchange:多關鍵字匹配
  7. */
  8. @Bean
  9. public DirectExchange defaultExchange() {
  10. return new DirectExchange(EXCHANGE);
  11. }

在Spring Boot中交換機繼承AbstractExchange類

隊列

[html] view plain copy
  1. @Bean
  2. public Queue queue() {
  3. return new Queue("spring-boot-queue", true); //隊列持久
  4. }

綁定

[html] view plain copy
  1. @Bean
  2. public Binding binding() {
  3. return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
  4. }
完成以上工作後,在spring boot中通過消息監聽容器實現消息的監聽,在消息到來時執行回調操作。

消息消費

[html] view plain copy
  1. @Bean
  2. public SimpleMessageListenerContainer messageContainer() {
  3. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
  4. container.setQueues(queue());
  5. container.setExposeListenerChannel(true);
  6. container.setMaxConcurrentConsumers(1);
  7. container.setConcurrentConsumers(1);
  8. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認
  9. container.setMessageListener(new ChannelAwareMessageListener() {
  10. @Override
  11. public void onMessage(Message message, Channel channel) throws Exception {
  12. byte[] body = message.getBody();
  13. System.out.println("receive msg : " + new String(body));
  14. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費
  15. }
  16. });
  17. return container;
  18. }

下面給出完整的配置文件: [html] view plain copy
  1. package com.lkl.springboot.amqp;
  2. import org.springframework.amqp.core.AcknowledgeMode;
  3. import org.springframework.amqp.core.Binding;
  4. import org.springframework.amqp.core.BindingBuilder;
  5. import org.springframework.amqp.core.DirectExchange;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.core.Queue;
  8. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  9. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  10. import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
  11. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  12. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  13. import org.springframework.beans.factory.config.ConfigurableBeanFactory;
  14. import org.springframework.context.annotation.Bean;
  15. import org.springframework.context.annotation.Configuration;
  16. import org.springframework.context.annotation.Scope;
  17. import com.rabbitmq.client.Channel;
  18. /**
  19. * Qmqp Rabbitmq
  20. *
  21. * http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/
  22. *
  23. * @author lkl
  24. * @version $Id: AmqpConfig.java, v 0.1 2015年11月01日 下午2:05:37 lkl Exp $
  25. */
  26. @Configuration
  27. public class AmqpConfig {
  28. public static final String EXCHANGE = "spring-boot-exchange";
  29. public static final String ROUTINGKEY = "spring-boot-routingKey";
  30. @Bean
  31. public ConnectionFactory connectionFactory() {
  32. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  33. connectionFactory.setAddresses("127.0.0.1:5672");
  34. connectionFactory.setUsername("guest");
  35. connectionFactory.setPassword("guest");
  36. connectionFactory.setVirtualHost("/");
  37. connectionFactory.setPublisherConfirms(true); //必須要設置
  38. return connectionFactory;
  39. }
  40. @Bean
  41. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  42. //必須是prototype類型
  43. public RabbitTemplate rabbitTemplate() {
  44. RabbitTemplate template = new RabbitTemplate(connectionFactory());
  45. return template;
  46. }
  47. /**
  48. * 針對消費者配置
  49. * 1. 設置交換機類型
  50. * 2. 將隊列綁定到交換機
  51. *
  52. *
  53. FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念
  54. HeadersExchange :通過添加屬性key-value匹配
  55. DirectExchange:按照routingkey分發到指定隊列
  56. TopicExchange:多關鍵字匹配
  57. */
  58. @Bean
  59. public DirectExchange defaultExchange() {
  60. return new DirectExchange(EXCHANGE);
  61. }
  62. @Bean
  63. public Queue queue() {
  64. return new Queue("spring-boot-queue", true); //隊列持久
  65. }
  66. @Bean
  67. public Binding binding() {
  68. return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
  69. }
  70. @Bean
  71. public SimpleMessageListenerContainer messageContainer() {
  72. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
  73. container.setQueues(queue());
  74. container.setExposeListenerChannel(true);
  75. container.setMaxConcurrentConsumers(1);
  76. container.setConcurrentConsumers(1);
  77. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認
  78. container.setMessageListener(new ChannelAwareMessageListener() {
  79. @Override
  80. public void onMessage(Message message, Channel channel) throws Exception {
  81. byte[] body = message.getBody();
  82. System.out.println("receive msg : " + new String(body));
  83. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費
  84. }
  85. });
  86. return container;
  87. }
  88. }


以上完成 Spring Boot與RabbitMQ的整合

自動配置

在Spring Boot中實現了RabbitMQ的自動配置,在配置文件中添加如下配置信息 [html] view plain copy
  1. spring.rabbitmq.host=localhost
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=test
  4. spring.rabbitmq.password=test
  5. spring.rabbitmq.virtualHost=test

後會自動創建ConnectionFactory以及RabbitTemplate對應Bean,為什麽上面我們還需要手動什麽呢? 自動創建的ConnectionFactory無法完成事件的回調,即沒有設置下面的代碼 [html] view plain copy
  1. connectionFactory.setPublisherConfirms(true);
具體分析見後續文章的源碼解讀.

spring boot實戰(第十二篇)整合RabbitMQ