10-RabbitMQ-整合SpringBoot
RabbitMQ整個SpringBoot
SpringBoot因其配置簡單、快速開發,已經成為熱門的開發之一
消息中間件的工作過程可以用生產者消費者模型來表示.即,生產者不斷的向消息隊列發送信息
而消費者從消息隊列中消費信息.具體過程如下:
從上圖可看出,對於消息隊列來說,生產者,消息隊列,消費者是最重要的三個概念
生產者發消息到消息隊列中去,消費者監聽指定的消息隊列,並且當消息隊列收到消息之後,
接收消息隊列傳來的消息,並且給予相應的處理.消息隊列常用於分布式系統之間互相信息的傳遞.
使用SpringBoot進行整合RabbitMQ
1.pom文件的引入
這是操作RabbitMQ的starter必須要進行引入的
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.配置文件進行基礎的配置
spring.rabbitmq.virtual-host=/user spring.rabbitmq.port=5672 spring.rabbitmq.password=user spring.rabbitmq.username=user spring.rabbitmq.host=192.168.43.157
RabbitMQ的模式
1、direct模式
配置Queue(消息隊列).那註意由於采用的是Direct模式,需要在配置Queue的時候,指定一個鍵
使其和交換機綁定.
DirectQueue.java
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configurationpublic class DirectQueue { //若隊列不存在則進行創建隊列
//返回的是隊列名字 @Bean public Queue queue(){ return new Queue("direct_queue"); } }
消息生產者
Sender.java
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class Sender { @Autowired private AmqpTemplate amqpTemplate; public void send(){ String msg = "direct_queue"; User user = new User(); user.setName("MrChegns"); user.setAge(12); amqpTemplate.convertAndSend("direct_queue",user); } }
此時發送的消息是一個User類型的對象
對於發送對象需要實現序列化接口
User.java
package com.cr.rabbitmqs.direct; import java.io.Serializable; public class User implements Serializable { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public User(String name, int age) {this.name = name; this.age = age; } public User() { } @Override public String toString() { return "User{" + "name=‘" + name + ‘\‘‘ + ", age=" + age + ‘}‘; } }
消費者
Receive.java
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Receive { //對隊列進行監聽
//同時可以監聽多個隊列 @RabbitListener(queues = "direct_queue") public void listen(User msg){ System.out.println(msg); } }
測試:
@Autowired private Sender sender; @Test public void test1(){ sender.send(); }
得到的結果i:
2、topic模式
首先我們看發送端,我們需要配置隊列Queue,再配置交換機(Exchange)
再把隊列按照相應的規則綁定到交換機上
Topic.java
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class Topic { //創建隊列 @Bean(name = "message") public Queue Aqueue(){ return new Queue("message.topic"); } @Bean(name = "message1") public Queue BQueue(){ return new Queue("message.topics"); } //交換機 //若不存在則進行創建交換機 @Bean public TopicExchange exchange(){ return new TopicExchange("topic_exchange"); } //交換機和隊列進行綁定 @Bean Binding bindingExchangeTopic(@Qualifier("message")Queue message,TopicExchange exchange){ return BindingBuilder.bind(message).to(exchange).with("message.topic"); } @Bean Binding bindingExchangeTopics(@Qualifier("message1")Queue message,TopicExchange exchange){ return BindingBuilder.bind(message).to(exchange).with("message.#"); } }
消費者
Receive1.java
import com.cr.rabbitmqs.direct.User; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Receive1 { @RabbitListener(queues = "message.topic") public void tes(User user){ System.out.println( "user1111:" + user); } }
Receive2.java
import com.cr.rabbitmqs.direct.User; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Receive2 { @RabbitListener(queues = "message.topics") public void tes(User user){ System.out.println("user222:" + user); } }
消息生產者:
TopicSend.java
import com.cr.rabbitmqs.direct.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class TopicSend { @Autowired private AmqpTemplate amqpTemplate; //發送消息 public void send(){ User user = new User("name",12); amqpTemplate.convertSendAndReceive("topic_exchange","message.dev",user); } //發送消息 public void send1(){
User user = new User("name",12);
amqpTemplate.convertSendAndReceive("topic_exchange","message.topic",user );
}
}
在開發中這種模式的使用還是相對比較多的,此時測試的是兩種方法
一個方法所有的隊列都可以進行獲取
一個方法只有一個隊列可以獲取到消息
測試:
@Autowired private TopicSend topicSend; @Test public void ttt(){ topicSend.send(); }
測試:
@Autowired private TopicSend topicSend; @Test public void ttt(){ topicSend.send1(); }
後臺查看交換機和隊列的綁定關系以機相關的路由鍵
3、fanout
那前面已經介紹過了,Fanout Exchange形式又叫廣播形式,因此我們發送到路由器的消息會使
得綁定到該路由器的每一個Queue接收到消息,這個時候就算指定了Key,或者規則(即上文中
convertAndSend方法的參數2),也會被忽略!那麽直接上代碼,發送端配置如下:
Fanout.java
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class Fanout { //隊列 //如果隊列不存在會自動創建隊列 @Bean public Queue queueA(){ return new Queue("queueA"); } @Bean public Queue queueB(){ return new Queue("queueB"); } @Bean public Queue queueC(){ return new Queue("queueC"); } //交換機 //如果交換機不存在會自動創建隊列 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutExchange"); } //將交換機和隊列進行綁定 @Bean Binding bindingExchangequeueA(Queue queueA,FanoutExchange fanoutExchange){ return BindingBuilder.bind(queueA).to(fanoutExchange); } @Bean Binding bindingExchangequeueB(Queue queueB,FanoutExchange fanoutExchange){ return BindingBuilder.bind(queueB).to(fanoutExchange); } @Bean Binding bindingExchangequeueC(Queue queueC,FanoutExchange fanoutExchange){ return BindingBuilder.bind(queueC).to(fanoutExchange); } }
消費者:
FanoutReceive.java
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component //監聽器 @RabbitListener(queues = "queueA") public class FanoutReceive { //監聽的方法 @RabbitHandler public void listen(String msg){ System.out.println("queueA" + msg); } }
FanoutSender.java
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class FanoutSender { @Autowired private AmqpTemplate amqpTemplate; //發送消息 public void send(){ String msg = "test fanout...."; //發送消息:參數依次是 交換機名字--路由鍵(此時設置路由鍵沒有作用)--消息 amqpTemplate.convertAndSend("fanoutExchange","",msg); } }
測試:
@RunWith(SpringRunner.class) @SpringBootTest public class BpptandrabbitmqApplicationTests { //測試fanout @Autowired private FanoutSender fanoutSender; @Test public void fanout() { fanoutSender.send(); } }
此時3個隊列都能接收到消息
交換機、隊列以及路由鍵
10-RabbitMQ-整合SpringBoot