1. 程式人生 > >SpringBoot整合RocketMQ事務訊息

SpringBoot整合RocketMQ事務訊息

pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.1.0-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-common</artifactId>
    <version>4.1.0-incubating</version>
</dependency>
生產者相關配置
RmqProducerConfig.java(生產者)

package com.test.rocketmq.config;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RmqProducerConfig {

    /**
     * 所屬的組名
     */
    @Value("${rocketmq.producerGroup}")
    private String producerGroup;

    /**
     * nameserver地址
     */
    @Value("${rocketmq.namesrvAddr}")
    private String namesrvAddr;


    @Bean(name="transactionMQProducer",destroyMethod="shutdown",initMethod ="start")
    public TransactionMQProducer transactionMQProducer(){

        TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
        //nameserver地址
        producer.setNamesrvAddr(namesrvAddr);
        //事務回查最小併發數
        producer.setCheckThreadPoolMinSize(5);
        //事務回查最大併發數
        producer.setCheckThreadPoolMaxSize(20);
        //佇列數
        producer.setCheckRequestHoldMax(2000);
        //伺服器回撥Producer,檢查本地事務分支成功還是失敗,雖然沒用,但是還得設定
        producer.setTransactionCheckListener(new TransactionCheckListener() {
            @Override
            public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
                System.out.println("state -- "+ new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        return producer;
    }


}
TransactionExecuterImpl.java(本地邏輯)
package com.test.rocketmq.mq;

import com.alibaba.fastjson.JSONObject;
import com.test.rocketmq.dao.OrderMapper;
import com.test.rocketmq.dao.pojo.Order;
import com.test.rocketmq.utils.FastJsonUtil;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.Map;

/**
 * 處理本地業務邏輯
 */
@Component
public class TransactionExecuterImpl implements LocalTransactionExecuter {

    @Autowired
    private OrderMapper orderMapper;

    @Override
    @Transactional
    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {

        try {

            System.out.println("本地邏輯執行了。。。");
            //Message Body
            Order order = FastJsonUtil.jsonToBean(new String(msg.getBody(), "utf-8"), Order.class);

            order.setState(Order.COMPLETED);
            //將之前的訂單 根據使用者id和訂單號 修改為已確認傳送狀態
            this.orderMapper.update(order);

            //成功通知MQ訊息變更 該訊息變為:<確認傳送>
            return LocalTransactionState.COMMIT_MESSAGE;

            //這裡方便測試 自定義事務回查 所以返回 未知狀態
            //return LocalTransactionState.UNKNOW;

        } catch (Exception e) {
            e.printStackTrace();
            //失敗回滾
            return LocalTransactionState.ROLLBACK_MESSAGE;

        }

    }
}
OrderService.java(生成訂單 併發送訊息)
package com.test.rocketmq.service;

import com.alibaba.fastjson.JSONObject;
import com.test.rocketmq.config.RmqProducerConfig;
import com.test.rocketmq.dao.OrderMapper;
import com.test.rocketmq.dao.pojo.Order;
import com.test.rocketmq.mq.TransactionExecuterImpl;
import com.test.rocketmq.utils.FastJsonUtil;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @author: wangsaichao
 * @date: 2018/2/12
 * @description: 訂單Service層
 */
@Service
public class OrderService {

    @Autowired
    private TransactionMQProducer transactionMQProducer;

    @Autowired
    private TransactionExecuterImpl transactionExecuterImpl;

    @Autowired
    private OrderMapper orderMapper;

    /**
     * 建立訂單,併發送到MQ
     * @param userId
     */
    @Transactional(rollbackFor = Exception.class)
    public void createOrder(String userId) throws Exception {

        Order order = new Order();
        order.setUserId(userId);
        order.setAmount(666D);
        order.setState(Order.PREPARED);
        order.setUserName("王賽超");
        order.setUpdateTime(new Date());
        //訂單id,也是key
        String uuid = UUID.randomUUID().toString();
        order.setOrderId(uuid);

        //訂單入庫
        orderMapper.insert(order);

        //傳送訊息
        this.sendMessage(order);

        System.out.println("訂單:"+uuid+"入庫成功,傳送訊息成功");

    }

    /**
     * 給消費者傳送訊息
     * @param order
     * @throws Exception
     */
    private void sendMessage(Order order) throws Exception {

        //構造訊息資料
        Message message = new Message();
        //主題
        message.setTopic("TransactionTest");
        //子標籤
        message.setTags("tagOrder");

        message.setKeys(order.getOrderId());

        message.setBody(FastJsonUtil.beanToJson(order).getBytes("utf-8"));

        //封裝本地邏輯

        transactionMQProducer.sendMessageInTransaction(message,transactionExecuterImpl, 1000);

    }
}
消費者相關配置
MessageListenerImpl.java(消費者)

package com.test.rocketmq.mq;

import com.alibaba.fastjson.JSONObject;
import com.test.rocketmq.dao.pojo.Account;
import com.test.rocketmq.dao.pojo.Journal;
import com.test.rocketmq.service.AccountService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;
import java.util.List;

/**
 * @author: WangSaiChao
 * @date: 2018/2/12
 * @description: 訊息消費者
 */
@Component
public class MessageListenerImpl implements MessageListenerConcurrently {


    @Autowired
    private AccountService accountService;


    @Override
    @Transactional
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        MessageExt msg=msgs.get(0);
        try {

            String topic = msg.getTopic();
            //Message Body
            JSONObject messageBody=JSONObject.parseObject(new String(msg.getBody(),"UTF-8"));
            String tags = msg.getTags();
            String keys = msg.getKeys();

            String userId = messageBody.getString("userId");
            String orderId = messageBody.getString("orderId");
            Double amount = messageBody.getDouble("amount");

            //列印
            System.out.println(topic+"  "+tags+"    "+keys+"  使用者id為: "+userId+",訂單號為: " + orderId + ",消費金額為:"+amount);

            //業務邏輯處理,根據使用者id去資料庫中 將賬戶餘額更新
            Account account = new Account();
            Date date = new Date();
            account.setAmount(amount);
            account.setUserId(userId);
            account.setUpdateTime(date);
            accountService.update(account);

            //儲存記錄到賬戶流水錶
            Journal journal = new Journal();
            journal.setOrderId(orderId);
            journal.setUserId(userId);
            journal.setAmount(amount);
            journal.setUpdateTime(date);
            accountService.insert(journal);

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

        } catch (Exception e) {
            e.printStackTrace();
            //重試次數為3情況
            if(msg.getReconsumeTimes() == 3){

                /**
                 * 如果連續重發3次還是釋出出去 證明該訂單可能哪裡有錯
                 * 儲存錯誤日誌,然後就不再發送訊息了,需要人工介入,邏輯我就不寫了
                 */
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            //如果沒超過三次,稍後再次傳送
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

    }
}

關於事務訊息回查實現方式

為什麼需要回查?

假如現在有兩個服務  A訂單服務(生產者)  B賬戶服務(消費者)   
1.A建立一條prepared預傳送狀態的訂單記錄入庫,然後同步傳送MQ
2.MQ儲存訊息然後回撥A執行本地邏輯,將第一步的訂單狀態改 從 prepared狀態改為 已確認狀態(也可以叫 支付中狀態),然後返回成功,如果一切正常,那麼訊息對於B服務可見,正常消費沒問題,如果由於網路等原因失敗,則MQ上的訊息一直處於prepared狀態,對於B服務一直是不可見的狀態,無法消費。

解決方案
A訂單服務

對應 order(訂單表) 欄位必須含有一個 createtime建立時間 和 state 代表狀態的欄位 假設 state 分為 待確認  已確認(支付中)  已完成 三種狀態
對應邏輯如下:
1.A建立一條 待確認 狀態的訂單記錄入庫,然後同步傳送MQ
2.MQ儲存訊息然後回撥A執行本地邏輯,將第一步的訂單狀態改 從 待確認 狀態改為 已確認 狀態。
3.A服務提供一個介面 將 訂單狀態從 已確認 狀態 改為 已完成狀態。
4.另啟動一個job服務 查詢 10分鐘 或者 15分鐘 以上的狀態為待確認的訂單記錄,重新發送MQ,這個時間根據自己的業務執行時間來配置,比如正常情況下 1S內就執行完成了, 高峰 訊息堆積的時候 大概是 20分鐘 取一箇中間值。

B賬戶服務
對應account(賬戶表) 
journal(資金流水錶) 必須含有 一個update_time(處理時間)欄位  一個 reconciliation_date(對賬時間)欄位。
selecttime(查詢時間)表  該表中 永遠只有一條記錄, 含有一個時間欄位
對應邏輯如下:
1.B服務消費一條訊息,對應的將一條流水記錄 插入 journal表(要做冪等處理)。
2.另起一個job服務 先查詢selecttime表中的那一條唯一記錄的時間欄位( 第一次可以將該時間隨便設定為當前時間往前就可以,比如設定為 2018年1月1日)。然後 根據該時間欄位查詢 journal表中 已消費的記錄,select XXX,XXX from journal where reconciliation_date >= 2018年1月1日 order by reconciliation_date asc 輪詢呼叫A服務第三步提供的的介面,告訴A服務這個訊息已經消費了,A服務修改訂單狀態。
這個reconciliation_date欄位還有什麼用呢? 假如說 第二步查出來100條記錄, 同步呼叫A服務的介面,執行完成之後,將該欄位值更新為最後一條訂單的reconciliation_date(對賬時間)  下次再查詢的時候 這100條記錄 將不會再查詢。如果說執行過程中,出現什麼異常情況,比如說 執行到第90條 突然停電了,這個 是不影響的,因為修改 該值 是在執行完這100條之後才會進行修改,等程式啟動的時候會重新查詢出這100條記錄,然後回撥A服務。A服務可做冪等,也可不做冪等 ,因為這個操作本身就是將狀態改為已完成狀態,多改幾次結果還是一樣的,天然就是冪等的。