1. 程式人生 > >整合RocketMQ事務訊息

整合RocketMQ事務訊息

一、 選擇RocketMQ原因

ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ選型

二、 整合思路

RocketMQ提供了事務訊息回查,檢視官方Demo

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    private static final String TX_PGROUP_NAME = "myTxProducerGroup";
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Value("${demo.rocketmq.transTopic}")
    private String springTransTopic;
    
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Send transactional messages
        testTransaction();
    }


    private void testTransaction() throws MessagingException {
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {

                Message msg = MessageBuilder
                                        .withPayload("Hello RocketMQ " + i)
                                        .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i)
                                        .build();
                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
                                                                        springTransTopic + ":" + tags[i % tags.length],
                                                                        msg,
                                                                        null);
                System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
                                    msg.getPayload(),
                                    sendResult.getSendStatus());

                Thread.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME)
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);

        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId);
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(transId, status);
            if (status == 0) {
                // Return local transaction with success(commit), in this case,
                // this message will not be checked in checkLocalTransaction()
                System.out.printf("    # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
                return RocketMQLocalTransactionState.COMMIT;
            }

            if (status == 1) {
                // Return local transaction with failure(rollback) , in this case,
                // this message will not be checked in checkLocalTransaction()
                System.out.printf("    # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
                return RocketMQLocalTransactionState.ROLLBACK;
            }

            System.out.printf("    # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
            Integer status = localTrans.get(transId);
            if (null != status) {
                switch (status) {
                    case 0:
                        retState = RocketMQLocalTransactionState.UNKNOWN;
                        break;
                    case 1:
                        retState = RocketMQLocalTransactionState.COMMIT;
                        break;
                    case 2:
                        retState = RocketMQLocalTransactionState.ROLLBACK;
                        break;
                }
            }
            System.out.printf("------ !!! checkLocalTransaction is executed once," +
                    " msgTransactionId=%s, TransactionState=%s status=%s %n",
                transId, retState, status);
            return retState;
        }
    }

}

需要在testTransaction()中傳送訊息,然後在TransactionListenerImpl類中實現executeLocalTransaction()方法才能執行整個本地事務,然後在checkLocalTransaction()中實現事務訊息回查。

檢視原始碼可以知道testTransaction()方法和executeLocalTransaction()是在同一個執行緒當中,只不過包裝RocketMQTemplate中。

三、問題和解決方法

3.1事務訊息面臨的幾個問題:

  1. 訊息傳送的事務訊息回撥查詢和本地事務沒嚴格的先後順序,怎麼保證,回查時,事務操作肯定已經完成。
  2. 事務訊息回撥使用transaction_id
    查詢,那麼transaction_id存放在哪裡,同時保證transaction_id關聯的業務操作執行成功。
  3. 怎麼把事務回撥查詢操作隔離出業務,保證不侵入程式碼中。
  4. 下游消費者怎麼保證介面冪等性。
  5. 下游消費者怎麼提高冪等性查詢效能。
  6. 怎麼把冪等性操作隔離出業務,保證不侵入程式碼中。

3.2 解決方法

  1. 因為資料庫或者其他業務操作可能會存在延時,那麼不能保證回查時業務操作已完成,那麼可以多次回查,並設定最大回查次數,同時不能丟棄MQ訊息持久化,方便手動恢復。
  2. 可以使用本地訊息表落地的傳送訊息,同時可以採用切面、繼承等等方式將落地訊息隔離出業務程式碼之外,保證本地訊息落庫不侵入,注意必須要保證本地訊息落庫和本地業務落庫在同一個事務之內!
  3. 事務訊息回查可以使用第2點的本地訊息表,根據transaction_id查詢,判斷本地事務的執行結果,也和第2點一樣,可以使用一些方式將事務訊息回查程式碼隔離出業務程式碼,保證不侵入。
  4. 冪等性的方法:
    • 資料庫唯一約束
    • 狀態機CAS單向流轉
    • 訊息去重表
  5. ,在執行本地業務前,先對redis判斷是業務id是否存在,存在則直接返回消費成功,在執行本地業務之後,可以將消費資訊非同步落地到redis當中。注意:需要保證本地業務和訊息冪等性操作在同一個事務當中,同時redis落地操作在事務之外。
  6. 比較好的方案應該是資料庫唯一約束 + 訊息去重表,在訊息去重表中對業務id設定唯一約束,同時將訊息落地操作隔離出本地業務之外,保證不侵入。
  7. 定時清理歷史的本地訊息表(訊息