1. 程式人生 > 實用技巧 >04訊息佇列系列-RabbitMQ 利用MQ實現事物補償

04訊息佇列系列-RabbitMQ 利用MQ實現事物補償

04訊息佇列系列-RabbitMQ 利用MQ實現事物補償

原文連結

一、介紹

本篇使用SpringBoot整合RabbitMQ,為後續業務處理開發做鋪墊。

二、整合實戰

2.1 建立一個gradle專案,引入amqp依賴

implementation 'org.springframework.boot:spring-boot-starter-amqp'

2.2 在application.properties檔案裡新增RabbitMQ的配置資訊

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

2.3 編寫RabbitUtil工具類

package com.lucky.spring.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Map;

/**
 * Created by zhangdd on 2020/10/7
 */
public class RabbitUtil {

    private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class);

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 建立 Exchange
     *
     * @param exchangeType
     * @param exchangeName
     */
    public void addExchange(String exchangeType, String exchangeName) {
        Exchange exchange = createExchange(exchangeType, exchangeName);
        rabbitAdmin.declareExchange(exchange);
    }


    /**
     * 刪除一個Exchange
     *
     * @param exchangeName
     * @return
     */
    public boolean deleteExchange(String exchangeName) {
        return rabbitAdmin.deleteExchange(exchangeName);
    }

    /**
     * 建立一個指定的Queue
     *
     * @param queueName
     */
    public void addQueue(String queueName) {
        Queue queue = createQueue(queueName);
        rabbitAdmin.declareQueue(queue);
    }

    /**
     * 刪除一個 queue
     *
     * @param queueName
     * @return
     */
    public boolean deleteQueue(String queueName) {
        return rabbitAdmin.deleteQueue(queueName);
    }

    /**
     * 按照篩選條件,刪除佇列
     *
     * @param queueName
     * @param unused
     * @param empty
     */
    public void deleteQueue(String queueName, boolean unused, boolean empty) {
        rabbitAdmin.deleteQueue(queueName, unused, empty);
    }

    /**
     * 清空某個佇列中的訊息,注意,清空的訊息並沒有被消費
     *
     * @param queueName
     */
    public void purgeQueue(String queueName) {
        rabbitAdmin.purgeQueue(queueName, false);
    }

    /**
     * 判斷指定的佇列是否存在
     *
     * @param queueName
     * @return
     */
    public boolean existQueue(String queueName) {
        return rabbitAdmin.getQueueProperties(queueName) == null ? false : true;
    }


    /**
     * 繫結一個佇列到一個匹配型交換器使用一個routingKey
     *
     * @param exchangeType
     * @param exchangeName
     * @param queueName
     * @param routingKey
     * @param isWhereAll
     * @param headers      EADERS模式型別設定,其他模式型別傳空
     */
    public void addBinding(String exchangeType, String exchangeName, String queueName,
                           String routingKey, boolean isWhereAll, Map<String, Object> headers) {
        Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
        rabbitAdmin.declareBinding(binding);
    }

    /**
     * 宣告繫結
     *
     * @param binding
     */
    public void addBinding(Binding binding) {
        rabbitAdmin.declareBinding(binding);
    }

    /**
     * 解除交換器和佇列的繫結
     *
     * @param exchangeType
     * @param exchangeName
     * @param queueName
     * @param routingKey
     * @param isWhereAll
     * @param headers
     */
    public void removeBinding(String exchangeType, String exchangeName, String queueName,
                              String routingKey, boolean isWhereAll, Map<String, Object> headers) {
        Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
        removeBinding(binding);
    }

    /**
     * 解除交換器與佇列的繫結
     *
     * @param binding
     */
    public void removeBinding(Binding binding) {
        rabbitAdmin.removeBinding(binding);
    }

    /**
     * create a exchange,queue and bind queue at the same time
     *
     * @param exchangeType
     * @param exchangeName
     * @param queueName
     * @param routingKey
     * @param isWhereAll
     * @param headers
     */
    public void addExchangeBindingQueue(String exchangeType, String exchangeName, String queueName,
                                        String routingKey, boolean isWhereAll, Map<String, Object> headers) {
        //宣告交換器
        addExchange(exchangeType, exchangeName);
        //declare queue
        addQueue(queueName);
        //declare relationship of binding
        addBinding(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
    }

    /**
     * send message
     *
     * @param exchange
     * @param routingKey
     * @param object
     */
    public void convertAndSend(String exchange, String routingKey, final Object object) {
        rabbitTemplate.convertAndSend(exchange, routingKey, object);
    }

    /**
     * switch message object
     *
     * @param messageType
     * @param msg
     * @return
     */
    public Message getMessage(String messageType, Object msg) {
        MessageProperties properties = new MessageProperties();
        properties.setContentType(messageType);
        Message message = new Message(msg.toString().getBytes(), properties);
        return message;
    }

    /**
     * declare exchange
     *
     * @param exchangeType
     * @param exchangeName
     * @return
     */
    private Exchange createExchange(String exchangeType, String exchangeName) {
        if (ExchangeTypes.DIRECT.equals(exchangeType)) {
            return new DirectExchange(exchangeName);
        }
        if (ExchangeTypes.TOPIC.equals(exchangeType)) {
            return new TopicExchange(exchangeName);
        }
        if (ExchangeTypes.HEADERS.equals(exchangeType)) {
            return new HeadersExchange(exchangeName);
        }
        if (ExchangeTypes.FANOUT.equals(exchangeType)) {
            return new FanoutExchange(exchangeName);
        }
        return null;
    }

    /**
     * declare relation of binding
     *
     * @param exchangeType
     * @param exchangeName
     * @param queueName
     * @param routingKey
     * @param isWhereAll
     * @param headers
     * @return
     */
    private Binding bindingBuilder(String exchangeType, String exchangeName, String queueName,
                                   String routingKey, boolean isWhereAll, Map<String, Object> headers) {
        if (ExchangeTypes.DIRECT.equals(exchangeType)) {
            return BindingBuilder.bind(new Queue(queueName))
                    .to(new DirectExchange(exchangeName))
                    .with(routingKey);
        }
        if (ExchangeTypes.TOPIC.equals(exchangeType)) {
            return BindingBuilder.bind(new Queue(queueName))
                    .to(new TopicExchange(exchangeName))
                    .with(routingKey);
        }
        if (ExchangeTypes.HEADERS.equals(exchangeType)) {
            if (isWhereAll) {
                return BindingBuilder.bind(new Queue(queueName))
                        .to(new HeadersExchange(exchangeName))
                        .whereAll(headers)
                        .match();
            } else {
                return BindingBuilder.bind(new Queue(queueName))
                        .to(new HeadersExchange(exchangeName))
                        .whereAny(headers)
                        .match();
            }
        }

        if (ExchangeTypes.FANOUT.equals(exchangeType)) {
            return BindingBuilder.bind(new Queue(queueName))
                    .to(new FanoutExchange(exchangeName));
        }
        return null;
    }

    /**
     * declare queue
     *
     * @param queueName
     * @return
     */
    private Queue createQueue(String queueName) {
        return new Queue(queueName);
    }

}

