1. 程式人生 > >RocketMQ批量消費、訊息重試、消費模式、刷盤方式

RocketMQ批量消費、訊息重試、消費模式、刷盤方式

一、Consumer 批量消費

可以通過

consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條
這裡需要分為2種情況1、Consumer端先啟動  2、Consumer端後啟動.   正常情況下:應該是Consumer需要先啟動

1、Consumer端先啟動

Consumer程式碼如下

package quickstart;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * Consumer,訂閱訊息
 */
public class Consumer {

	public static void main(String[] args) throws InterruptedException, MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
		consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
		consumer.setConsumeMessageBatchMaxSize(10);
		/**
		 * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
		 * 如果非第一次啟動,那麼按照上次消費的位置繼續消費
		 */
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

		consumer.subscribe("TopicTest", "*");

		consumer.registerMessageListener(new MessageListenerConcurrently() {

			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				
				try {
					System.out.println("msgs的長度" + msgs.size());
					System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
				} catch (Exception e) {
					e.printStackTrace();
					return ConsumeConcurrentlyStatus.RECONSUME_LATER;
				}
				
				
				
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});

		consumer.start();

		System.out.println("Consumer Started.");
	}
}


由於這裡是Consumer先啟動,所以他會去輪詢MQ上是否有訂閱佇列的訊息,由於每次producer插入一條,Consumer就拿一條所以測試結果如下(每次size都是1):


2、Consumer端後啟動,也就是Producer先啟動

由於這裡是Consumer後啟動,所以MQ上也就堆積了一堆資料,Consumer的

consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條    

所以這段程式碼就生效了測試結果如下(每次size最多是10):


二、訊息重試機制:訊息重試分為2種1、Producer端重試 2、Consumer端重試

1、Producer端重試 

也就是Producer往MQ上發訊息沒有傳送成功,我們可以設定傳送失敗重試的次數

package quickstart;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

/**
 * Producer,傳送訊息
 * 
 */
public class Producer {
	public static void main(String[] args) throws MQClientException, InterruptedException {
		DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
		producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
		producer.setRetryTimesWhenSendFailed(10);//失敗的 情況傳送10次
		producer.start();

		for (int i = 0; i < 1000; i++) {
			try {
				Message msg = new Message("TopicTest",// topic
						"TagA",// tag
						("Hello RocketMQ " + i).getBytes()// body
				);
				SendResult sendResult = producer.send(msg);
				System.out.println(sendResult);
			} catch (Exception e) {
				e.printStackTrace();
				Thread.sleep(1000);
			}
		}

		producer.shutdown();
	}
}


2、Consumer端重試

2.1、exception的情況,一般重複16次 10s、30s、1分鐘、2分鐘、3分鐘等等

上面的程式碼中消費異常的情況返回

return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重試

正常則返回:

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功

package quickstart;


import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * Consumer,訂閱訊息
 */
public class Consumer {

	public static void main(String[] args) throws InterruptedException, MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
		consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
		consumer.setConsumeMessageBatchMaxSize(10);
		/**
		 * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
		 * 如果非第一次啟動,那麼按照上次消費的位置繼續消費
		 */
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

		consumer.subscribe("TopicTest", "*");

		consumer.registerMessageListener(new MessageListenerConcurrently() {

			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

				try {
					// System.out.println("msgs的長度" + msgs.size());
					System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
					for (MessageExt msg : msgs) {
						String msgbody = new String(msg.getBody(), "utf-8");
						if (msgbody.equals("Hello RocketMQ 4")) {
							System.out.println("======錯誤=======");
							int a = 1 / 0;
						}
					}

				} catch (Exception e) {
					e.printStackTrace();
					if(msgs.get(0).getReconsumeTimes()==3){
						//記錄日誌
						
						return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
					}else{
						
					return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試
					}
				}

				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
			}
		});

		consumer.start();

		System.out.println("Consumer Started.");
	}
}

列印結果:




