03訊息佇列系列-RabbitMQ在Spring boot中的使用
03訊息佇列系列-RabbitMQ在Spring boot中的使用
一、專案簡介
建立兩個專案,一個是rabbitmq-consumer 訊息消費者,另一個是rabbitmq-provider訊息傳送者。基於訊息的傳送接收去了解直連型交換機、主題交換機、扇形交換機等交換機的使用和訊息確認的使用。
二、專案實戰
2.1 用到的依賴
implementation 'org.springframework.boot:spring-boot-starter-amqp'
2.2 RabbitMQ在application.properties裡的配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
2.3 Direct Exchange直連型交換機的使用
2.3.1 建立rabbitmq-provider
2.3.1.1 佇列、交換機、繫結資訊的配置
建立DirectRabbitConfig
類檔案,來配置佇列、交換機、以及繫結資訊
package com.lucky.spring.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Created by zhangdd on 2020/9/16 */ @Configuration public class DirectRabbitConfig { /** * 佇列配置 * @return */ @Bean public Queue directQueue() { // durable:是否持久化,預設是false,持久化佇列:會被儲存在磁碟上,當訊息代理重啟時仍然存在,暫存佇列:當前連線有效 // exclusive:預設也是false,只能被當前建立的連線使用,而且當連線關閉後佇列即被刪除。此參考優先順序高於durable // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此佇列,該佇列會自動刪除。 // return new Queue("TestDirectQueue",true,true,false); //一般設定一下佇列的持久化就好,其餘兩個就是預設false return new Queue("directQueue", true); } @Bean public DirectExchange directExchange() { //durable:是否持久化 //autoDelete:是否自動刪除 return new DirectExchange("directExchange", true, false); } @Bean public Binding directBinding(){ return BindingBuilder.bind(directQueue()).to(directExchange()) .with("directRouting"); } }
2.3.1.2 暴露傳送訊息介面
package com.lucky.spring.controller; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * Created by zhangdd on 2020/9/16 */ @RestController public class SendDirectMsgController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/sendDirectMsg") public void sendDirectMsg() { String uuid = String.valueOf(UUID.randomUUID()); String msg = "test message, hello!"; String createTime = new Date().toString(); Map<String,Object> map=new HashMap<>(); map.put("messageId",uuid); map.put("messageData",msg); map.put("createTime",createTime); //將訊息攜帶繫結鍵值:directRouting 傳送到交換機 directExchange rabbitTemplate.convertAndSend("directExchange", "directRouting", map); } }
這裡對外提供一個介面,用來觸發傳送訊息。呼叫介面之後,在RabbitMQ管理臺的Exchange面板可以看到配置的交換機directExchange以及在佇列面板可以看到建立的佇列directQueue以及兩者的繫結關係。
可以看到訊息已經到達佇列裡了。
2.3.2 建立rabbitmq-consumer
如果是一個單獨的訊息消費者,那麼只需要建立訊息接收監聽即可。如果既是訊息傳送者同時又是訊息接收者,那就需要建立接收訊息監聽的同時也需要交換機的配置、佇列的配置等資訊。這裡僅以接收訊息說明。
2.3.2.1 建立訊息接收監聽
package com.lucky.spring.receiver.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Created by zhangdd on 2020/9/19
*/
@Component
@RabbitListener(queues = "directQueue")//配置要監聽的佇列的名字
public class DirectReceiver {
@RabbitHandler
public void process(Map msg) {
System.out.println("DirectReceiver:收到的訊息是:" + msg.toString());
}
}
2.3.2.2 接收訊息
啟動rabbitmq-consumer專案,可以看到把之前推送的那條訊息消費了下來。可以再次呼叫rabbitmq-provider裡的訊息推送介面,可以看到消費者是及時的接收到了訊息。
2.3.3 多個監聽繫結同一個佇列
既然直連交換機是一對一的,如果有多個監聽繫結到同一個監聽佇列,會怎麼樣。這裡在創建出兩個 訊息接收監聽 DirectReceiverA
、DirectReceiverB
。
package com.lucky.spring.receiver.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Created by zhangdd on 2020/9/19
*/
@Component
@RabbitListener(queues = "directQueue")//配置要監聽的佇列的名字
public class DirectReceiverA {
@RabbitHandler
public void process(Map msg) {
System.out.println("DirectReceiverA:收到的訊息是:" + msg.toString());
}
}
package com.lucky.spring.receiver.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Created by zhangdd on 2020/9/19
*/
@Component
@RabbitListener(queues = "directQueue")//配置要監聽的佇列的名字
public class DirectReceiverB {
@RabbitHandler
public void process(Map msg) {
System.out.println("DirectReceiverB:收到的訊息是:" + msg.toString());
}
}
呼叫rabbitmq-provider的訊息推送介面,控制檯日誌資訊如下:
DirectReceiver:收到的訊息是:{createTime=Sun Sep 27 08:29:57 CST 2020, messageId=7ffd1517-b3bc-4e72-8355-6b58de4c3277, messageData=test message, hello!}
DirectReceiverA:收到的訊息是:{createTime=Sun Sep 27 08:29:59 CST 2020, messageId=2f296ca3-4295-418e-9592-d5d3c90d373a, messageData=test message, hello!}
DirectReceiverB:收到的訊息是:{createTime=Sun Sep 27 08:30:00 CST 2020, messageId=759b34c9-98de-4aad-8bc5-fdac2ec5b5ed, messageData=test message, hello!}
DirectReceiver:收到的訊息是:{createTime=Sun Sep 27 08:30:07 CST 2020, messageId=bb276692-cf14-405f-a91a-76df7b6e2fca, messageData=test message, hello!}
DirectReceiverA:收到的訊息是:{createTime=Sun Sep 27 08:30:13 CST 2020, messageId=15381f29-5769-442d-bece-e5acec33f478, messageData=test message, hello!}
DirectReceiverB:收到的訊息是:{createTime=Sun Sep 27 08:30:13 CST 2020, messageId=0dd855f1-f9ef-45f6-9883-c65d6480f0aa, messageData=test message, hello!}
DirectReceiver:收到的訊息是:{createTime=Sun Sep 27 08:30:15 CST 2020, messageId=6f902f7c-ff61-4a3b-a6f6-28e428ca8b43, messageData=test message, hello!}
可以看到是實現了輪詢的方式對訊息進行消費,而且不存在重複消費。
2.4 Topic Exchange 主題交換機的使用
2.4.1 rabbitmq-provider端配置
2.4.1.1 佇列、交換機、繫結資訊的配置
建立 TopicRabbitConfig
類進行交換機、佇列、繫結資訊的配置。
package com.lucky.spring.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by zhangdd on 2020/9/19
*/
@Configuration
public class TopicRabbitConfig {
//繫結鍵
public final static String MAN = "topic.man";
public final static String WOMAN = "topic.woman";
@Bean
public Queue firstQueue() {
return new Queue(MAN);
}
@Bean
public Queue secondQueue() {
return new Queue(WOMAN);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
/**
* 將 firstQueue與topicExchange繫結,而且繫結的鍵值為topic.man
* 這樣只要是訊息 攜帶的路由鍵是 topic.man 才會分發到該佇列
* @return
*/
@Bean
public Binding bindingExchange1() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(MAN);
}
/**
* 將 secondQueue與topicExchange繫結,而且繫結的鍵值為用上通配路由鍵規則topic.#
* 這樣只要是訊息 攜帶的路由鍵是 topic. 開頭都會分發到該佇列
* @return
*/
@Bean
public Binding bindingExchange2() {
return BindingBuilder.bind(secondQueue()).
to(exchange())
.with("topic.#");
}
}
2.4.1.2 新增訊息推送介面
package com.lucky.spring.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* Created by zhangdd on 2020/9/20
*/
@RestController
public class SendTopicMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendTopicMessage1")
public String sendTopicMessage1() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: M A N ";
String createTime = new Date().toString();
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
return "ok";
}
@GetMapping("/sendTopicMessage2")
public String sendTopicMessage2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: woman is all ";
String createTime = new Date().toString();
Map<String, Object> womanMap = new HashMap<>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", messageData);
womanMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
return "ok";
}
}
2.4.2 rabbitmq-consumer端配置
2.4.2.1 建立訊息接收監聽類
建立訊息接收監聽類,TopicManReceiver
、TopicTotalReceiver
package com.lucky.spring.receiver.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Created by zhangdd on 2020/9/20
*/
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {
@RabbitHandler
public void process(Map msg) {
System.out.println("TopicManReceiver消費者收到訊息 : " +
msg.toString());
}
}
package com.lucky.spring.receiver.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Created by zhangdd on 2020/9/20
*/
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("TopicTotalReceiver消費者收到訊息 : " +
testMessage.toString());
}
}
2.4.2.2 推送訊息
- 呼叫/sendTopicMessage1介面:
在rabbitmq-consumer端 控制檯輸出資訊如下:
TopicManReceiver消費者收到訊息 : {createTime=Sun Sep 27 08:41:18 CST 2020, messageId=b949f9fd-0c9e-412d-a55f-9476d2427e3b, messageData=message: M A N }
TopicTotalReceiver消費者收到訊息 : {createTime=Sun Sep 27 08:41:18 CST 2020, messageId=b949f9fd-0c9e-412d-a55f-9476d2427e3b, messageData=message: M A N }
TopicManReceiver監聽佇列1,繫結鍵為:topic.man
TopicTotalReceiver監聽佇列2,繫結鍵為:topic.#
當前推送的訊息,攜帶的路由鍵為:topic.man,所以可以看到兩個監聽消費者receiver都成功消費到了訊息,因為這兩個recevier監聽的佇列的繫結鍵都能與這條訊息攜帶的路由鍵匹配上。
- 呼叫/sendTopicMessage2介面
在rabbitmq-consumer端 控制檯輸出資訊如下:
TopicTotalReceiver消費者收到訊息 : {createTime=Sun Sep 27 08:44:32 CST 2020, messageId=47c434ee-3b6d-4f52-bb75-2482a1dc5893, messageData=message: woman is all }
TopicManReceiver監聽佇列1,繫結鍵為:topic.man
TopicTotalReceiver監聽佇列2,繫結鍵為:topic.#
當前推送的訊息,攜帶的路由鍵為:topic.woman,所以可以看到兩個監聽消費者只有TopicTotalReceiver成功消費到了訊息,因為這個時候只有TopicTotalReceiver監聽的佇列繫結的鍵才能與當前訊息攜帶的路由鍵匹配上。
2.5 Fanout Exchange 扇形交換機的使用
2.5.1 rabbitmq-provider端的配置
2.5.1.1 交換機、佇列、繫結等資訊的配置
建立FanoutRabbitConfig
類對扇形交換機的佇列、交換機、繫結等資訊配置。
package com.lucky.spring.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by zhangdd on 2020/9/20
*/
@Configuration
public class FanoutRabbitConfig {
/**
* 建立三個佇列 :fanout.A fanout.B fanout.C
* 將三個佇列都繫結在交換機 fanoutExchange 上
* 因為是扇型交換機, 路由鍵無需配置,配置也不起作用
*/
@Bean
public Queue queueA() {
return new Queue("fanout.A");
}
@Bean
public Queue queueB() {
return new Queue("fanout.B");
}
@Bean
public Queue queueC() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA())
.to(fanoutExchange());
}
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB())
.to(fanoutExchange());
}
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC())
.to(fanoutExchange());
}
}
2.5.1.2 新增訊息推送介面
package com.lucky.spring.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* Created by zhangdd on 2020/9/20
*/
@RestController
public class SendFanoutMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendFanoutMessage")
public String sendFanoutMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: testFanoutMessage ";
String createTime = new Date().toString();
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("fanoutExchange", null, map);
return "ok";
}
}
2.5.2 rabbitmq-consumer 端配置
2.5.2.1 建立訊息消費監聽類
這裡建立三個 監聽類,主要是基於扇形交換機無需路由鍵配置。
package com.lucky.spring.receiver.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Created by zhangdd on 2020/9/20
*/
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutAReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverA消費者收到訊息 : "
+ testMessage.toString());
}
}
package com.lucky.spring.receiver.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Created by zhangdd on 2020/9/20
*/
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutBReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverB消費者收到訊息 : " +
testMessage.toString());
}
}
package com.lucky.spring.receiver.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Created by zhangdd on 2020/9/20
*/
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutCReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverC消費者收到訊息 : " +
testMessage.toString());
}
}
2.5.2.2 推送訊息
- 呼叫/sendFanoutMessage介面傳送訊息
rabbitmq-consumer端控制檯日誌資訊如下:
FanoutReceiverA消費者收到訊息 : {createTime=Sun Sep 27 12:48:25 CST 2020, messageId=dc5e48bf-d800-4791-8f34-2d6ab8056669, messageData=message: testFanoutMessage }
FanoutReceiverB消費者收到訊息 : {createTime=Sun Sep 27 12:48:25 CST 2020, messageId=dc5e48bf-d800-4791-8f34-2d6ab8056669, messageData=message: testFanoutMessage }
FanoutReceiverC消費者收到訊息 : {createTime=Sun Sep 27 12:48:25 CST 2020, messageId=dc5e48bf-d800-4791-8f34-2d6ab8056669, messageData=message: testFanoutMessage }
可以看到只要傳送到fanoutExchange 這個扇型交換機的訊息, 三個佇列都繫結這個交換機,所以三個訊息接收類都監聽到了這條訊息。
2.6訊息確認
訊息的回撥即訊息確認(生產者推送訊息成功,消費者接收訊息成功)。
2.6.1 生產者推送訊息確認
2.6.1.1 新增application.properties配置
#確認訊息已傳送到交換機(Exchange)
spring.rabbitmq.publisher-confirms=true
#確認訊息已傳送到佇列(Queue)
spring.rabbitmq.publisher-returns=true
2.6.1.2 配置訊息確認回撥函式
package com.lucky.spring.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by zhangdd on 2020/9/20
*/
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory
connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
template.setConnectionFactory(connectionFactory);
//設定開啟Mandatory,才能觸發回撥函式,無論訊息推送結果怎麼樣都強制呼叫回撥函式
template.setMandatory(true);
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback: "+"相關資料:"+correlationData);
System.out.println("ConfirmCallback: "+"確認情況:"+ack);
System.out.println("ConfirmCallback: "+"原因:"+cause);
}
});
template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("ReturnCallback: "+"訊息:"+message);
System.out.println("ReturnCallback: "+"迴應碼:"+replyCode);
System.out.println("ReturnCallback: "+"迴應資訊:"+replyText);
System.out.println("ReturnCallback: "+"交換機:"+exchange);
System.out.println("ReturnCallback: "+"路由鍵:"+routingKey);
}
});
return template;
}
}
上面就是生產者推送訊息的相關回調函式配置,一個是ConfirmCallback
另一個是ReturnCallback
:
先從總體的情況分析,推送訊息存在四種情況:
①訊息推送到server,但是在server裡找不到交換機
②訊息推送到server,找到交換機了,但是沒找到佇列
③訊息推送到sever,交換機和佇列啥都沒找到
④訊息推送成功
這裡先寫幾個介面來分別測試和認證下以上4種情況,訊息確認觸發回撥函式的情況:
①訊息推送到server,但是在server裡找不到交換機
寫個測試介面,把訊息推送到名為‘non-existent-exchange’的交換機上(這個交換機是沒有建立沒有配置的):
@GetMapping("/testMessageAck")
public String TestMessageAck() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: non-existent-exchange test message ";
String createTime = new Date().toString();
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);
return "ok";
}
呼叫介面,檢視rabbitmq-provider專案的控制檯輸出情況:
ConfirmCallback: 相關資料:null
ConfirmCallback: 確認情況:false
ConfirmCallback: 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)
2020-09-27 14:02:47.167 ERROR 67934 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)
結論: ①這種情況觸發的是ConfirmCallback 回撥函式。
**②訊息推送到server,找到交換機了,但是沒找到佇列**
這種情況就是需要新增一個交換機,但是不給這個交換機繫結佇列,新增一個直連交換機,名叫‘lonelyDirectExchange’,但沒給它做任何繫結配置操作
package com.lucky.spring.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by zhangdd on 2020/9/20
*/
@Configuration
public class MsgCallbackRabbitConfig {
/**
* 新增一個交換機,但是不給這個交換機繫結佇列
* @return
*/
@Bean
public DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
}
然後寫個測試介面,把訊息推送到名為‘lonelyDirectExchange’的交換機上(這個交換機是沒有任何佇列配置的):
@GetMapping("/testMessageAck2")
public String TestMessageAck2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: lonelyDirectExchange test message ";
String createTime = new Date().toString();
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);
return "ok";
}
呼叫介面,檢視rabbitmq-provuder專案的控制檯輸出情況:
ReturnCallback: 訊息:(Body:'{createTime=Sun Sep 27 14:08:43 CST 2020, messageId=9009ed23-0329-41fc-9d2a-77bb7ca67680, messageData=message: lonelyDirectExchange test message }' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback: 迴應碼:312
ReturnCallback: 迴應資訊:NO_ROUTE
ReturnCallback: 交換機:lonelyDirectExchange
ReturnCallback: 路由鍵:TestDirectRouting
ConfirmCallback: 相關資料:null
ConfirmCallback: 確認情況:true
ConfirmCallback: 原因:null
可以看到這種情況,兩個函式都被呼叫了;
這種情況下,訊息是推送成功到伺服器了的,所以ConfirmCallback對訊息確認情況是true;
而在RetrunCallback回撥函式的列印引數裡面可以看到,訊息是推送到了交換機成功了,但是在路由分發給佇列的時候,找不到佇列,所以報了錯誤 NO_ROUTE 。
結論:②這種情況觸發的是ConfirmCallback和RetrunCallback兩個回撥函式。
③訊息推送到sever,交換機和佇列啥都沒找到
這種情況其實一看就覺得跟①很像,沒錯 ,③和①情況回撥是一致的,所以不做結果說明了。
結論: ③這種情況觸發的是ConfirmCallback 回撥函式。
④訊息推送成功
那麼測試下,按照正常呼叫之前訊息推送的介面就行,就呼叫下 /sendFanoutMessage介面,可以看到控制檯輸出:
ConfirmCallback: 相關資料:null
ConfirmCallback: 確認情況:true
ConfirmCallback: 原因:null
結論: ④這種情況觸發的是ConfirmCallback 回撥函式。
2.6.2 消費者接收訊息確認機制
和生產者的訊息確認機制不同,因為訊息接收本來就是在監聽訊息,符合條件的訊息就會消費下來。所以訊息接收的確認機制主要存在三種模式。
- 自動確認:這也是預設的訊息確認情況,AcknowledgeMode.NONE
- RabbitMQ成功將訊息傳送(即將訊息寫入TCP Socket)中立即認為本次投遞已經被正確處理,不管消費者端是否成功處理本次投遞。
- 所以這種情況如果消費端消費邏輯跑出異常,也就是消費端沒有處理成功這條訊息,那麼就相當於丟失了訊息。一般這種情況下我們都是使用try catch捕獲異常後,列印日誌用於追蹤資料,這樣找出對應資料在做後續處理
- 根據情況確認,這個不做介紹
- 手動確認,這個比較關鍵,也是我們配置接收訊息確認機制時,多數選擇的模式
消費者收到訊息後,手動呼叫basic.ack/basic.nack/basic.reject後,RabbitMQ收到這些訊息後,才認為本次投遞成功。
- basic.ack:用於肯定確認
- basic.nack:用於否定確認
- basic.reject:用於否定確認,但與basic.nack相比有一個限制:basic.reject一次只能拒絕單條訊息
手動確認模式下,消費者端對於以上三個方法都表示訊息已經被正確投遞,但是basic.ack表示訊息已經被正確處理,而basic.nack和basic.reject表示沒有被正確處理。
2.6.2.1 basic.reject模式
著重講下reject,因為有時候一些場景是需要重新入佇列的。channel.basicReject(deliveryTag,true),拒絕消費當前訊息,如果第二個引數傳入true,就是表示將資料重新丟回佇列裡,那麼下次還會消費這條訊息。設定false,就是告訴伺服器,我已經知道這條訊息資料了,因為一些原因拒絕他,而且服務棄也把它丟掉就行了,下次不想在消費這條訊息了。
使用拒絕後重新入佇列這個模式要謹慎,因為一般都是出現異常的時候,catch異常時選擇是否重入佇列,但是如果使用不當會導致一些被重入佇列的訊息一直消費:異常->重入佇列->接收訊息處理->異常。這樣迴圈會導致訊息積壓。
2.6.2.2 basic.nack模式
對於basic.nack而言,這個也是設定不消費某條訊息.channel.basicNack(deliveryTag,false,true)
第一個引數依然是當前訊息資料的唯一ID
第二個引數是指是否針對多條訊息,如果是true,也就是說一次性針對當前通道的訊息tagID小於當前這小訊息的都拒絕確認。
第三個引數是指是否重新入佇列,也就是指不確認的訊息是否重新丟回到佇列裡面去
2.6.2.3 消費端配置訊息確認
首先建立MyAckReceiver
類即手動確認訊息監聽類,需要實現ChannelAwareMessageListener
介面:
package com.lucky.spring.config;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* Created by zhangdd on 2020/9/20
*/
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//因為傳遞訊息的時候用的map傳遞,所以將Map從Message內取出需要做些處理
String msg = message.toString();
String[] msgArray = msg.split("'");//可以點進Message裡面看原始碼,單引號直接的資料就是我們的map訊息資料
Map<String, String> msgMap = mapStringToMap(msgArray[1].trim(), 3);
String messageId = msgMap.get("messageId");
String messageData = msgMap.get("messageData");
String createTime = msgMap.get("createTime");
System.out.println(" MyAckReceiver messageId:" + messageId + " messageData:" + messageData + " createTime:" + createTime);
System.out.println("消費的主題訊息來自:" + message.getMessageProperties().getConsumerQueue());
channel.basicAck(deliveryTag, true);
// channel.basicReject(deliveryTag, true);//為true會重新放回佇列
} catch (Exception e) {
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
//{key=value,key=value,key=value} 格式轉換成map
private Map<String, String> mapStringToMap(String str, int entryNum) {
str = str.substring(1, str.length() - 1);
String[] strs = str.split(",", entryNum);
Map<String, String> map = new HashMap<String, String>();
for (String string : strs) {
String key = string.split("=")[0].trim();
String value = string.split("=")[1];
map.put(key, value);
}
return map;
}
}
接著建立MessageListenerConfig新增相關的配置程式碼:
package com.lucky.spring.config;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by zhangdd on 2020/9/20
*/
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
//訊息接收處理類
@Autowired
private MyAckReceiver myAckReceiver;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
// RabbitMQ預設是自動確認,這裡改為手動確認訊息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//設定一個佇列
container.setQueueNames("directQueue");
//設定 手動確認訊息監聽類
container.setMessageListener(myAckReceiver);
return container;
}
}
因為上面配置的是directQueue這個佇列作為示例,所以呼叫/sendDirectMsg
介面進行檢視結果:
MyAckReceiver messageId:b7b95a08-2354-4d90-9f21-376b5bcc1fef messageData:test message, hello! createTime:Mon Sep 28 08:41:06 CST 2020
消費的主題訊息來自:directQueue
可以看到監聽器正常的消費的訊息。
到此RabbitMQ在Spring boot中的簡單使用就結束了。