springboot簡易整合rabbitmq
阿新 • • 發佈:2018-11-08
寫在前面:本文採用的rabbitmq環境是docker單節點。
專案地址:https://github.com/Blankwhiter/AMQP
一、搭建rabbitmq環境
在centos視窗中,執行如下命令拉取映象,以及建立容器:
docker pull rabbitmq:3.7-management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq-single rabbitmq:3.7-management
注:5672 -> 客戶端與rabbitmq的通訊埠 15672 -> 圖形化管理介面埠
建立容器後,在瀏覽器輸入http://192.168.10.170:15672
注:192.168.10.170是虛擬機器ip(下文application.yml與管理介面連結中出現的192.168.9.219,請一併替換為您自己的rabbitmq主機ip),預設的username/password為guest/guest
二、springboot整合rabbitmq
1.template操作rabbitmq 收發訊息。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>template</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>template</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--引入序列化
由於本次測試沒有引入spring-boot-starter-web模組,故引入jackson-databind
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.6</version>
<scope>compile</scope>
</dependency>
<!--引入amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
rabbitmq:
host: 192.168.9.219
port: 5672
username: guest
password: guest
AmqpConfig.java
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * AMQP configuration for the template demo.
 *
 * <p>Registers a JSON message converter so that message bodies are written as
 * JSON rather than with the JDK-serialization default mentioned in the test class.
 */
@Configuration
public class AmqpConfig {

    /**
     * Creates the converter bean used to (de)serialize message payloads.
     *
     * @return a Jackson-backed {@link MessageConverter}
     */
    @Bean
    public MessageConverter messageConverter() {
        final MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }
}
測試類,讀者請按照步驟依次執行
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Arrays;
import java.util.HashMap;
/**
 * Step-by-step demo of sending and receiving RabbitMQ messages with the Spring AMQP templates.
 *
 * <p>The tests are meant to be run manually, one after another, in this order:
 * {@link #init()}, {@link #sendMessage()}, {@link #receiveMessage()}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TemplateApplicationTests {

    /**
     * RabbitMQ-specific template for sending and receiving messages.
     */
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Template for sending and receiving messages through AMQP middleware; the recommended abstraction.
     */
    @Autowired
    AmqpTemplate amqpTemplate;

    /**
     * AMQP administration component used to manage exchanges and queues.
     */
    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * Step 1: initialise the broker topology with {@code amqpAdmin}.
     *
     * <p>Declares the exchange, the queue, and the binding between them. Afterwards the result can be
     * inspected at http://192.168.9.219:15672/#/exchanges/%2F/exchange.direct
     */
    @Test
    public void init() {
        // Exchange types are mainly DirectExchange, FanoutExchange, TopicExchange and HeadersExchange.
        // For the differences see https://blog.csdn.net/belonghuang157405/article/details/83184388

        // Declare the exchange (direct / point-to-point mode).
        amqpAdmin.declareExchange(new DirectExchange("exchange.direct"));

        // Declare the queue; the second constructor argument is the "durable" flag.
        amqpAdmin.declareQueue(new Queue("direct-queue", true));

        // Bind the queue to the exchange under the routing key "fruit.apple".
        amqpAdmin.declareBinding(
                new Binding("direct-queue", Binding.DestinationType.QUEUE, "exchange.direct", "fruit.apple", null));
    }

    /**
     * Step 2: send a message to {@code exchange.direct} with routing key {@code fruit.apple}.
     */
    @Test
    public void sendMessage() {
        // Option 1: use send(); you must build the Message yourself (body and headers).
        //rabbitTemplate.send("exchange.direct","fruit.apple",new Message("apple-message-made".getBytes(),null));

        // Option 2: use convertAndSend(); the third argument is treated as the message body and is
        // serialized automatically before being sent to RabbitMQ.
        HashMap<String, Object> map = new HashMap<>();
        map.put("type", "red apple");
        map.put("data", Arrays.asList(1, 2, 3));

        // By default convertAndSend uses SimpleMessageConverter, which falls back to JDK serialization.
        // Most users prefer JSON, hence the extra configuration class AmqpConfig.
        // With it in place, "Get messages" at http://192.168.9.219:15672/#/queues/%2F/direct-queue
        // shows the message as JSON.
        rabbitTemplate.convertAndSend("exchange.direct", "fruit.apple", map);
    }

    /**
     * Step 3: receive one message from {@code direct-queue} and print its type and content.
     *
     * <p>{@code receiveAndConvert} yields {@code null} when the queue is empty, so that case is
     * reported explicitly instead of failing with a {@link NullPointerException}.
     */
    @Test
    public void receiveMessage() {
        Object message = rabbitTemplate.receiveAndConvert("direct-queue");
        if (message == null) {
            // Nothing was waiting in the queue, e.g. sendMessage() has not been run yet.
            System.out.println("no message available in direct-queue");
            return;
        }
        System.out.println(message.getClass());
        System.out.println(message.toString());
    }
}
2.使用@RabbitListener註解,監聽訊息
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>annotation</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>annotation</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--引入amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
rabbitmq:
host: 192.168.9.219
port: 5672
username: guest
password: guest
AmqpConfig.java
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * AMQP configuration for the annotation demo.
 *
 * <p>Registers a JSON message converter so that message bodies are written as
 * JSON rather than with the JDK-serialization default mentioned in the test class.
 */
