1. 程式人生 > >RocketMQ——ack機制保證消費成功

RocketMQ——ack機制保證消費成功

ACK簡介

在實際使用RocketMQ的時候我們並不能保證每次傳送的訊息都剛好能被消費者一次性正常消費成功,可能會存在需要多次消費才能成功或者一直消費失敗的情況,那作為傳送者該做如何處理呢?

RocketMQ提供了ack機制,以保證訊息能夠被正常消費。傳送者為了保證訊息肯定消費成功,只有使用方明確表示消費成功,RocketMQ才會認為訊息消費成功。中途斷電,丟擲異常等都不會認為成功——即都會重新投遞。

DEMO

當然,如果消費者不告知傳送者我這邊消費資訊異常,那麼傳送者是不會知道的,所以消費者在設定監聽的時候需要給個回撥,具體程式碼如下:

consumer.registerMessageListener(new MessageListenerConcurrently() {  
  
             @Override  
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  
                                                             ConsumeConcurrentlyContext context) {  
                 for (MessageExt messageExt : msgs) {    
                    String messageBody = new String(messageExt.getBody());    
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(
                    		 new Date())+"消費響應:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//輸出訊息內容    
                 }    
                   
                 //返回消費狀態  
                 //CONSUME_SUCCESS 消費成功  
                 //RECONSUME_LATER 消費失敗,需要稍後重新消費  
                 return ConsumeConcurrentlyStatus.RECONSUME_LATER;  
             }  
         }); 

業務實現消費回撥的時候,當且僅當此回撥函式返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才會認為這批訊息(預設是1條)是消費完成的。如果這時候訊息消費失敗,例如資料庫異常,餘額不足扣款失敗等一切業務認為訊息需要重試的場景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會認為這批訊息消費失敗了。

為了保證訊息是肯定被至少消費成功一次,RocketMQ會把這批訊息重發回Broker(topic不是原topic而是這個消費租的RETRY topic),在延遲的某個時間點(預設是10秒,業務可設定)後,再次投遞到這個ConsumerGroup。而如果一直這樣重複消費都持續失敗到一定次數(預設16次),就會投遞到DLQ死信佇列。應用可以監控死信佇列來做人工干預。

修改重試時間

重試的時間預設如下,這個可以通過檢視broker的日誌,

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

當然,這個重複的時間間隔是可以在配置檔案內設定的,由於我這邊配置的雙master模式,所以在128伺服器的broker-a.properties和129的broker-b.properties中分別配置,如下圖,設定好後務必將之前的資料清理

messageDelayLevel = 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s

重試訊息的處理

一般情況下我們在實際生產中是不需要重試16次,這樣既浪費時間又浪費效能,理論上當嘗試重複次數達到我們想要的結果時如果還是消費失敗,那麼我們需要將對應的訊息進行記錄,並且結束重複嘗試。

package com.gwd.rocketmq;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

/** 
* @FileName Consumer.java
* @Description:
* @author gu.weidong
* @version V1.0
* @createtime 2018年6月25日 上午9:49:39 
* 修改歷史:
* 時間           作者          版本        描述
*====================================================  
*
*/
public class Consumer {
	public static void main(String[] args) throws MQClientException {  
        
        //宣告並初始化一個consumer  
         //需要一個consumer group名字作為構造方法的引數,這裡為consumer1  
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");  
  
         //同樣也要設定NameServer地址  
         consumer.setNamesrvAddr("192.168.159.128:9876;192.168.159.129:9876");  
         //這裡設定的是一個consumer的消費策略  
         //CONSUME_FROM_LAST_OFFSET 預設策略,從該佇列最尾開始消費,即跳過歷史訊息  
         //CONSUME_FROM_FIRST_OFFSET 從佇列最開始開始消費,即歷史訊息(還儲存在broker的)全部消費一遍  
         //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,預設是半個小時以前  
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
         //設定consumer所訂閱的Topic和Tag,*代表全部的Tag  
         consumer.subscribe("TopicTest", "*");  
        //設定一個Listener,主要進行訊息的邏輯處理  
         consumer.registerMessageListener(new MessageListenerConcurrently() {  
             @Override  
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  
                                                             ConsumeConcurrentlyContext context) {  
                 for (MessageExt messageExt : msgs) {    
	                 if(messageExt.getReconsumeTimes()==3) {
	                	 //可以將對應的資料儲存到資料庫,以便人工干預
	                	 System.out.println(messageExt.getMsgId()+","+messageExt.getBody());
	                	 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
	                 }
                 }       
                 //返回消費狀態  
                 //CONSUME_SUCCESS 消費成功  
                 //RECONSUME_LATER 消費失敗,需要稍後重新消費  
                 return ConsumeConcurrentlyStatus.RECONSUME_LATER;  
             }  
         });  
         //呼叫start()方法啟動consumer  
         consumer.start();  
         System.out.println("Consumer Started.");  
     }  
}

