1. 程式人生 > 實用技巧 >RabbitMQ整合SpringBoot2.x

RabbitMQ整合SpringBoot2.x

RabbitMQ整合SpringBoot2.x

1、引入依賴及配置

引入rabbitmq依賴

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置檔案application.yml

spring:
application:
name: rabbimq-springboot
rabbitmq:
host: 安裝rabbit的主機
port: 執行埠
username: 登入使用者
password: 登入密碼
virtual-host: 虛擬主機(可在圖形介面建立,/開頭)

RabbiTemplate 用於簡化操作rabbitmq,直接注入即可。

2、使用RabbiTemplate 操作rabbitmq

注入rabbitTemplate

// 注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

1)直連模型

直連,不需要經過交換機,生產者直接將訊息放進佇列中

生產者:(生產者執行不會直接建立佇列,必須先有消費者才會自動建立不存在的佇列)

@Test
void test() {
/**
* 引數說明:
* 引數1:佇列名稱
* 引數2:發生訊息,可直接發生物件
*/
rabbitTemplate.convertAndSend("hello","hello rabbitMq");
}

消費者:

@RabbitListener(queuesToDeclare = @Queue("hello"))宣告監聽的佇列

@RabbitHandler代表隊列中取出的訊息的回撥函式

@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloCustomer {

@RabbitHandler
public void raceivel(String message){
System.out.println("massage = "+message);
}
}

需要設定佇列屬性時在消費者端宣告佇列時指定

@RabbitListener(queuesToDeclare = @Queue(value = "hello",declare = "true",autoDelete = "false",exclusive = "false"))

2)工作佇列

同樣不需要通過交換機,多個消費者

與Hello模型基本一致,多個消費者會平均消費訊息。存在問題:兩個消費者處理速度不同時,任然是平均分配。

@RabbitListener(queuesToDeclare = @Queue("hello"))宣告監聽的佇列註解直接註解在方法上時可以不用@RabbitHandler

生產者:

@Test
void testWork() {
/**
* 引數說明:
* 引數1:佇列名稱
* 引數2:發生訊息,可直接發生物件
*/
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","workQuque "+i);
}
}

消費者:

@RabbitListener(queuesToDeclare = @Queue(value = "work"))
public void work1(String massage){
System.out.println("work1 :massage = "+massage);
}

@RabbitListener(queuesToDeclare = @Queue(value = "work"))
public void work2(String massage){
System.out.println("work2 :massage = "+massage);
}

執行結果

3)釋出訂閱(廣播模型)

該模式生產者生產的訊息會廣播給所有消費者

生產者:

@Test
void testfanout() {
/**
* 引數說明:
* 引數1:交換機名稱
* 引數2:路由key(該模式下該引數無意義)
* 引數3:傳送的訊息
*/
rabbitTemplate.convertAndSend("log","","fanout 模型發生訊息");
}

消費者:

消費者需生成臨時佇列,並繫結佇列與路由交換機的關係

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "log",type = "fanout")
)
})
public void fanout1(String massage){
System.out.println("fanout1 massage = "+massage);
}

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //不指定名稱,生成臨時佇列
exchange = @Exchange(value = "log",type = "fanout") //交換機資訊
)
})
public void fanout2(String massage){
System.out.println("fanout2 massage = "+massage);
}

執行結果:

兩個消費者繫結不同的臨時佇列,消費了相同的訊息。

4)路由模式

與釋出訂閱不同的是可以指定不同的路由key,可指定特定想消費者消費。

消費者:

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "direct",type = "direct"),
key = {"info","error"}
)
})
public void route1(String massage){
System.out.println("route1 massage = "+massage);
}

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //不指定名稱,生成臨時佇列
exchange = @Exchange(value = "direct",type = "direct") ,//交換機資訊,
key = {"error"}
)
})
public void route2(String massage){
System.out.println("route2 massage = "+massage);
}

生產者:

@Test
void testRoute() {
/**
* 引數說明:
* 引數1:交換機名稱
* 引數2:路由key 用於指定哪些消費者可以消費
* 引數3:傳送的訊息
*/
rabbitTemplate.convertAndSend("direct","info","route 模型發生info訊息");
}

輸出:

由於生產者發生時指定路由key為info,只有繫結key為info的消費者可以消費

修改生產者

@Test
void testRoute() {
/**
* 引數說明:
* 引數1:交換機名稱
* 引數2:路由key 用於指定哪些消費者可以消費
* 引數3:傳送的訊息
*/
rabbitTemplate.convertAndSend("direct","error","route 模型發生error訊息");
}

執行結果:

由於兩個消費者都綁定了error可以,則兩個消費者都能消費error訊息

5)Topics模型

與路由模式基本相同,唯一不同是可以使用萬用字元指定消費者繫結的路由key。

萬用字元

  • #代表0個或多個單詞

  • *代表一個單詞

消費者:

@Component
public class TopicCusomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "topic",type = "topic"),
key = {"user.#"}
)
})
public void topic1(String massage){
System.out.println("topic1 massage = "+massage);
}

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //不指定名稱,生成臨時佇列
exchange = @Exchange(value = "topic",type = "topic") ,//交換機資訊,
key = {"user.*"}
)
})
public void topic2(String massage){
System.out.println("topic2 massage = "+massage);
}
}

生產者:

@Test
void testTopic() {
/**
* 引數說明:
* 引數1:交換機名稱
* 引數2:路由key 用於指定哪些消費者可以消費
* 引數3:傳送的訊息
*/
rabbitTemplate.convertAndSend("topic","user.order","topic 模型發生user.order訊息");
}

結果:

兩個消費者都消費了訊息

修改生產者:

@Test
void testTopic() {
/**
* 引數說明:
* 引數1:交換機名稱
* 引數2:路由key 用於指定哪些消費者可以消費
* 引數3:傳送的訊息
*/
rabbitTemplate.convertAndSend("topic","user.order.info","topic 模型發生user.order訊息");
}

結果:

由於*匹配一個單詞,消費者2不能消費user.order.info