@Configuration
public class AmqpConfig {

    /**
     * Creates the converter bean used to (de)serialize message payloads.
     *
     * @return a Jackson-backed {@link MessageConverter}
     */
    @Bean
    public MessageConverter messageConverter() {
        final MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }
}
測試類,讀者請按照步驟依次執行
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Arrays;
import java.util.HashMap;
/**
 * Prepares the fanout topology and publishes a sample message for the {@code @RabbitListener} demo.
 *
 * <p>Run {@link #init()} first, then {@link #sendMessage()}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class AnnotationApplicationTests {

    /**
     * RabbitMQ-specific template for sending and receiving messages.
     */
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Template for sending and receiving messages through AMQP middleware; the recommended abstraction.
     */
    @Autowired
    AmqpTemplate amqpTemplate;

    /**
     * AMQP administration component used to manage exchanges and queues.
     */
    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * Step 1: declare the fanout exchange, three durable queues, and one binding per queue.
     *
     * <p>Once finished, the result can be inspected at
     * http://192.168.9.219:15672/#/exchanges/%2F/exchange.fanout
     */
    @Test
    public void init() {
        // Exchange types are mainly DirectExchange, FanoutExchange, TopicExchange and HeadersExchange.
        // For the differences see https://blog.csdn.net/belonghuang157405/article/details/83184388
        final String[] queueNames = {"fanout-queue1", "fanout-queue2", "fanout-queue3"};
        final String[] routingKeys = {"fruit.orange-1", "fruit.orange-2", "fruit.orange-3"};

        // Exchange in broadcast (fanout) mode.
        amqpAdmin.declareExchange(new FanoutExchange("exchange.fanout"));

        // All queues are declared first; the second Queue argument is the "durable" flag.
        for (String queueName : queueNames) {
            amqpAdmin.declareQueue(new Queue(queueName, true));
        }

        // Then each queue is bound to the exchange with its own routing key.
        for (int i = 0; i < queueNames.length; i++) {
            Binding binding = new Binding(
                    queueNames[i], Binding.DestinationType.QUEUE, "exchange.fanout", routingKeys[i], null);
            amqpAdmin.declareBinding(binding);
        }
    }

    /**
     * Step 2: publish a sample payload to {@code exchange.fanout} with an empty routing key.
     */
    @Test
    public void sendMessage() {
        // Option 1: use send(); you must build the Message yourself (body and headers).
        //rabbitTemplate.send("exchange.fanout","fruit.apple",new Message("apple-message-made".getBytes(),null));

        // Option 2: use convertAndSend(); the third argument is treated as the message body and is
        // serialized automatically before being sent to RabbitMQ.
        HashMap<Object, Object> payload = new HashMap<>();
        payload.put("type", "red orange");
        payload.put("data", Arrays.asList(1, 2, 3));

        // By default convertAndSend uses SimpleMessageConverter, which falls back to JDK serialization.
        // Most users prefer JSON, hence the extra configuration class AmqpConfig.
        // With it in place, "Get messages" at http://192.168.9.219:15672/#/queues/%2F/fanout-queueX
        // shows the message as JSON.
        final String exchange = "exchange.fanout";
        final String routingKey = "";
        rabbitTemplate.convertAndSend(exchange, routingKey, payload);
    }
}
按照步驟執行完上述程式碼,編寫MessageService類,並啟動專案。
MessageService.java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.util.HashMap;
/**
 * Listens on the three fanout queues and prints the {@code "data"} entry of every message received.
 */
@Service
public class MessageService {

    /**
     * Handles messages arriving on {@code fanout-queue1}.
     *
     * @param map the converted message payload
     */
    @RabbitListener(queues = "fanout-queue1")
    public void receiveQueue1Message(HashMap<String,Object> map){
        printData("fanout-queue1", map);
    }

    /**
     * Handles messages arriving on {@code fanout-queue2}.
     *
     * @param map the converted message payload
     */
    @RabbitListener(queues = "fanout-queue2")
    public void receiveQueue2Message(HashMap<String,Object> map){
        printData("fanout-queue2", map);
    }

    /**
     * Handles messages arriving on {@code fanout-queue3}.
     *
     * @param map the converted message payload
     */
    @RabbitListener(queues = "fanout-queue3")
    public void receiveQueue3Message(HashMap<String,Object> map){
        printData("fanout-queue3", map);
    }

    /**
     * Prints the payload's {@code "data"} entry between "begin"/"end" markers for the given queue.
     *
     * <p>{@code getOrDefault} only substitutes the default when the key is absent; a payload such as
     * {@code {"data": null}} still yields {@code null}. {@link String#valueOf(Object)} is used so that
     * case prints {@code "null"} instead of throwing a {@link NullPointerException} inside the listener.
     * NOTE(review): an exception escaping a listener presumably causes the message to be rejected and
     * possibly redelivered, depending on container configuration — confirm.
     *
     * @param queueName name of the queue the message came from, used only for the log markers
     * @param map       the converted message payload
     */
    private void printData(String queueName, HashMap<String, Object> map) {
        System.out.println(queueName + " begin");
        Object message = map.getOrDefault("data", "no message");
        System.out.println(String.valueOf(message));
        System.out.println(queueName + " end");
    }
}