SpringBoot整合RocketMQ事務訊息
生產者相關配置<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.1.0-incubating</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.1.0-incubating</version> </dependency>
RmqProducerConfig.java(生產者)
TransactionExecuterImpl.java(本地邏輯)package com.test.rocketmq.config; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RmqProducerConfig { /** * 所屬的組名 */ @Value("${rocketmq.producerGroup}") private String producerGroup; /** * nameserver地址 */ @Value("${rocketmq.namesrvAddr}") private String namesrvAddr; @Bean(name="transactionMQProducer",destroyMethod="shutdown",initMethod ="start") public TransactionMQProducer transactionMQProducer(){ TransactionMQProducer producer = new TransactionMQProducer(producerGroup); //nameserver地址 producer.setNamesrvAddr(namesrvAddr); //事務回查最小併發數 producer.setCheckThreadPoolMinSize(5); //事務回查最大併發數 producer.setCheckThreadPoolMaxSize(20); //佇列數 producer.setCheckRequestHoldMax(2000); //伺服器回撥Producer,檢查本地事務分支成功還是失敗,雖然沒用,但是還得設定 producer.setTransactionCheckListener(new TransactionCheckListener() { @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) { System.out.println("state -- "+ new String(msg.getBody())); return LocalTransactionState.COMMIT_MESSAGE; } }); return producer; } }
OrderService.java(生成訂單 併發送訊息)package com.test.rocketmq.mq; import com.alibaba.fastjson.JSONObject; import com.test.rocketmq.dao.OrderMapper; import com.test.rocketmq.dao.pojo.Order; import com.test.rocketmq.utils.FastJsonUtil; import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.common.message.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.util.Map; /** * 處理本地業務邏輯 */ @Component public class TransactionExecuterImpl implements LocalTransactionExecuter { @Autowired private OrderMapper orderMapper; @Override @Transactional public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { try { System.out.println("本地邏輯執行了。。。"); //Message Body Order order = FastJsonUtil.jsonToBean(new String(msg.getBody(), "utf-8"), Order.class); order.setState(Order.COMPLETED); //將之前的訂單 根據使用者id和訂單號 修改為已確認傳送狀態 this.orderMapper.update(order); //成功通知MQ訊息變更 該訊息變為:<確認傳送> return LocalTransactionState.COMMIT_MESSAGE; //這裡方便測試 自定義事務回查 所以返回 未知狀態 //return LocalTransactionState.UNKNOW; } catch (Exception e) { e.printStackTrace(); //失敗回滾 return LocalTransactionState.ROLLBACK_MESSAGE; } } }
package com.test.rocketmq.service;
import com.alibaba.fastjson.JSONObject;
import com.test.rocketmq.config.RmqProducerConfig;
import com.test.rocketmq.dao.OrderMapper;
import com.test.rocketmq.dao.pojo.Order;
import com.test.rocketmq.mq.TransactionExecuterImpl;
import com.test.rocketmq.utils.FastJsonUtil;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @author: wangsaichao
* @date: 2018/2/12
* @description: 訂單Service層
*/
@Service
public class OrderService {
@Autowired
private TransactionMQProducer transactionMQProducer;
@Autowired
private TransactionExecuterImpl transactionExecuterImpl;
@Autowired
private OrderMapper orderMapper;
/**
* 建立訂單,併發送到MQ
* @param userId
*/
@Transactional(rollbackFor = Exception.class)
public void createOrder(String userId) throws Exception {
Order order = new Order();
order.setUserId(userId);
order.setAmount(666D);
order.setState(Order.PREPARED);
order.setUserName("王賽超");
order.setUpdateTime(new Date());
//訂單id,也是key
String uuid = UUID.randomUUID().toString();
order.setOrderId(uuid);
//訂單入庫
orderMapper.insert(order);
//傳送訊息
this.sendMessage(order);
System.out.println("訂單:"+uuid+"入庫成功,傳送訊息成功");
}
/**
* 給消費者傳送訊息
* @param order
* @throws Exception
*/
private void sendMessage(Order order) throws Exception {
//構造訊息資料
Message message = new Message();
//主題
message.setTopic("TransactionTest");
//子標籤
message.setTags("tagOrder");
message.setKeys(order.getOrderId());
message.setBody(FastJsonUtil.beanToJson(order).getBytes("utf-8"));
//封裝本地邏輯
transactionMQProducer.sendMessageInTransaction(message,transactionExecuterImpl, 1000);
}
}
消費者相關配置MessageListenerImpl.java(消費者)
package com.test.rocketmq.mq;
import com.alibaba.fastjson.JSONObject;
import com.test.rocketmq.dao.pojo.Account;
import com.test.rocketmq.dao.pojo.Journal;
import com.test.rocketmq.service.AccountService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.List;
/**
* @author: WangSaiChao
* @date: 2018/2/12
* @description: 訊息消費者
*/
@Component
public class MessageListenerImpl implements MessageListenerConcurrently {
@Autowired
private AccountService accountService;
@Override
@Transactional
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg=msgs.get(0);
try {
String topic = msg.getTopic();
//Message Body
JSONObject messageBody=JSONObject.parseObject(new String(msg.getBody(),"UTF-8"));
String tags = msg.getTags();
String keys = msg.getKeys();
String userId = messageBody.getString("userId");
String orderId = messageBody.getString("orderId");
Double amount = messageBody.getDouble("amount");
//列印
System.out.println(topic+" "+tags+" "+keys+" 使用者id為: "+userId+",訂單號為: " + orderId + ",消費金額為:"+amount);
//業務邏輯處理,根據使用者id去資料庫中 將賬戶餘額更新
Account account = new Account();
Date date = new Date();
account.setAmount(amount);
account.setUserId(userId);
account.setUpdateTime(date);
accountService.update(account);
//儲存記錄到賬戶流水錶
Journal journal = new Journal();
journal.setOrderId(orderId);
journal.setUserId(userId);
journal.setAmount(amount);
journal.setUpdateTime(date);
accountService.insert(journal);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
e.printStackTrace();
//重試次數為3情況
if(msg.getReconsumeTimes() == 3){
/**
* 如果連續重發3次還是釋出出去 證明該訂單可能哪裡有錯
* 儲存錯誤日誌,然後就不再發送訊息了,需要人工介入,邏輯我就不寫了
*/
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//如果沒超過三次,稍後再次傳送
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
關於事務訊息回查實現方式
為什麼需要回查?
假如現在有兩個服務 A訂單服務(生產者) B賬戶服務(消費者)
1.A建立一條prepared預傳送狀態的訂單記錄入庫,然後同步傳送MQ
2.MQ儲存訊息然後回撥A執行本地邏輯,將第一步的訂單狀態改 從 prepared狀態改為 已確認狀態(也可以叫 支付中狀態),然後返回成功,如果一切正常,那麼訊息對於B服務可見,正常消費沒問題,如果由於網路等原因失敗,則MQ上的訊息一直處於prepared狀態,對於B服務一直是不可見的狀態,無法消費。
解決方案
A訂單服務
對應 order(訂單表) 欄位必須含有一個 createtime建立時間 和 state 代表狀態的欄位 假設 state 分為 待確認 已確認(支付中) 已完成 三種狀態
對應邏輯如下:
1.A建立一條 待確認 狀態的訂單記錄入庫,然後同步傳送MQ
2.MQ儲存訊息然後回撥A執行本地邏輯,將第一步的訂單狀態改 從 待確認 狀態改為 已確認 狀態。
3.A服務提供一個介面 將 訂單狀態從 已確認 狀態 改為 已完成狀態。
4.另啟動一個job服務 查詢 10分鐘 或者 15分鐘 以上的狀態為待確認的訂單記錄,重新發送MQ,這個時間根據自己的業務執行時間來配置,比如正常情況下 1S內就執行完成了, 高峰 訊息堆積的時候 大概是 20分鐘 取一箇中間值。
B賬戶服務
對應account(賬戶表)
journal(資金流水錶) 必須含有 一個update_time(處理時間)欄位 一個 reconciliation_date(對賬時間)欄位。
selecttime(查詢時間)表 該表中 永遠只有一條記錄, 含有一個時間欄位
對應邏輯如下:
1.B服務消費一條訊息,對應的將一條流水記錄 插入 journal表(要做冪等處理)。
2.另起一個job服務 先查詢selecttime表中的那一條唯一記錄的時間欄位( 第一次可以將該時間隨便設定為當前時間往前就可以,比如設定為 2018年1月1日)。然後 根據該時間欄位查詢 journal表中 已消費的記錄,select XXX,XXX from journal where reconciliation_date >= 2018年1月1日 order by reconciliation_date asc 輪詢呼叫A服務第三步提供的的介面,告訴A服務這個訊息已經消費了,A服務修改訂單狀態。
這個reconciliation_date欄位還有什麼用呢? 假如說 第二步查出來100條記錄, 同步呼叫A服務的介面,執行完成之後,將該欄位值更新為最後一條訂單的reconciliation_date(對賬時間) 下次再查詢的時候 這100條記錄 將不會再查詢。如果說執行過程中,出現什麼異常情況,比如說 執行到第90條 突然停電了,這個 是不影響的,因為修改 該值 是在執行完這100條之後才會進行修改,等程式啟動的時候會重新查詢出這100條記錄,然後回撥A服務。A服務可做冪等,也可不做冪等 ,因為這個操作本身就是將狀態改為已完成狀態,多改幾次結果還是一樣的,天然就是冪等的。