1. 程式人生 > >RocketMQ-事務訊息

RocketMQ-事務訊息

1.RocketMQ有三種訊息

    1)普通訊息

    2)順序訊息

    3)事務訊息

2.分散式事務-強調最終一致性而不是強一致性

  1. 單個應用,資料庫分庫分表了(跨資料庫)
  2. 多個應用,服務拆分了,不跨資料庫
  3. 多個應用,服務拆分了,並且資料庫分庫分表了

正式因為出現了上面三種情況,才引入了分散式事務的解決方案,而RocketMQ的事務訊息就是一種解決方案

3.RocketMQ事務訊息的執行流程

        事務訊息有三種狀態:

  • UNKNOWN狀態:表示事務訊息未確定,可能是業務方執行本地事務邏輯時間耗時過長或者網路原因等引起的,該狀態會導致步驟5的發生
  • COMMIT狀態:表示事務訊息被提交,會被正確分發給消費者。
  • ROLLBACK狀態:該狀態表示該事務訊息被回滾,因為本地事務邏輯執行失敗導致

4.生產者傳送prepare訊息的過程

 

5.boroker處理prepare的訊息過程

6.broker結束事務訊息的過程

7.broker對事務訊息是如何回查的

8.事務訊息的異常恢復機制

事務訊息的異常狀態主要有:

            生產者提交prepare訊息到broker成功,但是當前生產者例項宕機了
            生產者提交prepare訊息到broker失敗,可能是因為提交的broker已宕機
            生產者提交prepare訊息到broker成功,執行本地事務邏輯成功,但是broker宕機了未確定事務狀態
            生產提交prepare訊息到broker成功,但是在進行事務回查的過程中broker宕機了,未確定事務狀態
對於1:事務訊息會根據producerGroup搜尋其他的生產者例項進行回查,所以transactionId務必儲存在中央儲存中,並且事務訊息的pid不能跟其他訊息的pid混用。

對於2:當前例項會搜尋其他的可用的broker-master進行提交,因為只有提交prepare訊息後才會執行本地事務,所以沒有影響,注意生產者報的是超時異常時,是不會進行重發的。

對於3:因為返回狀態是oneway方式,此時如果消費者未收到訊息,需要用手段確定該事務訊息的狀態,儘快將broker重啟,broker重啟後會通過回查完成事務訊息。

對於4:同3,儘快重啟broker。

9.生產者程式碼示例

package com.roger.transaction.producer;

import com.roger.transaction.listener.TransactionListenerImpl;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.UUID;
import java.util.concurrent.*;

public class TransactionProducer {

    public static void main(String[] args) throws Exception {
        //step1:未決事務
        // 也就是 當RocketMQ發現`Prepared訊息`時,會根據這個Listener實現的策略來決斷事務
        TransactionListener transactionListener = new TransactionListenerImpl();
        //step2:建立事務訊息生產者
        TransactionMQProducer transactionMQProducer =
                new TransactionMQProducer("transactionProducerGroup");
        //step3:設定nameServer地址
        transactionMQProducer.setNamesrvAddr("172.20.10.60:9876");
        //step4:設定事務訊息的決斷者
        transactionMQProducer.setTransactionListener(transactionListener);
        //step6:設定事務訊息的處理執行緒池
        ExecutorService executorService = new ThreadPoolExecutor(2,
                5,
                30,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("trancationMQProducerThread");
                        return thread;
                    }
                });
        transactionMQProducer.setExecutorService(executorService);
        //step7:啟動
        transactionMQProducer.start();
        //step8:傳送訊息
        String[] tags = new String[]{"TagTMA", "TagTMB", "TagTMC"};
        for (int i = 0; i < 20; i++) {
            Message msg = new Message();
            msg.setTopic("TrancationMsgTopic");
            msg.setTags(tags[i % tags.length]);
            msg.setKeys(UUID.randomUUID().toString());
            msg.setBody("Hello RocketMQ Tranaction Msg".getBytes(RemotingHelper.DEFAULT_CHARSET));
            TransactionSendResult sendResult = transactionMQProducer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);
        }
    }
}
package com.roger.transaction.listener;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 事務訊息的檢測機制
 */
public class TransactionListenerImpl implements TransactionListener {

    private ConcurrentMap<String,Integer> localTransMap = new ConcurrentHashMap<>();
    private  AtomicInteger transactionIndex = new AtomicInteger(0);

    /**
     * 1)設定本地事務狀態
     * 2)在本地事務表中插入(t_message_transaction)一條記錄
     * 3)臨時使用ConcurrentMap可以實現儲存
     * 4)這個方法和業務方法的程式碼在一個事務中,即業務方法成功,會在本地事務表中增加一條記錄,否則,不會增加
     * @param msg
     * @param args
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object args) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTransMap.put(msg.getTransactionId(),status);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(transactionIndex.get());
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 告知RocketMQ訊息是需要提交還是回滾
     *
     * broker定時進行訊息會查
     *
     * 如果 executeLocalTransaction()方法返回的訊息狀態是UNKNOWN狀態的訊息
     * 則 broker會呼叫checkLocalTransaction()方法檢驗這個這個事務訊息狀態
     * 來告知RocketMQ訊息是需要提交還是回滾
     * @param msgExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msgExt) {
        System.out.println("broker 會查事務狀態....");
        Integer status = localTransMap.get(msgExt.getTransactionId());
        if(status == null){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        switch (status){
            case 0:
                return LocalTransactionState.UNKNOW;
            case 1:
                return LocalTransactionState.COMMIT_MESSAGE;
            case 2:
                return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.UNKNOW;
    }
}

參考博文:https://blog.csdn.net/qq_28632173/article/details/83790243