RocketMQ批量消費、訊息重試、消費模式、刷盤方式
一、Consumer 批量消費
可以通過
consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條
這裡需要分為2種情況1、Consumer端先啟動 2、Consumer端後啟動. 正常情況下:應該是Consumer需要先啟動1、Consumer端先啟動
Consumer程式碼如下
package quickstart; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * Consumer,訂閱訊息 */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876"); consumer.setConsumeMessageBatchMaxSize(10); /** * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br> * 如果非第一次啟動,那麼按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { System.out.println("msgs的長度" + msgs.size()); System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
由於這裡是Consumer先啟動,所以他會去輪詢MQ上是否有訂閱佇列的訊息,由於每次producer插入一條,Consumer就拿一條所以測試結果如下(每次size都是1):
2、Consumer端後啟動,也就是Producer先啟動
由於這裡是Consumer後啟動,所以MQ上也就堆積了一堆資料,Consumer的
consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條
所以這段程式碼就生效了測試結果如下(每次size最多是10):
二、訊息重試機制:訊息重試分為2種1、Producer端重試 2、Consumer端重試
1、Producer端重試
也就是Producer往MQ上發訊息沒有傳送成功,我們可以設定傳送失敗重試的次數
package quickstart; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; /** * Producer,傳送訊息 * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876"); producer.setRetryTimesWhenSendFailed(10);//失敗的 情況傳送10次 producer.start(); for (int i = 0; i < 1000; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes()// body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
2、Consumer端重試
2.1、exception的情況,一般重複16次 10s、30s、1分鐘、2分鐘、3分鐘等等
上面的程式碼中消費異常的情況返回
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重試
正常則返回:
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
package quickstart;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,訂閱訊息
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
/**
* 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// System.out.println("msgs的長度" + msgs.size());
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
for (MessageExt msg : msgs) {
String msgbody = new String(msg.getBody(), "utf-8");
if (msgbody.equals("Hello RocketMQ 4")) {
System.out.println("======錯誤=======");
int a = 1 / 0;
}
}
} catch (Exception e) {
e.printStackTrace();
if(msgs.get(0).getReconsumeTimes()==3){
//記錄日誌
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}else{
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
列印結果:
假如超過了多少次之後我們可以讓他不再重試記錄 日誌。
if(msgs.get(0).getReconsumeTimes()==3){
//記錄日誌
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
2.2超時的情況,這種情況MQ會無限制的傳送給消費端。
就是由於網路的情況,MQ傳送資料之後,Consumer端並沒有收到導致超時。也就是消費端沒有給我返回return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;這樣的就認為沒有到達Consumer端。
這裡模擬Producer只發送一條資料。consumer端暫停1分鐘並且不傳送接收狀態給MQ
package model;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,訂閱訊息
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
/**
* 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// 表示業務處理時間
System.out.println("=========開始暫停===============");
Thread.sleep(60000);
for (MessageExt msg : msgs) {
System.out.println(" Receive New Messages: " + msg);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
三、消費模式
廣播消費:rocketMQ預設是叢集消費,我們可以通過在Consumer來支援廣播消費
consumer.setMessageModel(MessageModel.BROADCASTING);// 廣播消費
package model;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* Consumer,訂閱訊息
*/
public class Consumer2 {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
consumer.setMessageModel(MessageModel.BROADCASTING);// 廣播消費
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
System.out.println(" Receive New Messages: " + msg);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
如果我們有2臺節點(非主關係),2個節點物理上是分開的,Producer往MQ上寫入20條資料 其中broker1中拉取了12條 。broker2中拉取了8 條,這種情況下,假如broker1宕機,那麼我們消費資料的時候,只能消費到broker2中的8條,broker1中的12條已經持久化到中。需要broker1回覆之後這12條資料才能繼續被消費。
非同步複製和同步雙寫主要是主和從的關係。訊息需要實時消費的,就需要採用主從模式部署
非同步複製:比如這裡有一主一從,我們傳送一條訊息到主節點之後,這樣訊息就算從producer端傳送成功了,然後通過非同步複製的方法將資料複製到從節點
同步雙寫:比如這裡有一主一從,我們傳送一條訊息到主節點之後,這樣訊息就並不算從producer端傳送成功了,需要通過同步雙寫的方法將資料同步到從節點後, 才算資料傳送成功。
四、刷盤方式
同步刷盤:在訊息到達MQ後,RocketMQ需要將資料持久化,同步刷盤是指資料到達記憶體之後,必須刷到commitlog日誌之後才算成功,然後返回producer資料已經發送成功。
非同步刷盤:,同步刷盤是指資料到達記憶體之後,返回producer說資料已經發送成功。,然後再寫入commitlog日誌。
commitlog:
commitlog就是來儲存所有的元資訊,包含訊息體,類似於Mysql、Oracle的redolog,所以主要有CommitLog在,Consume Queue即使資料丟失,仍然可以恢復出來。
consumequeue:記錄資料的位置,以便Consume快速通過consumequeue找到commitlog中的資料