2.4 編寫RabbitMQ配置類

package com.lucky.spring.config;

import com.lucky.spring.util.RabbitUtil;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by zhangdd on 2020/10/7
 */
@Configuration
public class RabbitConfig {


    /**
     * init factory of connection
     *
     * @param host
     * @param port
     * @param username
     * @param password
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.host}") String host,
                                               @Value("${spring.rabbitmq.port}") int port,
                                               @Value("${spring.rabbitmq.username}") String username,
                                               @Value("${spring.rabbitmq.password}") String password) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    /**
     * 重新例項化 RabbitAdmin操作類
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * 重新例項化 RabbitTemplate操作類
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //資料轉換為json存入訊息佇列
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public RabbitUtil rabbitUtil() {
        return new RabbitUtil();
    }

}

2.5 編寫佇列監聽類(靜態)

我們已經知道這個概念佇列監聽這個概念,只需要在方法上加上@RabbitListener(queues = "")即可收到對應佇列的訊息。但此時的佇列是已經知道了的所以稱之為監聽靜態佇列。

package com.lucky.spring.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;

/**
 * Created by zhangdd on 2020/10/7
 */
@Configuration
public class DirectConsumeListener {

    private Logger log = LoggerFactory.getLogger(DirectConsumeListener.class);

    /**
     * 監聽指定佇列 mq.direct.1
     *
     * @param message
     */
    @RabbitListener(queues = "mq.direct.1")
    public void consume(Message message) {
        log.info("DirectConsumeListener,收到訊息: {}", message.toString());
    }
}

2.6 編寫佇列監聽類(動態)

在服務執行過程中,動態的新增佇列,然後在監聽新增的佇列的行為,就是監聽動態佇列。

package com.lucky.spring.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by zhangdd on 2020/10/7
 */
@Configuration
public class DynamicConsumeListener {

