RabbitMQ從在阿里雲安裝到在spring boot中如何使用這一篇就可以了
//某個服務的具體情況
ps -ef | grep XXX
//殺死程序
kill -9 程序ID,第一個
//檢視記憶體
free或者top
//檢視磁碟使用情況
df -l
//尋找檔案
find -name xxx
//檢視埠使用情況
netstat -an | grep 15672
RabbitMQ安裝:
1.更新
sudo agt-get update
2. 安裝erlang環境
yum install erlang
3. 安裝rabbitMQ環境
sudo apt-get install rabbitmq-server
4. 啟用RabbitMQWeb管理外掛
不啟用你的http://ip+15672訪問不到
rabbitmq-plugins enable rabbitmq_management
5. 常用命令
啟動、停止、重啟、狀態rabbitMq命令:
啟動:sudo rabbitmq-server start
關閉: sudo rabbitmq-server stop
重啟: sudo rabbitmq-server restart
檢視狀態:sudo rabbitmqctl status
6. 檢視啟動狀態
[email protected]:/# service rabbitmq-server status
● rabbitmq-server.service - RabbitMQ Messaging Server
Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled)
Active: active (running) since Thu 2018-11-08 20:15:25 CST; 7min ago
Process: 16987 ExecStop=/usr/sbin/rabbitmqctl stop (code=exited, status=0/SUCCESS)
Process: 17074 ExecStartPost=/usr/lib/rabbitmq/bin/rabbitmq-server-wait (code=exited, status=0/SUCCESS)
Main PID: 17073 (rabbitmq-server)
Tasks: 70
Memory: 39.6M
CPU: 1.538s
CGroup: /system.slice/rabbitmq-server.service
├─17073 /bin/sh /usr/sbin/rabbitmq-server
├─17084 /bin/sh -e /usr/lib/rabbitmq/bin/rabbitmq-server
├─17171 /usr/lib/erlang/erts-7.3/bin/epmd -daemon
├─17211 /usr/lib/erlang/erts-7.3/bin/beam -W w -A 64 -P 1048576 -K true -B i -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/ra
├─17312 inet_gethost 4
└─17313 inet_gethost 4
Nov 08 20:15:23 yan systemd[1]: Stopped RabbitMQ Messaging Server.
Nov 08 20:15:23 yan systemd[1]: Starting RabbitMQ Messaging Server...
Nov 08 20:15:23 yan rabbitmq[17074]: Waiting for [email protected] ...
Nov 08 20:15:23 yan rabbitmq[17074]: pid is 17084 ...
Nov 08 20:15:25 yan systemd[1]: Started RabbitMQ Messaging Server.
lines 1-22/22 (END)
7 新增使用者
新增admin使用者,密碼設定為admin。
sudo rabbitmqctl add_user admin admin
賦予許可權
sudo rabbitmqctl set_user_tags admin administrator
賦予virtual host中所有資源的配置、寫、讀許可權以便管理其中的資源
sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
8. 訪問:
spring boot整合rabbitMQ
1、配置pom包,主要是新增spring-boot-starter-amqp的支援
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、配置檔案
配置rabbitmq的安裝地址、埠以及賬戶資訊
-
一定不要把埠號設定成15672,因為那個已經被佔用了,所以你新設定一個5672
-
同時新增到阿里雲伺服器安全組配置
spring.application.name=Spring-boot-rabbitmq
spring.rabbitmq.host=101.200.55.12
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=你的密碼
重啟,並開啟http://ip+15672就能看到一個連線資訊了
如圖有一個admin連線上了
3. 新建一個rabbitMQ的包在你的專案下
新建如下幾個檔案
//佇列配置
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue Queue() {
return new Queue("hello");
}
}
// 傳送者
// rabbitTemplate是springboot 提供的預設實現
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "這是傳送的資訊 "+"---------------------" + new Date();
System.out.println("=============================");
System.out.println("Sender : " + context);
System.out.println("=============================");
this.rabbitTemplate.convertAndSend("hello", context);
}
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println("------------------1--------------------");
System.out.println("Receiver : " + hello);
System.out.println("-----------------1---------------------");
}
}
// 新建一個controller測試
import cn.nxcoder.blog.rabbit.HelloSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class RabbitController {
@Autowired
private HelloSender helloSender;
/*
一個生產者和一個消費者
*/
@PostMapping ("/rabbitHello")
@ResponseBody
public void hello( ) {
helloSender.send();
}
}
//用postman本地測試
http://localhost:8088/rabbitHello
//測試結果
=============================
Sender : 這是傳送的資訊 ---------------------Fri Nov 09 15:09:01 CST 2018
=============================
15:09:01.188 [http-nio-8088-exec-1] INFO c.n.blog.handler.LogAspectHandler - ======執行方法後,執行該方法======
15:09:01.188 [http-nio-8088-exec-1] INFO c.n.blog.handler.LogAspectHandler - Result:null
------------------1--------------------
Receiver : 這是傳送的資訊 ---------------------Fri Nov 09 15:09:01 CST 2018
-----------------1---------------------
以上最簡單的MQ完成了
B: 一個生產者,多個消費者的情況
B:1
在我門的生產者裡面新家一個sendMsg方法,該方法需要傳參
看下面的,hello不能變,或者說是必須和你的消費者引用的名稱要一樣,這裡我就沒有改
this.rabbitTemplate.convertAndSend("hello", sendMsg);
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "這是傳送的資訊 "+"---------------------"
+ new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
public void sendMsg(String msg) {
String sendMsg = msg + new Date();
System.out.println("Sender2 : " + sendMsg);
this.rabbitTemplate.convertAndSend("hello", sendMsg);
}
}
B:2
新加一個消費者,但是引用的還是hello
@Component
@RabbitListener(queues = "hello")
public class HelloReceiverTwo {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver2 : " + hello);
}
}
再看原來的消費者1,他們的@RabbitListener(queues = “hello”)
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
B:3
在controller裡面新加一個一對多測試
@Controller
public class RabbitController {
@Autowired
private HelloSender helloSender;
/*
一個生產者和一個消費者
*/
@PostMapping ("/rabbitHello")
@ResponseBody
public void hello( ) {
helloSender.send();
}
/**
* 單生產者-多消費者
*/
@PostMapping("/oneToMany")
@ResponseBody
public void oneToMany() {
for(int i=0;i<10;i++){
helloSender.sendMsg("這是第二個生產者傳送的訊息:==="+i+"====個");
}
}
}
B:4
postman測試
http://localhost:8088/oneToMany
B:5
測試結果:
Sender2 : 這是第二個生產者傳送的訊息:===0====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===1====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===2====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===3====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===4====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===5====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===6====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===7====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===8====個Fri Nov 09 15:33:33 CST 2018
Sender2 : 這是第二個生產者傳送的訊息:===9====個Fri Nov 09 15:33:33 CST 2018
15:33:33.224 [http-nio-8088-exec-1] INFO c.n.blog.handler.LogAspectHandler - ======執行方法後,執行該方法======
15:33:33.224 [http-nio-8088-exec-1] INFO c.n.blog.handler.LogAspectHandler - Result:null
Receiver2 : 這是第二個生產者傳送的訊息:===1====個Fri Nov 09 15:33:33 CST 2018
Receiver : 這是第二個生產者傳送的訊息:===0====個Fri Nov 09 15:33:33 CST 2018
Receiver2 : 這是第二個生產者傳送的訊息:===3====個Fri Nov 09 15:33:33 CST 2018
Receiver : 這是第二個生產者傳送的訊息:===2====個Fri Nov 09 15:33:33 CST 2018
Receiver2 : 這是第二個生產者傳送的訊息:===5====個Fri Nov 09 15:33:33 CST 2018
Receiver : 這是第二個生產者傳送的訊息:===4====個Fri Nov 09 15:33:33 CST 2018
Receiver2 : 這是第二個生產者傳送的訊息:===7====個Fri Nov 09 15:33:33 CST 2018
Receiver : 這是第二個生產者傳送的訊息:===6====個Fri Nov 09 15:33:33 CST 2018
Receiver2 : 這是第二個生產者傳送的訊息:===9====個Fri Nov 09 15:33:33 CST 2018
Receiver : 這是第二個生產者傳送的訊息:===8====個Fri Nov 09 15:33:33 CST 2018
以上一對多的就完成了
C:多個消費者和多個生產者
-
在剛才我們實現了一個生產者和2個消費者
-
其實多對多更簡單,只需要基於一對多把生產者copy一份就可以了,裡面的東西不要變,換個名字
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class HelloSenderTwo {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "這是傳送的資訊 "+"
---------------------" + new Date();
System.out.println("生產者2_Sender : " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
public void sendMsg(String msg) {
String sendMsg = msg + new Date();
System.out.println("生產者2_Sender2 : " + sendMsg);
this.rabbitTemplate.convertAndSend("hello", sendMsg);
}
}
controller新加一個方法
import cn.nxcoder.blog.rabbit.HelloSender;
import cn.nxcoder.blog.rabbit.HelloSenderTwo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class RabbitController {
@Autowired
private HelloSender helloSender;
@Autowired
private HelloSenderTwo helloSenderTwo;
/*
一個生產者和一個消費者
*/
@PostMapping ("/rabbitHello")
@ResponseBody
public void hello( ) {
helloSender.send();
}
/**
* 單生產者-多消費者
*/
@PostMapping("/oneToMany")
@ResponseBody
public void oneToMany() {
for(int i=0;i<10;i++){
helloSender.sendMsg("這是第二個生產者傳送的訊息:==="+i+"====個");
}
}
/**
* 多生產者-多消費者
*/
@PostMapping("/manyToMany")
@ResponseBody
public void manyToMany() {
for(int i=0;i<10;i++){
helloSender.sendMsg("hellomsg:"+i+" ");
helloSenderTwo.sendMsg("hellomsg:"+i+" ");
}
}
}
// postman測試
http://localhost:8088/manyToMany
//測試結果
Sender2 : hellomsg:0 Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:0 Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:1 Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:1 Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:2 Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:2 Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:3 Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:3 Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:4 Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:4 Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:5 Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:5 Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:6 Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:6 Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:7 Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:7 Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:8 Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:8 Fri Nov 09 15:49:42 CST 2018
Sender2 : hellomsg:9 Fri Nov 09 15:49:42 CST 2018
生產者2_Sender2 : hellomsg:9 Fri Nov 09 15:49:42 CST 2018
15:49:42.455 [http-nio-8088-exec-2] INFO c.n.blog.handler.LogAspectHandler - ======執行方法後,執行該方法======
15:49:42.455 [http-nio-8088-exec-2] INFO c.n.blog.handler.LogAspectHandler - Result:null
Receiver : hellomsg:0 Fri Nov 09 15:49:42 CST 2018
Receiver2 : hellomsg:0 Fri Nov 09 15:49:42 CST 2018
Receiver2 : hellomsg:1 Fri Nov 09 15:49:42 CST 2018
Receiver : hellomsg:1 Fri Nov 09 15:49:42 CST 2018
Receiver2 : hellomsg:2 Fri Nov 09 15:49:42 CST 2018
Receiver : hellomsg:2 Fri Nov 09 15:49:42 CST 2018
Receiver2 : hellomsg:3 Fri Nov 09 15:49:42 CST 2018
Receiver : hellomsg:3 Fri Nov 09 15:49:42 CST 2018
Receiver2 : hellomsg:4 Fri Nov 09 15:49:42 CST 2018
Receiver : hellomsg:4 Fri Nov 09 15:49:42 CST 2018
Receiver2 : hellomsg:5 Fri Nov 09 15:49:42 CST 2018
Receiver : hellomsg:5 Fri Nov 09 15:49:42 CST 2018
Receiver2 : hellomsg:6 Fri Nov 09 15:49:42 CST 2018
Receiver : hellomsg:6 Fri Nov 09 15:49:42 CST 2018
Receiver2 : hellomsg:7 Fri Nov 09 15:49:42 CST 2018
Receiver : hellomsg:7 Fri Nov 09 15:49:42 CST 2018
Receiver2 : hellomsg:8 Fri Nov 09 15:49:42 CST 2018
Receiver : hellomsg:8 Fri Nov 09 15:49:42 CST 2018
Receiver : hellomsg:9 Fri Nov 09 15:49:42 CST 2018
Receiver2 : hellomsg:9 Fri Nov 09 15:49:42 CST 2018
C: 用實體類傳送訊息佇列
大部分的情況是資料是用物件封裝的,所以我們來測試一下實體類
C.1 新建一個實體類並實現序列化介面(必須)
springboot完美的支援物件的傳送和接收,不需要格外的配置。
實體類(必須實現序列化介面):
public class RabbitTest implements Serializable {
private String name;
private String pass;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPass() {
return pass;
}
public void setPass(String pass) {
this.pass = pass;
}
}
C.2 在我們的生產者裡面新建一個方法
需要更改一下他的名字
this.rabbitTemplate.convertAndSend("entityQueue", rabbitTest);
原來的生產者變成這樣;
import cn.nxcoder.blog.entity.RabbitTest;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "這是傳送的資訊 "+"---------------------" + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
public void sendMsg(String msg) {
String sendMsg = msg + new Date();
System.out.println("Sender2 : " + sendMsg);
this.rabbitTemplate.convertAndSend("hello", sendMsg);
}
public void sendEntity() {
RabbitTest rabbitTest =new RabbitTest();
rabbitTest.setName("琬琬");
rabbitTest.setPass("123456987");
this.rabbitTemplate.convertAndSend("entityQueue", rabbitTest);
}
}
C3 新建一個消費者指定他的名字為entityQueue
必須新建一個消費者,因為一個消費者只能有一個名字,剛才我們新家的消費者名字都是hello
現在我們給他定義一個新的名字entityQueue
import cn.nxcoder.blog.entity.RabbitTest;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "entityQueue")
public class EntityReceiver {
@RabbitHandler
public void process(RabbitTest rabbitTest) {
System.out.println("rabbitTest receive : " +
rabbitTest.getName()+"/"+rabbitTest.getPass());
}
}
注意:這樣繼續下去時會報錯的
因為你一旦新定義一個名字,就必須往config檔案中新增這個名字
現在我們的配置類多了一個entityQueue
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue Queue() {
return new Queue("hello");
}
@Bean
public Queue entityQueue() {
return new Queue("entityQueue");
}
}
C4 controller測試
/**
* 實體類傳輸測試
*/
@PostMapping("/entityTest")
@ResponseBody
public void userTest() {
helloSender.sendEntity();
}
C5 postman測試
//測試結果
rabbitTest receive : 琬琬/123456987
topic ExChange 示例
-
topic 是RabbitMQ中最靈活的一種方式,可以根據binding_key自由的繫結不同的佇列
-
首先對topic規則配置,這裡使用兩個佇列來測試
-
(也就是在Application類中建立和繫結的topic.message1和topic.message2兩個佇列)
其中topic.message的bindting_key為
“topic.message1”,topic.message2的binding_key為“topic.#”;
1. D 現在我們的config檔案中新增如下記錄
//===============以下是驗證topic Exchange的佇列==========
@Bean
public Queue queueMessage() {
return new Queue("topic.message1");
}
@Bean
public Queue queueMessages() {
return new Queue("topic.message2");
}
@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}
/**
* 將佇列topic.messages與exchange繫結,binding_key為topic.#,模糊匹配
* @param queueMessages
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
2. D 在我們的生產者裡面新增如下方法:
3. D 新增兩個消費者
@Component
@RabbitListener(queues = "topic.message1")
public class topicMessageReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("topic.messageReceiver1 : " +msg);
}
}
@Component
@RabbitListener(queues = "topic.message2")
public class topicMessageReceiverTwo {
@RabbitHandler
public void process(String msg) {
System.out.println("topic.messageReceiver2 : " +msg);
}
}
4. D controller測試
/**
* topic exchange型別rabbitmq測試
*/
@PostMapping("/topicTest")
@ResponseBody
public void topicTest() {
helloSender.sendTopic();
}
測試:
http://localhost:8088/topicTest
//結果
sender1 : I am topic.mesaage msg======
sender2 : I am topic.mesaages msg########
16:50:10.455 [http-nio-8088-exec-2] INFO c.n.blog.handler.LogAspectHandler - ======執行方法後,執行該方法======
16:50:10.455 [http-nio-8088-exec-2] INFO c.n.blog.handler.LogAspectHandler - Result:null
topic.messageReceiver2 : I am topic.mesaage msg======
topic.messageReceiver2 : I am topic.mesaages msg########
6、fanout ExChange示例
Fanout 就是我們熟悉的廣播模式或者訂閱模式,給Fanout轉發器傳送訊息,綁定了這個轉發器的所有佇列都收到這個訊息。
這裡使用三個佇列來測試(也就是在config類中建立和繫結的fanout.A、fanout.B、fanout.C)這三個佇列都和config中建立的fanoutExchange轉發器繫結。
6.1 新增config檔案
//===============以下是驗證Fanout Exchange的佇列==========
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
//===============以上是驗證Fanout Exchange的佇列==========
6.2 新增生產者方法
public void sendFanout() {
String msgString="fanoutSender :hello i am hzb";
System.out.println(msgString);
this.rabbitTemplate.convertAndSend("fanoutExchange","abcd.ee", msgString);
}
6.3 新增三個消費者
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverC : " + msg);
}
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverB : " + msg);
}
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverA : " + msg);
}
}
6.4 controller方法
/**
* fanout exchange型別rabbitmq測試
*/
@PostMapping("/fanoutTest")
@ResponseBody
public void fanoutTest() {
helloSender.sendFanout();
}
6.5 測試
http://localhost:8088/fanoutTest
//結果
fanoutSender :hello i am hzb
17:35:14.911 [http-nio-8088-exec-1] INFO c.n.blog.handler.LogAspectHandler - ======執行方法後,執行該方法======
17:35:14.911 [http-nio-8088-exec-1] INFO
c.n.blog.handler.LogAspectHandler - Result:null
FanoutReceiverB : fanoutSender :hello i am hzb
FanoutReceiverA : fanoutSender :hello i am hzb
FanoutReceiverC : fanoutSender :hello i am hzb
6.6 結果分析:
由以上結果可知:就算fanoutSender傳送訊息的時候,指定了routing_key為"abcd.ee",但是所有接收者都接受到了訊息
7、帶callback的訊息傳送
增加回調處理,這裡不再使用application.properties預設配置的方式,會在程式中顯示的使用檔案中的配置資訊。該示例中沒有新建佇列和exchange,用的是第5節中的topic.messages佇列和exchange轉發器。消費者也是第5節中的topicMessagesReceiver
7.1 在application.properties中新增一些資訊
新增;
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
//現在變為:
spring.application.name=Spring-boot-rabbitmq
spring.rabbitmq.host=101.200.55.12
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=你的密碼
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
7.2 新增config檔案
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
public class RabbitConfig2 {
@Value("${spring.rabbitmq.host}")
private String addresses;
@Value("${spring.rabbitmq.port}")
private String port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Value("${spring.rabbitmq.publisher-confirms}")
private boolean publisherConfirms;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses+":"+port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
/** 如果要進行訊息回撥,則這裡必須要設定為true */
connectionFactory.setPublisherConfirms(publisherConfirms);
return connectionFactory;
}
@Bean
/** 因為要設定回撥類,所以應是prototype型別,如果是singleton型別,則回撥類為最後一次設定 */
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplatenew() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
}
7.3 新增生產者類:
import java.util.Date;
import java.util.UUID;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class CallBackSender implements RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplatenew;
public void send() {
rabbitTemplatenew.setConfirmCallback(this);
String msg="callbackSender : i am callback sender";
System.out.println(msg );
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
System.out.println("callbackSender UUID: " + correlationData.getId());
this.rabbitTemplatenew.convertAndSend("exchange", "topic.messages", msg, correlationData);
}
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// TODO Auto-generated method stub
System.out.println("callbakck confirm: " + correlationData.getId());
}
}
7.4 用原來topic的消費者類,這裡再貼一次
@Component
@RabbitListener(queues = "topic.message1")
public class topicMessageReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("topic.messageReceiver1 : " +msg);
}
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "topic.message2")
public class topicMessageReceiverTwo {
@RabbitHandler
public void process(String msg) {
System.out.println("topic.messageReceiver2 : " +msg);
}
}
7.5 controller測試
@PostMapping("/callback")
@ResponseBody
public void callbak() {
callBackSender.send();
}
http://localhost:8088/callback
//測試結果
callbackSender : i am callback sender
callbackSender UUID: 48be7d7e-69f8-4d9c-b264-191402dec3de
callbakck confirm: 48be7d7e-69f8-4d9c-b264-191402dec3de
topic.messageReceiver2 : callbackSender : i am callback sender
7.6 結果分析
從上面可以看出callbackSender發出的UUID,收到了迴應,又傳回來了。
到此,rabbitMQ先分析到這裡,接下來我們會用它做些高階的功能
有問題的可以聯絡
[email protected]