1. 程式人生 > >10-RabbitMQ-整合SpringBoot

10-RabbitMQ-整合SpringBoot

r.java autowire color 一個 技術 分享圖片 conf imp 多個

RabbitMQ整個SpringBoot

SpringBoot因其配置簡單、快速開發,已經成為熱門的開發之一

消息中間件的工作過程可以用生產者消費者模型來表示.即,生產者不斷的向消息隊列發送信息

而消費者從消息隊列中消費信息.具體過程如下:

技術分享圖片

從上圖可看出,對於消息隊列來說,生產者,消息隊列,消費者是最重要的三個概念

生產者發消息到消息隊列中去,消費者監聽指定的消息隊列,並且當消息隊列收到消息之後,

接收消息隊列傳來的消息,並且給予相應的處理.消息隊列常用於分布式系統之間互相信息的傳遞.

使用SpringBoot進行整合RabbitMQ

1.pom文件的引入

這是操作RabbitMQ的starter必須要進行引入的

     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.配置文件進行基礎的配置

spring.rabbitmq.virtual-host=/user
spring.rabbitmq.port
=5672 spring.rabbitmq.password=user spring.rabbitmq.username=user spring.rabbitmq.host=192.168.43.157

RabbitMQ的模式

1、direct模式

配置Queue(消息隊列).那註意由於采用的是Direct模式,需要在配置Queue的時候,指定一個鍵

使其和交換機綁定.

DirectQueue.java
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectQueue {    //若隊列不存在則進行創建隊列
   //返回的是隊列名字 @Bean
public Queue queue(){ return new Queue("direct_queue"); } }

消息生產者

Sender.java 

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(){
        String msg = "direct_queue";
        User user = new User();
        user.setName("MrChegns");
        user.setAge(12);
        amqpTemplate.convertAndSend("direct_queue",user);
    }

}

此時發送的消息是一個User類型的對象

對於發送對象需要實現序列化接口

User.java 
package com.cr.rabbitmqs.direct;
import java.io.Serializable;
public class User implements Serializable {
     private String name;
     private int age;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public User(String name, int age) {this.name = name;
        this.age = age;
    }
    public User() {
    }
    @Override
    public String toString() {
        return "User{" +
                "name=‘" + name + \‘ +
                ", age=" + age +
                };
    }
}

消費者

Receive.java 

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive {
   //對隊列進行監聽
   //同時可以監聽多個隊列 @RabbitListener(queues
= "direct_queue") public void listen(User msg){ System.out.println(msg); } }

測試:

 @Autowired
    private  Sender sender;

    @Test
    public void test1(){
        sender.send();
    }

得到的結果i:

技術分享圖片

2、topic模式

首先我們看發送端,我們需要配置隊列Queue,再配置交換機(Exchange)

再把隊列按照相應的規則綁定到交換機上

Topic.java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class Topic {

    //創建隊列
    @Bean(name = "message")
    public Queue Aqueue(){
        return  new Queue("message.topic");
    }

    @Bean(name = "message1")
    public Queue BQueue(){
        return  new Queue("message.topics");
    }

    //交換機
    //若不存在則進行創建交換機
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange("topic_exchange");
    }

    //交換機和隊列進行綁定
    @Bean
    Binding bindingExchangeTopic(@Qualifier("message")Queue message,TopicExchange exchange){
        return BindingBuilder.bind(message).to(exchange).with("message.topic");
    }
    @Bean
    Binding bindingExchangeTopics(@Qualifier("message1")Queue message,TopicExchange exchange){
        return BindingBuilder.bind(message).to(exchange).with("message.#");
    }
}

消費者

Receive1.java 
import com.cr.rabbitmqs.direct.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive1 {

    @RabbitListener(queues = "message.topic")
    public void tes(User user){
        System.out.println( "user1111:" + user);
    }
}

Receive2.java 
import com.cr.rabbitmqs.direct.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive2 {

    @RabbitListener(queues = "message.topics")
    public void tes(User user){
        System.out.println("user222:" + user);
    }
}

消息生產者:

TopicSend.java 
import com.cr.rabbitmqs.direct.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicSend {
    @Autowired
    private AmqpTemplate amqpTemplate;

    //發送消息
    public void send(){
        User user = new User("name",12);

        amqpTemplate.convertSendAndReceive("topic_exchange","message.dev",user);
    }

    //發送消息
    public void send1(){
   User user = new User("name",12);
   amqpTemplate.convertSendAndReceive("topic_exchange","message.topic",user ); 
} 
}

在開發中這種模式的使用還是相對比較多的,此時測試的是兩種方法

一個方法所有的隊列都可以進行獲取

一個方法只有一個隊列可以獲取到消息

測試:

    @Autowired
    private TopicSend topicSend;

    @Test
    public  void ttt(){
        topicSend.send();
    }

技術分享圖片

測試:

    @Autowired
    private TopicSend topicSend;

    @Test
    public  void ttt(){
        topicSend.send1();
    }

技術分享圖片

後臺查看交換機和隊列的綁定關系以機相關的路由鍵

技術分享圖片

3、fanout

那前面已經介紹過了,Fanout Exchange形式又叫廣播形式,因此我們發送到路由器的消息會使

得綁定到該路由器的每一個Queue接收到消息,這個時候就算指定了Key,或者規則(即上文中

convertAndSend方法的參數2),也會被忽略!那麽直接上代碼,發送端配置如下:

Fanout.java

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class Fanout {

    //隊列
    //如果隊列不存在會自動創建隊列
    @Bean
    public Queue queueA(){
        return new Queue("queueA");
    }

    @Bean
    public Queue queueB(){
        return new Queue("queueB");
    }

    @Bean
    public Queue queueC(){
        return new Queue("queueC");
    }

    //交換機
    //如果交換機不存在會自動創建隊列
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    //將交換機和隊列進行綁定
    @Bean
    Binding bindingExchangequeueA(Queue queueA,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queueA).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangequeueB(Queue queueB,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queueB).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangequeueC(Queue queueC,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queueC).to(fanoutExchange);
    }
}

消費者:

FanoutReceive.java

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
//監聽器
@RabbitListener(queues = "queueA")
public class FanoutReceive {

    //監聽的方法
    @RabbitHandler
    public void listen(String  msg){
        System.out.println("queueA" + msg);
    }


}

技術分享圖片

FanoutSender.java

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FanoutSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    //發送消息
    public void send(){
        String  msg = "test fanout....";
        //發送消息:參數依次是  交換機名字--路由鍵(此時設置路由鍵沒有作用)--消息
        amqpTemplate.convertAndSend("fanoutExchange","",msg);
    }
}

測試:

@RunWith(SpringRunner.class)
@SpringBootTest
public class BpptandrabbitmqApplicationTests {
    //測試fanout
    @Autowired
    private FanoutSender fanoutSender;

    @Test
    public void fanout() {
        fanoutSender.send();
    }

}

此時3個隊列都能接收到消息

技術分享圖片

交換機、隊列以及路由鍵

技術分享圖片

10-RabbitMQ-整合SpringBoot