    private Logger log = LoggerFactory.getLogger(DynamicConsumeListener.class);

    /**
     * 使用 SimpleMessageListenerContainer 實現動態監聽
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setMessageListener(message -> {
            log.info("ConsumerMessageListen,收到訊息: {}", message.toString());
        });
        return container;
    }
}

如果想向SimpleMessageListenerContainer新增監聽佇列或者移除佇列,只需要通過如下方式即可操作。

package com.lucky.spring.controller;

import com.lucky.spring.entity.ConsumerInfo;
import com.lucky.spring.util.RabbitUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * Created by zhangdd on 2020/10/7
 */
@RestController
public class ConsumerController {

    @Autowired
    private SimpleMessageListenerContainer container;

    @Autowired
    private RabbitUtil rabbitUtil;

    private Logger log = LoggerFactory.getLogger(ConsumerController.class);

    /**
     * 新增佇列到監聽器
     *
     * @param consumerInfo
     */
    @PostMapping("/consume/addQueue")
    public void addQueue(@RequestBody ConsumerInfo consumerInfo) {
        boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName());
        if (!existQueue) {
            throw new RuntimeException("當前佇列不存在");
        }
        //新增mq監聽的佇列
        container.addQueueNames(consumerInfo.getQueueName());
        //列印監聽容器中正在監聽到佇列
        log.info("container-queue:{}", container.getQueueNames());
    }

    /**
     * 移除正在監聽的佇列
     *
     * @param consumerInfo
     */
    @PostMapping("/consume/removeQueue")
    public void removeQueue(@RequestBody ConsumerInfo consumerInfo) {
        //移除mq監聽的佇列
        container.removeQueueNames(consumerInfo.getQueueName());
        //列印監聽容器中正在監聽到佇列
        log.info("container-queue:{}", container.getQueueNames());
    }

    /**
     * 查詢監聽容器中正在監聽到的佇列
     */
    @PostMapping("/consume/queryListenerQueue")
    public void queryListenerQueue() {
        log.info("container-queue:{}", container.getQueueNames());
    }
}

2.7 傳送訊息到交換器

通過如下方式傳送訊息到交換器。

  • 先編寫一個請求引數實體類
package com.lucky.spring.entity;

import java.io.Serializable;

/**
 * Created by zhangdd on 2020/10/7
 */
public class ProduceInfo implements Serializable {

    private static final long serialVersionUID = -5816966739399349770L;
    /**
     * 交換器名稱
     */
    private String exchangeName;

    /**
     * 路由鍵key
     */
    private String routingKey;

    /**
     * 訊息內容
     */
    public String msg;