所以任何異常都要捕獲返回ConsumeConcurrentlyStatus.RECONSUME_LATER,rocketmq會放到重試佇列,這個重試TOPIC的名字是%RETRY%+consumergroup的名字,如下圖:

注意點

1.如果業務的回撥沒有處理好而丟擲異常,會認為是消費失敗當ConsumeConcurrentlyStatus.RECONSUME_LATER處理。

2.當使用順序消費的回撥MessageListenerOrderly時,由於順序消費是要前者消費成功才能繼續消費,所以沒有ConsumeConcurrentlyStatus.RECONSUME_LATER的這個狀態,只有ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT來暫停佇列的其餘消費,直到原訊息不斷重試成功為止才能繼續消費。

測試案例

(1)producer

package com.gwd.rocketmq;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

/** 
* @FileName Producer.java
* @Description:
* @author gu.weidong
* @version V1.0
* @createtime 2018年6月25日 上午9:48:37 
* 修改歷史:
* 時間           作者          版本        描述
*====================================================  
*
*/
public class Producer {
	public static void main(String[] args) throws MQClientException, InterruptedException {  
        //宣告並初始化一個producer  
        //需要一個producer group名字作為構造方法的引數,這裡為producer1  
        DefaultMQProducer producer = new DefaultMQProducer("producer1");  
          
        //設定NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔  
        //NameServer的地址必須有,但是也可以通過環境變數的方式設定,不一定非得寫死在程式碼裡  
        producer.setNamesrvAddr("192.168.140.128:9876;192.168.140.129:9876");  
   //     producer.setVipChannelEnabled(false);//3.2。6的版本沒有該設定,在更新或者最新的版本中務必將其設定為false,否則會有問題  
        //呼叫start()方法啟動一個producer例項  
        producer.start();  
  
        //傳送10條訊息到Topic為TopicTest,tag為TagA,訊息內容為“Hello RocketMQ”拼接上i的值  
        for (int i = 0; i < 5; i++) {  
            try {  
                Message msg = new Message("TopicTest",// topic  
                        "TagA",// tag  
                        ("Hello RocketMQ " + i).getBytes("utf-8")// body  
                );          
                //呼叫producer的send()方法傳送訊息  
                //這裡呼叫的是同步的方式,所以會有返回結果  
                SendResult sendResult = producer.send(msg);  
                //列印返回結果,可以看到訊息傳送的狀態以及一些相關資訊  
                System.out.println(sendResult);  
            } catch (Exception e) {  
                e.printStackTrace();  
                Thread.sleep(1000);  
            }  
        }  
  
        //傳送完訊息之後,呼叫shutdown()方法關閉producer  
        producer.shutdown();  
    }  
}

(2)Consumer

package com.gwd.rocketmq;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

/** 
* @FileName Consumer.java
* @Description:
* @author gu.weidong
* @version V1.0
* @createtime 2018年6月25日 上午9:49:39 
* 修改歷史:
* 時間           作者          版本        描述
*====================================================  
*
*/
public class Consumer {
	public static void main(String[] args) throws MQClientException {  
        
        //宣告並初始化一個consumer  
         //需要一個consumer group名字作為構造方法的引數,這裡為consumer1  
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");  
  
         //同樣也要設定NameServer地址  
         consumer.setNamesrvAddr("192.168.140.128:9876;192.168.140.129:9876");  
         //這裡設定的是一個consumer的消費策略  
         //CONSUME_FROM_LAST_OFFSET 預設策略,從該佇列最尾開始消費,即跳過歷史訊息  
         //CONSUME_FROM_FIRST_OFFSET 從佇列最開始開始消費,即歷史訊息(還儲存在broker的)全部消費一遍  
         //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,預設是半個小時以前  
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
         //設定consumer所訂閱的Topic和Tag,*代表全部的Tag  
         consumer.subscribe("TopicTest", "*");  
  
         //設定一個Listener,主要進行訊息的邏輯處理  
         consumer.registerMessageListener(new MessageListenerConcurrently() {  
             @Override  
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  
                                                             ConsumeConcurrentlyContext context) {  
                 for (MessageExt messageExt : msgs) {    
                    String messageBody = new String(messageExt.getBody());    
                     System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(
                    		 new Date())+"消費響應:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//輸出訊息內容    
                 }    
                 //返回消費狀態  
                 //CONSUME_SUCCESS 消費成功  
                 //RECONSUME_LATER 消費失敗,需要稍後重新消費  
                 return ConsumeConcurrentlyStatus.RECONSUME_LATER;  
             }  
         });  
         //呼叫start()方法啟動consumer  
         consumer.start();  
         System.out.println("Consumer Started.");  
     }  
}

(3)測試結果如下