1. 程式人生 > >RocketMq(3)-傳送帶有事務的訊息

RocketMq(3)-傳送帶有事務的訊息

RocketMq支援事務性的訊息

RocketMq支援傳輸帶有事務的訊息,可以用來保持資料的最終一致性。下面我來簡單的實現一個帶有事務的訊息

生產者端的程式碼

  1. 需要寫事務檢查的Listener,自定義一個類,實現TransactionCheckListener(rocketMq3.0.8版本以前)

    public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
    //可以檢視messageExt裡面的body是否存在,來判斷事務的狀態
    return LocalTransactionState.COMMIT_MESSAGE;
    }
    
  2. 然後寫本地操作的事務,我們需要實現TransactionExuctor的介面,

    public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) {
    System.out.println(“本地事務”);
    //在這裡進行資料庫的操作,
    return LocalTransactionState.COMMIT_MESSAGE;
    }

  3. 生產者的程式碼

    TransactionCheckListener transactionCheckListener=new MyTranscationCheckListener();
    TransactionMQProducer producer=new TransactionMQProducer(“transaction”);
    producer.setNamesrvAddr(“47.106.132.60:9876”);
    producer.setTransactionCheckListener(transactionCheckListener);
    producer.start();
    MyLocalTranscationExuctor transcationExuctor=new MyLocalTranscationExuctor();
    Message message=new Message(“Topic_Trans”,”MyTags”,”事務訊息”.getBytes());
    producer.sendMessageInTransaction(message,transcationExuctor,null);
    producer.shutdown();

如何處理在RocketMq3.0.8版本後的訊息檢查

我們需要自己去解決訊息的check listener回查機制,我們可以在生產者這端執行本地事務操作時,我們可以把當前訊息存在另外一張表中,最好每條資訊都需要一個Id,然後在這端另起一個任務或者執行緒來取出傳送確認訊息失敗的訊息傳送給消費者。然後消費者這段需要一張表來儲存已經成功解決的訊息列表,然後起一個任務,來取出一段時間之內已成功消費的資訊,然後消費者那端進行修改表中的訊息狀態碼。這樣就會解決確認訊息失敗的原因。其實這個也可以用來實現不支援事務訊息的訊息佇列實現分散式事務的最終一致性。