    public String getExchangeName() {
        return exchangeName;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    public String getRoutingKey() {
        return routingKey;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

  • 編寫API介面
package com.lucky.spring.controller;

import com.lucky.spring.entity.ConsumerInfo;
import com.lucky.spring.util.RabbitUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * Created by zhangdd on 2020/10/7
 */
@RestController
public class ConsumerController {

    @Autowired
    private SimpleMessageListenerContainer container;

    @Autowired
    private RabbitUtil rabbitUtil;

    private Logger log = LoggerFactory.getLogger(ConsumerController.class);

    /**
     * 新增佇列到監聽器
     *
     * @param consumerInfo
     */
    @PostMapping("/consume/addQueue")
    public void addQueue(@RequestBody ConsumerInfo consumerInfo) {
        boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName());
        if (!existQueue) {
            throw new RuntimeException("當前佇列不存在");
        }
        //新增mq監聽的佇列
        container.addQueueNames(consumerInfo.getQueueName());
        //列印監聽容器中正在監聽到佇列
        log.info("container-queue:{}", container.getQueueNames());
    }

    /**
     * 移除正在監聽的佇列
     *
     * @param consumerInfo
     */
    @PostMapping("/consume/removeQueue")
    public void removeQueue(@RequestBody ConsumerInfo consumerInfo) {
        //移除mq監聽的佇列
        container.removeQueueNames(consumerInfo.getQueueName());
        //列印監聽容器中正在監聽到佇列
        log.info("container-queue:{}", container.getQueueNames());
    }

    /**
     * 查詢監聽容器中正在監聽到的佇列
     */
    @PostMapping("/consume/queryListenerQueue")
    public void queryListenerQueue() {
        log.info("container-queue:{}", container.getQueueNames());
    }
}

2.8 交換器、佇列維護操作

如果想通過介面對RabbitMQ中的交換器、佇列以及繫結關係進行維護,通過如下方式介面操作,即可實現。

  • 先編寫一個請求引數實體類
package com.lucky.spring.entity;

import java.io.Serializable;

/**
 * Created by zhangdd on 2020/10/7
 */
public class QueueConfig implements Serializable {


    private static final long serialVersionUID = -6576396650731444495L;
    /**
     * 交換器型別
     */
    private String exchangeType;

    /**
     * 交換器名稱
     */
    private String exchangeName;

    /**
     * 佇列名稱
     */
    private String queueName;

    /**
     * 路由鍵key
     */
    private String routingKey;

    public String getExchangeType() {
        return exchangeType;
    }

    public void setExchangeType(String exchangeType) {
        this.exchangeType = exchangeType;
    }

    public String getExchangeName() {
        return exchangeName;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public String getRoutingKey() {
        return routingKey;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }
}
  • 編寫介面API
package com.lucky.spring.controller;

import com.lucky.spring.entity.QueueConfig;
import com.lucky.spring.util.RabbitUtil;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

/**
 * Created by zhangdd on 2020/10/7
 */
@RestController("/config")
public class RabbitController {
    @Autowired
    private RabbitUtil rabbitUtil;

    /**
     * 建立交換器
     *
     * @param config
     */
    @PostMapping("addExchange")
    public void addExchange(@RequestBody QueueConfig config) {
        rabbitUtil.addExchange(config.getExchangeType(), config.getExchangeName());
    }

    /**
     * 刪除交換器
     *
     * @param config
     */
    @PostMapping("deleteExchange")
    public void deleteExchange(@RequestBody QueueConfig config) {
        rabbitUtil.deleteExchange(config.getExchangeName());
    }

    /**
     * 新增佇列
     *
     * @param config
     */
    @PostMapping("addQueue")
    public void addQueue(@RequestBody QueueConfig config) {
        rabbitUtil.addQueue(config.getQueueName());
    }

    /**
     * 刪除佇列
     *
     * @param config
     */
    @PostMapping("deleteQueue")
    public void deleteQueue(@RequestBody QueueConfig config) {
        rabbitUtil.deleteQueue(config.getQueueName());
    }

    /**
     * 清空佇列資料
     *
     * @param config
     */
    @PostMapping("purgeQueue")
    public void purgeQueue(@RequestBody QueueConfig config) {
        rabbitUtil.purgeQueue(config.getQueueName());
    }

    /**
     * 新增繫結
     *
     * @param config
     */
    @PostMapping("addBinding")
    public void addBinding(@RequestBody QueueConfig config) {
        rabbitUtil.addBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);
    }

    /**
     * 解除繫結
     *
     * @param config
     */
    @PostMapping("removeBinding")
    public void removeBinding(@RequestBody QueueConfig config) {
        rabbitUtil.removeBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);
    }

    /**
     * 建立頭部型別的交換器
     * 判斷條件是所有的鍵值對都匹配成功才傳送到佇列
     *
     * @param config
     */
    @PostMapping("andExchangeBindingQueueOfHeaderAll")
    public void andExchangeBindingQueueOfHeaderAll(@RequestBody QueueConfig config) {
        HashMap<String, Object> header = new HashMap<>();
        header.put("queue", "queue");
        header.put("bindType", "whereAll");
        rabbitUtil.addExchangeBindingQueue(ExchangeTypes.HEADERS, config.getExchangeName(), config.getQueueName(), null, true, header);
    }

    /**
     * 建立頭部型別的交換器
     * 判斷條件是隻要有一個鍵值對匹配成功就傳送到佇列
     *
     * @param config
     */
    @PostMapping("andExchangeBindingQueueOfHeaderAny")
    public void andExchangeBindingQueueOfHeaderAny(@RequestBody QueueConfig config) {
        HashMap<String, Object> header = new HashMap<>();
        header.put("queue", "queue");
        header.put("bindType", "whereAny");
        rabbitUtil.addExchangeBindingQueue(ExchangeTypes.HEADERS, config.getExchangeName(), config.getQueueName(), null, false, header);
    }
}

三、利用MQ實現事物補償

上面的操作只是告訴我們怎麼使用 rabbitMQ


當你仔細回想整個過程的時候,其實還是回到最初那個問題,什麼時候使用 MQ ?
以常見的訂單系統為例,使用者點選【下單】按鈕之後的業務邏輯可能包括:支付訂單、扣減庫存、生成相應單據、發紅包、發簡訊通知等等


在業務發展初期這些邏輯可能放在一起同步執行,隨著業務的發展訂單量增長,需要提升系統服務的效能,這時可以將一些不需要立即生效的操作拆分出來非同步執行,比如發放紅包、發簡訊通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之後傳送一條訊息到 MQ 讓主流程快速完結,而由另外的單獨執行緒拉取 MQ 的訊息(或者由 MQ 推送訊息),當發現 MQ 中有發紅包或發簡訊之類的訊息時,執行相應的業務邏輯。

這種是利用 MQ 實現業務解耦,其它的場景包括最終一致性、廣播、錯峰流控等等。
利用 MQ 實現業務解耦的過程其實也很簡單。

