整合RocketMQ事務訊息
阿新 • • 發佈:2019-09-13
一、 選擇RocketMQ原因
ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ選型
二、 整合思路
RocketMQ提供了事務訊息回查,檢視官方Demo
@SpringBootApplication public class ProducerApplication implements CommandLineRunner { private static final String TX_PGROUP_NAME = "myTxProducerGroup"; @Resource private RocketMQTemplate rocketMQTemplate; @Value("${demo.rocketmq.transTopic}") private String springTransTopic; public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } @Override public void run(String... args) throws Exception { // Send transactional messages testTransaction(); } private void testTransaction() throws MessagingException { String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = MessageBuilder .withPayload("Hello RocketMQ " + i) .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i) .build(); SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME, springTransTopic + ":" + tags[i % tags.length], msg, null); System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n", msg.getPayload(), sendResult.getSendStatus()); Thread.sleep(10); } catch (Exception e) { e.printStackTrace(); } } } @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME) class TransactionListenerImpl implements RocketMQLocalTransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId); int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(transId, status); if (status == 0) { // Return local transaction with success(commit), in this case, // this message will not be checked in checkLocalTransaction() System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload()); return RocketMQLocalTransactionState.COMMIT; } if (status == 1) { // Return local transaction with failure(rollback) , in this case, // this message will not be checked in checkLocalTransaction() System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload()); return RocketMQLocalTransactionState.ROLLBACK; } System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n"); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT; Integer status = localTrans.get(transId); if (null != status) { switch (status) { case 0: retState = RocketMQLocalTransactionState.UNKNOWN; break; case 1: retState = RocketMQLocalTransactionState.COMMIT; break; case 2: retState = RocketMQLocalTransactionState.ROLLBACK; break; } } System.out.printf("------ !!! checkLocalTransaction is executed once," + " msgTransactionId=%s, TransactionState=%s status=%s %n", transId, retState, status); return retState; } } }
需要在testTransaction()
中傳送訊息,然後在TransactionListenerImpl
類中實現executeLocalTransaction()
方法才能執行整個本地事務,然後在checkLocalTransaction()
中實現事務訊息回查。
檢視原始碼可以知道testTransaction()
方法和executeLocalTransaction()
是在同一個執行緒當中,只不過包裝RocketMQTemplate
中。
三、問題和解決方法
3.1事務訊息面臨的幾個問題:
- 訊息傳送的事務訊息回撥查詢和本地事務沒嚴格的先後順序,怎麼保證,回查時,事務操作肯定已經完成。
- 事務訊息回撥使用
transaction_id
transaction_id
存放在哪裡,同時保證transaction_id
關聯的業務操作執行成功。 - 怎麼把事務回撥查詢操作隔離出業務,保證不侵入程式碼中。
- 下游消費者怎麼保證介面冪等性。
- 下游消費者怎麼提高冪等性查詢效能。
- 怎麼把冪等性操作隔離出業務,保證不侵入程式碼中。
3.2 解決方法
- 因為資料庫或者其他業務操作可能會存在延時,那麼不能保證回查時業務操作已完成,那麼可以多次回查,並設定最大回查次數,同時不能丟棄MQ訊息持久化,方便手動恢復。
- 可以使用本地訊息表落地的傳送訊息,同時可以採用切面、繼承等等方式將落地訊息隔離出業務程式碼之外,保證本地訊息落庫不侵入,注意必須要保證本地訊息落庫和本地業務落庫在同一個事務之內!
- 事務訊息回查可以使用第2點的本地訊息表,根據
transaction_id
查詢,判斷本地事務的執行結果,也和第2點一樣,可以使用一些方式將事務訊息回查程式碼隔離出業務程式碼,保證不侵入。 - 冪等性的方法:
- 資料庫唯一約束
- 狀態機CAS單向流轉
- 訊息去重表
- ,在執行本地業務前,先對redis判斷是業務id是否存在,存在則直接返回消費成功,在執行本地業務之後,可以將消費資訊非同步落地到redis當中。注意:需要保證本地業務和訊息冪等性操作在同一個事務當中,同時redis落地操作在事務之外。
- 比較好的方案應該是資料庫唯一約束 + 訊息去重表,在訊息去重表中對業務id設定唯一約束,同時將訊息落地操作隔離出本地業務之外,保證不侵入。
- 定時清理歷史的本地訊息表(訊息