假如超過了多少次之後我們可以讓他不再重試記錄 日誌。

if(msgs.get(0).getReconsumeTimes()==3){
//記錄日誌
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}

2.2超時的情況,這種情況MQ會無限制的傳送給消費端。

就是由於網路的情況,MQ傳送資料之後,Consumer端並沒有收到導致超時。也就是消費端沒有給我返回return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;這樣的就認為沒有到達Consumer端。

這裡模擬Producer只發送一條資料。consumer端暫停1分鐘並且不傳送接收狀態給MQ

package model;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * Consumer,訂閱訊息
 */
public class Consumer {

	public static void main(String[] args) throws InterruptedException, MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
		consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
		consumer.setConsumeMessageBatchMaxSize(10);
		/**
		 * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
		 * 如果非第一次啟動,那麼按照上次消費的位置繼續消費
		 */
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

		consumer.subscribe("TopicTest", "*");

		consumer.registerMessageListener(new MessageListenerConcurrently() {

			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

				try {

					// 表示業務處理時間
					System.out.println("=========開始暫停===============");
					Thread.sleep(60000);

					for (MessageExt msg : msgs) {
						System.out.println(" Receive New Messages: " + msg);
					}

				} catch (Exception e) {
					e.printStackTrace();
					return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試
				}

				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
			}
		});

		consumer.start();

		System.out.println("Consumer Started.");
	}
}




三、消費模式

廣播消費:rocketMQ預設是叢集消費,我們可以通過在Consumer來支援廣播消費

consumer.setMessageModel(MessageModel.BROADCASTING);// 廣播消費

package model;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

/**
 * Consumer,訂閱訊息
 */
public class Consumer2 {

	public static void main(String[] args) throws InterruptedException, MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
		consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
		consumer.setConsumeMessageBatchMaxSize(10);
		consumer.setMessageModel(MessageModel.BROADCASTING);// 廣播消費
	
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

		consumer.subscribe("TopicTest", "*");

		consumer.registerMessageListener(new MessageListenerConcurrently() {

			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

				try {

					for (MessageExt msg : msgs) {
						System.out.println(" Receive New Messages: " + msg);
					}

				} catch (Exception e) {
					e.printStackTrace();
					return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試
				}

				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
			}
		});

		consumer.start();

		System.out.println("Consumer Started.");
	}
}

如果我們有2臺節點(非主關係),2個節點物理上是分開的,Producer往MQ上寫入20條資料 其中broker1中拉取了12條 。broker2中拉取了8 條,這種情況下,假如broker1宕機,那麼我們消費資料的時候,只能消費到broker2中的8條,broker1中的12條已經持久化到中。需要broker1回覆之後這12條資料才能繼續被消費。


非同步複製和同步雙寫主要是主和從的關係。訊息需要實時消費的,就需要採用主從模式部署

非同步複製:比如這裡有一主一從,我們傳送一條訊息到主節點之後,這樣訊息就算從producer端傳送成功了,然後通過非同步複製的方法將資料複製到從節點

同步雙寫:比如這裡有一主一從,我們傳送一條訊息到主節點之後,這樣訊息就並不算從producer端傳送成功了,需要通過同步雙寫的方法將資料同步到從節點後, 才算資料傳送成功。

四、刷盤方式

同步刷盤:在訊息到達MQ後,RocketMQ需要將資料持久化,同步刷盤是指資料到達記憶體之後,必須刷到commitlog日誌之後才算成功,然後返回producer資料已經發送成功。


非同步刷盤:,同步刷盤是指資料到達記憶體之後,返回producer說資料已經發送成功。,然後再寫入commitlog日誌。


commitlog:

commitlog就是來儲存所有的元資訊,包含訊息體,類似於Mysql、Oracle的redolog,所以主要有CommitLog在,Consume Queue即使資料丟失,仍然可以恢復出來。

consumequeue:記錄資料的位置,以便Consume快速通過consumequeue找到commitlog中的資料