1. 程式人生 > >springboot簡易整合rabbitmq

springboot簡易整合rabbitmq

寫在前面:本文采用rabbitmq環境是docker單節點。
專案地址:https://github.com/Blankwhiter/AMQP

一、搭建rabbitmq環境

在centos視窗中,執行如下命令拉取映象,以及建立容器:

docker pull rabbitmq:3.7-management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq-single rabbitmq:3.7-management

注:5672 -> 客戶端與rabbitmq的通訊埠 15672 -> 圖形化管理介面埠

建立容器後,在瀏覽器輸入http://192.168.10.170:15672

注:192.168.10.170是虛擬機器ip ,預設的username/password為guest/guest

二、springboot整合rabbitmq

1.template操作rabbitmq 收發訊息。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>template</artifactId> <version>0.0.1-SNAPSHOT</version> <
packaging
>
jar</packaging> <name>template</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--引入序列化 由於本次測試沒有引入spring-boot-starter-web模組,故引入jackson-databind <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.6</version> <scope>compile</scope> </dependency> <!--引入amqp--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>

application.yml

spring:
  rabbitmq:
    host: 192.168.9.219
    port: 5672
    username: guest
    password: guest

AmqpConfig.java

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpConfig {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

測試類,讀者請按照步驟依次執行

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.HashMap;

@RunWith(SpringRunner.class)
@SpringBootTest
public class TemplateApplicationTests {

    /**
     * rabbitmq 給rabbitmq 收發訊息
     */
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * amqpTemplate 給amqp中介軟體 收發訊息, 推薦使用
     */
    @Autowired
    AmqpTemplate amqpTemplate;

    /**
     * 交換器、佇列管理
     * amqp系統管理功能元件
     */
    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * 第一步:
     * 使用amqpAdmin初始化。建立交換器,佇列,以及交換器繫結佇列
     * 執行完成。訪問  http://192.168.9.219:15672/#/exchanges/%2F/exchange.direct 可以檢視具體資訊
     */
    @Test
    public void init() {
        /**
         * exchange 型別主要分DirectExchange FanoutExchange TopicExchange HeadersExchange
         * 具體區別更多詳情請檢視 https://blog.csdn.net/belonghuang157405/article/details/83184388 關於amqp相關內容
         */

        //建立交換器 點對點模式
        amqpAdmin.declareExchange(new DirectExchange("exchange.direct"));
        //建立佇列 queue第二個引數:是否持久化
        amqpAdmin.declareQueue(new Queue("direct-queue",true));
        //交換器繫結佇列
        amqpAdmin.declareBinding(new Binding("direct-queue", Binding.DestinationType.QUEUE,"exchange.direct","fruit.apple",null));

    }


    /**
     * 第二步:
     * 傳送訊息
     */
    @Test
    public void sendMessage(){

        //第一種.使用send 需要自己構造一個Message,定義訊息內容以及訊息頭
        //rabbitTemplate.send("exchange.direct","fruit.apple",new Message("apple-message-made".getBytes(),null));

        //第二種.使用convertAndSend  第三個引數object預設當初訊息體,自動序列化傳送給rabbitmq
        HashMap<Object, Object> map = new HashMap<>();
        map.put("type","red apple");
        map.put("data", Arrays.asList(1,2,3));
        // convertAndSend 使用的轉換器是SimpleMessageConverter 會採用jdk序列方式,
        // 但往往大多數會想使用json方式,故多編寫一個配置類:AmqpConfig
        // 這時候再去http://192.168.9.219:15672/#/queues/%2F/direct-queue介面上Get messages時候檢視訊息就是json格式
        rabbitTemplate.convertAndSend("exchange.direct","fruit.apple",map);

    }

    /**
     * 第三步:
     * 接收訊息
     */
    @Test
    public void receiveMessage(){

        Object message = rabbitTemplate.receiveAndConvert("direct-queue");
        System.out.println(message.getClass());
        System.out.println(message.toString());

    }

}

2.使用@RabbitListene註解,監聽訊息

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>annotation</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>annotation</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--引入amqp-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

application.yml

spring:
  rabbitmq:
    host: 192.168.9.219
    port: 5672
    username: guest
    password: guest

AmqpConfig.java

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpConfig {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

測試類,讀者請按照步驟依次執行


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.HashMap;

@RunWith(SpringRunner.class)
@SpringBootTest
public class AnnotationApplicationTests {

    /**
     * rabbitmq 給rabbitmq 收發訊息
     */
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * amqpTemplate 給amqp中介軟體 收發訊息, 推薦使用
     */
    @Autowired
    AmqpTemplate amqpTemplate;

    /**
     * 交換器、佇列管理
     * amqp系統管理功能元件
     */
    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * 第一步:
     * 使用amqpAdmin初始化。建立交換器,佇列,以及交換器繫結佇列
     * 執行完成。訪問 http://192.168.9.219:15672/#/exchanges/%2F/exchange.fanout 可以檢視具體資訊
     */
    @Test
    public void init() {
        /**
         * exchange 型別主要分DirectExchange FanoutExchange TopicExchange HeadersExchange
         * 具體區別更多詳情請檢視 https://blog.csdn.net/belonghuang157405/article/details/83184388 關於amqp相關內容
         */

        //建立交換器 廣播模式
        amqpAdmin.declareExchange(new FanoutExchange("exchange.fanout"));
        //建立佇列 queue第二個引數:是否持久化
        amqpAdmin.declareQueue(new Queue("fanout-queue1",true));
        amqpAdmin.declareQueue(new Queue("fanout-queue2",true));
        amqpAdmin.declareQueue(new Queue("fanout-queue3",true));
        //交換器繫結佇列
        amqpAdmin.declareBinding(new Binding("fanout-queue1", Binding.DestinationType.QUEUE,"exchange.fanout","fruit.orange-1",null));
        amqpAdmin.declareBinding(new Binding("fanout-queue2", Binding.DestinationType.QUEUE,"exchange.fanout","fruit.orange-2",null));
        amqpAdmin.declareBinding(new Binding("fanout-queue3", Binding.DestinationType.QUEUE,"exchange.fanout","fruit.orange-3",null));

    }

    /**
     * 第二步:
     * 傳送訊息
     */
    @Test
    public void sendMessage(){

        //第一種.使用send 需要自己構造一個Message,定義訊息內容以及訊息頭
        //rabbitTemplate.send("exchange.fanout","fruit.apple",new Message("apple-message-made".getBytes(),null));

        //第二種.使用convertAndSend  第三個引數object預設當初訊息體,自動序列化傳送給rabbitmq
        HashMap<Object, Object> map = new HashMap<>();
        map.put("type","red orange");
        map.put("data", Arrays.asList(1,2,3));
        // convertAndSend 使用的轉換器是SimpleMessageConverter 會採用jdk序列方式,
        // 但往往大多數會想使用json方式,故多編寫一個配置類:AmqpConfig
        // 這時候再去http://192.168.9.219:15672/#/queues/%2F/fanout-queueX介面上Get messages時候檢視訊息就是json格式
        rabbitTemplate.convertAndSend("exchange.fanout","",map);

    }

}

按照步驟執行完上述程式碼,編寫MessageService類,並啟動專案。
MessageService.java

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.util.HashMap;

@Service
public class MessageService {

    @RabbitListener(queues = "fanout-queue1")
    public void receiveQueue1Message(HashMap<String,Object> map){
        System.out.println("fanout-queue1 begin");
        Object  message = map.getOrDefault("data", "no message");
        System.out.println(message.toString());
        System.out.println("fanout-queue1 end");
    }

    @RabbitListener(queues = "fanout-queue2")
    public void receiveQueue2Message(HashMap<String,Object> map){
        System.out.println("fanout-queue2 begin");
        Object  message = map.getOrDefault("data", "no message");
        System.out.println(message.toString());
        System.out.println("fanout-queue2 end");
    }

    @RabbitListener(queues = "fanout-queue3")
    public void receiveQueue3Message(HashMap<String,Object> map){
        System.out.println("fanout-queue3 begin");
        Object  message = map.getOrDefault("data", "no message");
        System.out.println(message.toString());
        System.out.println("fanout-queue3 end");
    }


}