04訊息佇列系列-RabbitMQ 利用MQ實現事務補償
04訊息佇列系列-RabbitMQ 利用MQ實現事務補償
一、介紹
本篇使用SpringBoot整合RabbitMQ,為後續業務處理開發做鋪墊。
二、整合實戰
2.1 建立一個gradle專案,引入amqp依賴
implementation 'org.springframework.boot:spring-boot-starter-amqp'
2.2 在application.properties檔案裡新增RabbitMQ的配置資訊
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
2.3 編寫RabbitUtil工具類
package com.lucky.spring.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Convenience wrapper around {@link RabbitAdmin} and {@link RabbitTemplate} for
 * declaring/removing exchanges, queues and bindings and for sending messages.
 *
 * Created by zhangdd on 2020/10/7
 */
public class RabbitUtil {

    private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class);

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Declares an exchange.
     *
     * @param exchangeType one of the {@link ExchangeTypes} constants (direct, topic, headers, fanout)
     * @param exchangeName name of the exchange to declare
     * @throws IllegalArgumentException if {@code exchangeType} is not one of the supported types
     */
    public void addExchange(String exchangeType, String exchangeName) {
        Exchange exchange = createExchange(exchangeType, exchangeName);
        rabbitAdmin.declareExchange(exchange);
    }

    /**
     * Deletes an exchange.
     *
     * @param exchangeName name of the exchange to delete
     * @return the result reported by {@link RabbitAdmin#deleteExchange(String)}
     */
    public boolean deleteExchange(String exchangeName) {
        return rabbitAdmin.deleteExchange(exchangeName);
    }

    /**
     * Declares a queue with the given name.
     *
     * @param queueName name of the queue to declare
     */
    public void addQueue(String queueName) {
        Queue queue = createQueue(queueName);
        rabbitAdmin.declareQueue(queue);
    }

    /**
     * Deletes a queue.
     *
     * @param queueName name of the queue to delete
     * @return the result reported by {@link RabbitAdmin#deleteQueue(String)}
     */
    public boolean deleteQueue(String queueName) {
        return rabbitAdmin.deleteQueue(queueName);
    }

    /**
     * Deletes a queue subject to the given filter conditions.
     *
     * @param queueName name of the queue to delete
     * @param unused    passed through to {@link RabbitAdmin#deleteQueue(String, boolean, boolean)}
     * @param empty     passed through to {@link RabbitAdmin#deleteQueue(String, boolean, boolean)}
     */
    public void deleteQueue(String queueName, boolean unused, boolean empty) {
        rabbitAdmin.deleteQueue(queueName, unused, empty);
    }

    /**
     * Purges the messages of a queue. Note that the purged messages are not consumed.
     *
     * @param queueName name of the queue to purge
     */
    public void purgeQueue(String queueName) {
        rabbitAdmin.purgeQueue(queueName, false);
    }

    /**
     * Checks whether a queue exists.
     *
     * @param queueName name of the queue to look up
     * @return {@code true} if {@link RabbitAdmin#getQueueProperties(String)} returns a non-null result
     */
    public boolean existQueue(String queueName) {
        return rabbitAdmin.getQueueProperties(queueName) != null;
    }

    /**
     * Binds a queue to an exchange of the given type.
     *
     * @param exchangeType one of the {@link ExchangeTypes} constants
     * @param exchangeName name of the exchange
     * @param queueName    name of the queue
     * @param routingKey   routing key (used for direct and topic exchanges)
     * @param isWhereAll   for headers exchanges: {@code true} to require all headers to match,
     *                     {@code false} to require any header to match
     * @param headers      header map for HEADERS exchanges; pass null for the other types
     * @throws IllegalArgumentException if {@code exchangeType} is not one of the supported types
     */
    public void addBinding(String exchangeType, String exchangeName, String queueName,
                           String routingKey, boolean isWhereAll, Map<String, Object> headers) {
        Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
        rabbitAdmin.declareBinding(binding);
    }

    /**
     * Declares the given binding.
     *
     * @param binding binding to declare
     */
    public void addBinding(Binding binding) {
        rabbitAdmin.declareBinding(binding);
    }

    /**
     * Removes the binding between an exchange and a queue.
     *
     * @param exchangeType one of the {@link ExchangeTypes} constants
     * @param exchangeName name of the exchange
     * @param queueName    name of the queue
     * @param routingKey   routing key (used for direct and topic exchanges)
     * @param isWhereAll   for headers exchanges: match-all ({@code true}) or match-any ({@code false})
     * @param headers      header map for HEADERS exchanges; pass null for the other types
     * @throws IllegalArgumentException if {@code exchangeType} is not one of the supported types
     */
    public void removeBinding(String exchangeType, String exchangeName, String queueName,
                              String routingKey, boolean isWhereAll, Map<String, Object> headers) {
        Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
        removeBinding(binding);
    }

    /**
     * Removes the given binding.
     *
     * @param binding binding to remove
     */
    public void removeBinding(Binding binding) {
        rabbitAdmin.removeBinding(binding);
    }

    /**
     * Declares an exchange and a queue and binds them, in that order.
     *
     * @param exchangeType one of the {@link ExchangeTypes} constants
     * @param exchangeName name of the exchange
     * @param queueName    name of the queue
     * @param routingKey   routing key (used for direct and topic exchanges)
     * @param isWhereAll   for headers exchanges: match-all ({@code true}) or match-any ({@code false})
     * @param headers      header map for HEADERS exchanges; pass null for the other types
     * @throws IllegalArgumentException if {@code exchangeType} is not one of the supported types
     */
    public void addExchangeBindingQueue(String exchangeType, String exchangeName, String queueName,
                                        String routingKey, boolean isWhereAll, Map<String, Object> headers) {
        // declare the exchange
        addExchange(exchangeType, exchangeName);
        // declare the queue
        addQueue(queueName);
        // declare the binding between the two
        addBinding(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
    }

    /**
     * Converts the object with the template's message converter and sends it.
     *
     * @param exchange   target exchange
     * @param routingKey routing key
     * @param object     payload to convert and send
     */
    public void convertAndSend(String exchange, String routingKey, final Object object) {
        rabbitTemplate.convertAndSend(exchange, routingKey, object);
    }

    /**
     * Builds a {@link Message} whose body is the UTF-8 bytes of {@code msg.toString()}.
     *
     * @param messageType content type to set on the message properties
     * @param msg         object whose string form becomes the body
     * @return the constructed message
     */
    public Message getMessage(String messageType, Object msg) {
        MessageProperties properties = new MessageProperties();
        properties.setContentType(messageType);
        // Explicit charset: the no-arg getBytes() depends on the platform default.
        return new Message(msg.toString().getBytes(StandardCharsets.UTF_8), properties);
    }

    /**
     * Creates the exchange object matching the given type.
     *
     * @param exchangeType one of the {@link ExchangeTypes} constants
     * @param exchangeName name of the exchange
     * @return the exchange definition
     * @throws IllegalArgumentException if {@code exchangeType} is not one of the supported types
     */
    private Exchange createExchange(String exchangeType, String exchangeName) {
        if (ExchangeTypes.DIRECT.equals(exchangeType)) {
            return new DirectExchange(exchangeName);
        }
        if (ExchangeTypes.TOPIC.equals(exchangeType)) {
            return new TopicExchange(exchangeName);
        }
        if (ExchangeTypes.HEADERS.equals(exchangeType)) {
            return new HeadersExchange(exchangeName);
        }
        if (ExchangeTypes.FANOUT.equals(exchangeType)) {
            return new FanoutExchange(exchangeName);
        }
        // Fail here with a clear message instead of handing null to RabbitAdmin.
        throw new IllegalArgumentException("Unsupported exchange type: " + exchangeType);
    }

    /**
     * Builds the binding definition for the given exchange type.
     *
     * @param exchangeType one of the {@link ExchangeTypes} constants
     * @param exchangeName name of the exchange
     * @param queueName    name of the queue
     * @param routingKey   routing key (used for direct and topic exchanges)
     * @param isWhereAll   for headers exchanges: match-all ({@code true}) or match-any ({@code false})
     * @param headers      header map for HEADERS exchanges
     * @return the binding definition
     * @throws IllegalArgumentException if {@code exchangeType} is not one of the supported types
     */
    private Binding bindingBuilder(String exchangeType, String exchangeName, String queueName,
                                   String routingKey, boolean isWhereAll, Map<String, Object> headers) {
        if (ExchangeTypes.DIRECT.equals(exchangeType)) {
            return BindingBuilder.bind(new Queue(queueName))
                    .to(new DirectExchange(exchangeName))
                    .with(routingKey);
        }
        if (ExchangeTypes.TOPIC.equals(exchangeType)) {
            return BindingBuilder.bind(new Queue(queueName))
                    .to(new TopicExchange(exchangeName))
                    .with(routingKey);
        }
        if (ExchangeTypes.HEADERS.equals(exchangeType)) {
            if (isWhereAll) {
                return BindingBuilder.bind(new Queue(queueName))
                        .to(new HeadersExchange(exchangeName))
                        .whereAll(headers)
                        .match();
            }
            return BindingBuilder.bind(new Queue(queueName))
                    .to(new HeadersExchange(exchangeName))
                    .whereAny(headers)
                    .match();
        }
        if (ExchangeTypes.FANOUT.equals(exchangeType)) {
            return BindingBuilder.bind(new Queue(queueName))
                    .to(new FanoutExchange(exchangeName));
        }
        // Fail here with a clear message instead of handing null to RabbitAdmin.
        throw new IllegalArgumentException("Unsupported exchange type: " + exchangeType);
    }

    /**
     * Creates a queue definition with the given name.
     *
     * @param queueName name of the queue
     * @return the queue definition
     */
    private Queue createQueue(String queueName) {
        return new Queue(queueName);
    }
}
2.4 編寫RabbitMQ配置類
package com.lucky.spring.config; import com.lucky.spring.util.RabbitUtil; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Created by zhangdd on 2020/10/7 */ @Configuration public class RabbitConfig { /** * init factory of connection * * @param host * @param port * @param username * @param password * @return */ @Bean public ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.host}") String host, @Value("${spring.rabbitmq.port}") int port, @Value("${spring.rabbitmq.username}") String username, @Value("${spring.rabbitmq.password}") String password) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); return connectionFactory; } /** * 重新例項化 RabbitAdmin操作類 * * @param connectionFactory * @return */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } /** * 重新例項化 RabbitTemplate操作類 * * @param connectionFactory * @return */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //資料轉換為json存入訊息佇列 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } @Bean public RabbitUtil rabbitUtil() { return new RabbitUtil(); } }
2.5 編寫佇列監聽類(靜態)
我們已經瞭解佇列監聽這個概念,只需要在方法上加上@RabbitListener(queues = "")
即可收到對應佇列的訊息。但此時的佇列是事先已知的,所以稱之為監聽靜態佇列。
package com.lucky.spring.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;
/**
 * Listener for a queue whose name is fixed in the annotation.
 *
 * Created by zhangdd on 2020/10/7
 */
@Configuration
public class DirectConsumeListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(DirectConsumeListener.class);

    /**
     * Receives messages from the queue {@code mq.direct.1} and logs them.
     *
     * @param message the received message
     */
    @RabbitListener(queues = "mq.direct.1")
    public void consume(Message message) {
        final String text = message.toString();
        LOGGER.info("DirectConsumeListener,收到訊息: {}", text);
    }
}
2.6 編寫佇列監聽類(動態)
在服務執行過程中,動態地新增佇列,然後再監聽新增的佇列,這種行為就是監聽動態佇列。
package com.lucky.spring.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Provides the listener container that queues can be attached to at runtime.
 *
 * Created by zhangdd on 2020/10/7
 */
@Configuration
public class DynamicConsumeListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicConsumeListener.class);

    /**
     * Creates a {@link SimpleMessageListenerContainer} whose listener logs every
     * received message. No queues are configured here.
     *
     * @param connectionFactory connection factory the container consumes through
     * @return the listener container bean
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        listenerContainer.setMessageListener(
                received -> LOGGER.info("ConsumerMessageListen,收到訊息: {}", received.toString()));
        return listenerContainer;
    }
}
如果想向SimpleMessageListenerContainer
新增監聽佇列或者移除佇列,只需要通過如下方式即可操作。
package com.lucky.spring.controller;
import com.lucky.spring.entity.ConsumerInfo;
import com.lucky.spring.util.RabbitUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoints for adding queues to, removing queues from, and inspecting
 * the injected {@link SimpleMessageListenerContainer}.
 *
 * Created by zhangdd on 2020/10/7
 */
@RestController
public class ConsumerController {

    @Autowired
    private SimpleMessageListenerContainer container;

    @Autowired
    private RabbitUtil rabbitUtil;

    private Logger log = LoggerFactory.getLogger(ConsumerController.class);

    /**
     * Adds a queue to the listener container.
     *
     * @param consumerInfo request body carrying the queue name
     * @throws RuntimeException if {@code rabbitUtil.existQueue} reports that the queue does not exist
     */
    @PostMapping("/consume/addQueue")
    public void addQueue(@RequestBody ConsumerInfo consumerInfo) {
        boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName());
        if (!existQueue) {
            throw new RuntimeException("當前佇列不存在");
        }
        // start listening on the queue
        container.addQueueNames(consumerInfo.getQueueName());
        logListenedQueues();
    }

    /**
     * Removes a queue from the listener container.
     *
     * @param consumerInfo request body carrying the queue name
     */
    @PostMapping("/consume/removeQueue")
    public void removeQueue(@RequestBody ConsumerInfo consumerInfo) {
        // stop listening on the queue
        container.removeQueueNames(consumerInfo.getQueueName());
        logListenedQueues();
    }

    /**
     * Logs the queues the container is currently listening on.
     */
    @PostMapping("/consume/queryListenerQueue")
    public void queryListenerQueue() {
        logListenedQueues();
    }

    /**
     * Logs every queue name held by the container.
     */
    private void logListenedQueues() {
        // A String[] passed directly binds to the logger's varargs overload, so only the
        // first element would fill the single {} placeholder; render the whole array.
        log.info("container-queue:{}", java.util.Arrays.toString(container.getQueueNames()));
    }
}
2.7 傳送訊息到交換器
通過如下方式傳送訊息到交換器。
- 先編寫一個請求引數實體類
package com.lucky.spring.entity;
import java.io.Serializable;
/**
 * Request parameters for publishing a message: target exchange, routing key
 * and message content.
 *
 * Created by zhangdd on 2020/10/7
 */
public class ProduceInfo implements Serializable {

    private static final long serialVersionUID = -5816966739399349770L;

    /**
     * Exchange name.
     */
    private String exchangeName;

    /**
     * Routing key.
     */
    private String routingKey;

    /**
     * Message content. Private like the other fields; use the accessors.
     */
    private String msg;

    /**
     * @return the exchange name
     */
    public String getExchangeName() {
        return exchangeName;
    }

    /**
     * @param exchangeName the exchange name
     */
    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    /**
     * @return the routing key
     */
    public String getRoutingKey() {
        return routingKey;
    }

    /**
     * @param routingKey the routing key
     */
    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    /**
     * @return the message content
     */
    public String getMsg() {
        return msg;
    }

    /**
     * @param msg the message content
     */
    public void setMsg(String msg) {
        this.msg = msg;
    }
}
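- 編寫API介面(注意:下方程式碼與 2.6 節的 ConsumerController 完全相同,並未使用上面的 ProduceInfo;傳送訊息的介面應是接收 ProduceInfo 後呼叫 rabbitUtil.convertAndSend(exchangeName, routingKey, msg))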
package com.lucky.spring.controller;
import com.lucky.spring.entity.ConsumerInfo;
import com.lucky.spring.util.RabbitUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoints for adding queues to, removing queues from, and inspecting
 * the injected {@link SimpleMessageListenerContainer}.
 *
 * NOTE(review): this listing repeats the ConsumerController shown earlier in the
 * article, although the surrounding section describes an API for sending messages
 * to an exchange; presumably a producer endpoint was intended here. Confirm.
 *
 * Created by zhangdd on 2020/10/7
 */
@RestController
public class ConsumerController {

    @Autowired
    private SimpleMessageListenerContainer container;

    @Autowired
    private RabbitUtil rabbitUtil;

    private Logger log = LoggerFactory.getLogger(ConsumerController.class);

    /**
     * Adds a queue to the listener container.
     *
     * @param consumerInfo request body carrying the queue name
     * @throws RuntimeException if {@code rabbitUtil.existQueue} reports that the queue does not exist
     */
    @PostMapping("/consume/addQueue")
    public void addQueue(@RequestBody ConsumerInfo consumerInfo) {
        boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName());
        if (!existQueue) {
            throw new RuntimeException("當前佇列不存在");
        }
        // start listening on the queue
        container.addQueueNames(consumerInfo.getQueueName());
        logListenedQueues();
    }

    /**
     * Removes a queue from the listener container.
     *
     * @param consumerInfo request body carrying the queue name
     */
    @PostMapping("/consume/removeQueue")
    public void removeQueue(@RequestBody ConsumerInfo consumerInfo) {
        // stop listening on the queue
        container.removeQueueNames(consumerInfo.getQueueName());
        logListenedQueues();
    }

    /**
     * Logs the queues the container is currently listening on.
     */
    @PostMapping("/consume/queryListenerQueue")
    public void queryListenerQueue() {
        logListenedQueues();
    }

    /**
     * Logs every queue name held by the container.
     */
    private void logListenedQueues() {
        // A String[] passed directly binds to the logger's varargs overload, so only the
        // first element would fill the single {} placeholder; render the whole array.
        log.info("container-queue:{}", java.util.Arrays.toString(container.getQueueNames()));
    }
}
2.8 交換器、佇列維護操作
如果想通過介面對RabbitMQ中的交換器、佇列以及繫結關係進行維護,通過如下方式介面操作,即可實現。
- 先編寫一個請求引數實體類
package com.lucky.spring.entity;
import java.io.Serializable;
/**
 * Request parameters for exchange/queue/binding maintenance.
 *
 * Created by zhangdd on 2020/10/7
 */
public class QueueConfig implements Serializable {

    private static final long serialVersionUID = -6576396650731444495L;

    // Exchange type.
    private String exchangeType;

    // Exchange name.
    private String exchangeName;

    // Queue name.
    private String queueName;

    // Routing key.
    private String routingKey;

    /** @return the exchange type */
    public String getExchangeType() {
        return this.exchangeType;
    }

    /** @return the exchange name */
    public String getExchangeName() {
        return this.exchangeName;
    }

    /** @return the queue name */
    public String getQueueName() {
        return this.queueName;
    }

    /** @return the routing key */
    public String getRoutingKey() {
        return this.routingKey;
    }

    /** @param exchangeType the exchange type */
    public void setExchangeType(String exchangeType) {
        this.exchangeType = exchangeType;
    }

    /** @param exchangeName the exchange name */
    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    /** @param queueName the queue name */
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    /** @param routingKey the routing key */
    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }
}
- 編寫介面API
package com.lucky.spring.controller;
import com.lucky.spring.entity.QueueConfig;
import com.lucky.spring.util.RabbitUtil;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
/**
* Created by zhangdd on 2020/10/7
*/
@RestController("/config")
public class RabbitController {
@Autowired
private RabbitUtil rabbitUtil;
/**
* 建立交換器
*
* @param config
*/
@PostMapping("addExchange")
public void addExchange(@RequestBody QueueConfig config) {
rabbitUtil.addExchange(config.getExchangeType(), config.getExchangeName());
}
/**
* 刪除交換器
*
* @param config
*/
@PostMapping("deleteExchange")
public void deleteExchange(@RequestBody QueueConfig config) {
rabbitUtil.deleteExchange(config.getExchangeName());
}
/**
* 新增佇列
*
* @param config
*/
@PostMapping("addQueue")
public void addQueue(@RequestBody QueueConfig config) {
rabbitUtil.addQueue(config.getQueueName());
}
/**
* 刪除佇列
*
* @param config
*/
@PostMapping("deleteQueue")
public void deleteQueue(@RequestBody QueueConfig config) {
rabbitUtil.deleteQueue(config.getQueueName());
}
/**
* 清空佇列資料
*
* @param config
*/
@PostMapping("purgeQueue")
public void purgeQueue(@RequestBody QueueConfig config) {
rabbitUtil.purgeQueue(config.getQueueName());
}
/**
* 新增繫結
*
* @param config
*/
@PostMapping("addBinding")
public void addBinding(@RequestBody QueueConfig config) {
rabbitUtil.addBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);
}
/**
* 解除繫結
*
* @param config
*/
@PostMapping("removeBinding")
public void removeBinding(@RequestBody QueueConfig config) {
rabbitUtil.removeBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);
}
/**
* 建立頭部型別的交換器
* 判斷條件是所有的鍵值對都匹配成功才傳送到佇列
*
* @param config
*/
@PostMapping("andExchangeBindingQueueOfHeaderAll")
public void andExchangeBindingQueueOfHeaderAll(@RequestBody QueueConfig config) {
HashMap<String, Object> header = new HashMap<>();
header.put("queue", "queue");
header.put("bindType", "whereAll");
rabbitUtil.addExchangeBindingQueue(ExchangeTypes.HEADERS, config.getExchangeName(), config.getQueueName(), null, true, header);
}
/**
* 建立頭部型別的交換器
* 判斷條件是隻要有一個鍵值對匹配成功就傳送到佇列
*
* @param config
*/
@PostMapping("andExchangeBindingQueueOfHeaderAny")
public void andExchangeBindingQueueOfHeaderAny(@RequestBody QueueConfig config) {
HashMap<String, Object> header = new HashMap<>();
header.put("queue", "queue");
header.put("bindType", "whereAny");
rabbitUtil.addExchangeBindingQueue(ExchangeTypes.HEADERS, config.getExchangeName(), config.getQueueName(), null, false, header);
}
}
三、利用MQ實現事務補償
上面的操作只是告訴我們怎麼使用 rabbitMQ!
當你仔細回想整個過程的時候,其實還是回到最初那個問題,什麼時候使用 MQ ?
以常見的訂單系統為例,使用者點選【下單】按鈕之後的業務邏輯可能包括:支付訂單、扣減庫存、生成相應單據、發紅包、發簡訊通知等等。
在業務發展初期這些邏輯可能放在一起同步執行,隨著業務的發展訂單量增長,需要提升系統服務的效能,這時可以將一些不需要立即生效的操作拆分出來非同步執行,比如發放紅包、發簡訊通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之後傳送一條訊息到 MQ 讓主流程快速完結,而由另外的單獨執行緒拉取 MQ 的訊息(或者由 MQ 推送訊息),當發現 MQ 中有發紅包或發簡訊之類的訊息時,執行相應的業務邏輯。
這種是利用 MQ 實現業務解耦,其它的場景包括最終一致性、廣播、錯峰流控等等。
利用 MQ 實現業務解耦的過程其實也很簡單。
- 當主流程結束之後,將訊息推送到發紅包、發簡訊交換器中即可
package com.lucky.spring.service;
import com.lucky.spring.entity.Order;
import com.lucky.spring.util.RabbitUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * Order service that, after the main order steps, publishes the order to the
 * bonus and SMS exchanges.
 *
 * Created by zhangdd on 2020/10/7
 */
@Service
public class OrderService {

    private static final String BONUS_EXCHANGE = "exchange.send.bonus";
    private static final String SMS_EXCHANGE = "exchange.sms.message";

    @Autowired
    private RabbitUtil rabbitUtil;

    /**
     * Creates an order and publishes it for the follow-up steps.
     *
     * NOTE(review): both publishes happen inside the transactional method body, so
     * presumably they can go out before the transaction commits. Confirm whether
     * that is acceptable.
     *
     * @param order the order being created; sent as the message payload
     */
    @Transactional
    public void createOrder(Order order) {
        // 1. create the order (not implemented in this listing)
        // 2. call the inventory API to deduct stock (not implemented in this listing)
        // 3. grant the bonus to the customer
        publish(BONUS_EXCHANGE, order);
        // 4. send the SMS notification
        publish(SMS_EXCHANGE, order);
    }

    /**
     * Sends the order to the given exchange with a null routing key.
     */
    private void publish(String exchange, Order order) {
        rabbitUtil.convertAndSend(exchange, null, order);
    }
}
- 監聽發紅包操作
/**
 * Listens for bonus-granting messages and logs the decoded body.
 *
 * NOTE(review): the queue name equals the exchange name that OrderService
 * publishes to; presumably a queue with this name is declared and bound
 * elsewhere. Confirm.
 *
 * @param message the received message; its body is decoded as UTF-8
 * @param channel the AMQP channel the message was delivered on
 * @throws IOException kept in the signature for compatibility; decoding no longer throws it
 */
@RabbitListener(queues = "exchange.send.bonus")
public void consume(Message message, com.rabbitmq.client.Channel channel) throws IOException {
    // Charset constant instead of a charset name: no lookup by string at runtime.
    String msgJson = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    log.info("收到訊息: {}", msgJson);
    // call the bonus-granting API
}
- 監聽發簡訊操作
/**
 * Listens for SMS-notification messages and logs the decoded body.
 *
 * NOTE(review): the queue name equals the exchange name that OrderService
 * publishes to; presumably a queue with this name is declared and bound
 * elsewhere. Confirm.
 *
 * @param message the received message; its body is decoded as UTF-8
 * @param channel the AMQP channel the message was delivered on
 * @throws IOException kept in the signature for compatibility; decoding no longer throws it
 */
@RabbitListener(queues = "exchange.sms.message")
public void consume(Message message, com.rabbitmq.client.Channel channel) throws IOException {
    // Charset constant instead of a charset name: no lookup by string at runtime.
    String msgJson = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    log.info("收到訊息: {}", msgJson);
    // call the SMS-sending API
}
既然 MQ 這麼好用,那是不是完全可以將以前的業務也按照這個模型進行拆分呢?
答案顯然不是!
當引入 MQ 之後業務的確是解耦了,但是當 MQ 一旦掛了,所有的服務基本都掛了,是不是很可怕!所以這時候就需要RabbitMQ的叢集搭建和部署,保證訊息幾乎100%的投遞和消費。
到此結束