  • 當主流程結束之後,將訊息推送到發紅包、發簡訊交換器中即可
package com.lucky.spring.service;

import com.lucky.spring.entity.Order;
import com.lucky.spring.util.RabbitUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * Created by zhangdd on 2020/10/7
 */
@Service
public class OrderService {

    @Autowired
    private RabbitUtil rabbitUtil;

    @Transactional
    public void createOrder(Order order) {
        //1、建立訂單
        //2、呼叫庫存介面,減庫存
        //3、向客戶發放紅包
        rabbitUtil.convertAndSend("exchange.send.bonus", null, order);
        //4、發簡訊通知
        rabbitUtil.convertAndSend("exchange.sms.message", null, order);
    }
}

  • 監聽發紅包操作
  /**
     * 監聽發紅包
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "exchange.send.bonus")
    public void consume(Message message, RabbitProperties.Cache.Channel channel) throws IOException {
        String msgJson = new String(message.getBody(),"UTF-8");
        log.info("收到訊息: {}", message.toString());

        //呼叫發紅包介面
    }
  • 監聽發簡訊操作
 /**
     * 監聽發簡訊
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "exchange.sms.message")
    public void consume(Message message, AMQImpl.Channel channel) throws IOException {
        String msgJson = new String(message.getBody(),"UTF-8");
        log.info("收到訊息: {}", message.toString());

        //呼叫發簡訊介面
    }

既然 MQ 這麼好用,那是不是完全可以將以前的業務也按照整個模型進行拆分呢?
答案顯然不是!
當引入 MQ 之後業務的確是解耦了,但是當 MQ 一旦掛了,所有的服務基本都掛了,是不是很可怕!所以這時候就需要RabbitMQ的叢集搭建和部署,保證訊息幾乎100%的投遞和消費。


到此結束