RocketMQ詳解(10)——Consumer詳解
RocketMQ詳解(10)——消費模式詳解
一. 不同型別的消費者
根據使用者對讀取操作的控制情況,消費在可以分為兩種型別:
- DefaultMQPushConsumer:有系統控制讀取操作,收到訊息後自動呼叫監聽器回撥處理。
- DefaultMQPullConsumer:讀取操作中的大部分功能由使用者自主控制。
二. DefaultMQPushConsumer的使用
使用DefaultMQPushConsumer主要是設定好各種引數和傳入處理訊息的回撥方法。系統收到訊息後會自動呼叫回撥方法來處理訊息,自動儲存Offset,並且加入新的DefaultMQPushConsumer後會自動做負載均衡。
示例程式碼
package william.rmq.consumer.quickstart; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
DefaultMQPushConsumer需要設定三個引數:
- 這個Consumer所屬的ConsumerGroup
- NameServer的IP和埠
- 訂閱的Topic名稱
下面對這幾個引數進行詳細介紹:
ConsumerGroup用於把多個Consumer組織到一起,提高併發處理能力,ConsumerGroup需要和訊息模式MessageModel配合使用。
RocketMQ支援兩種訊息模式:
- MessageModel.CLUSTERING——叢集模式:同一個ConsumerGroup裡面的每個Consumer只消費所訂閱的訊息的一部分內容,同一個ConsumerGroup下所有Consumer消費的內容合起來才是所訂閱的Topic內容的整體,從而達到負載均衡的目的。
- MessageModel.BROADCASTING——廣播模式:同一個ConsumerGroup下的每個Consumer都能消費到所訂閱Topic的所有訊息,也就是一個訊息會被多次分發,被多個Consumer消費。
NameServer的ip和埠號,可以填寫多個,用分號隔開,達到消除單點故障的目的,如”ip1:port1;ip2:port2”
Topic名稱用來標識訊息型別,需要提前建立。如果不想消費某個Topic下的所有訊息,可以通過指定Tag進行訊息過濾,如Consumer.subscribe(“TopicTest”,”tags1 || tag2 || tag3”),表示這個Consumer要消費TopicTest主體下的帶有tag1或tag2或tag3的訊息(Tag指的是在傳送訊息時設定的標籤)。在設定Tag引數時,用null或者”*”表示要消費這個Topic下的所有訊息。
三. DefaultMQPushConsumer的處理流程
本節結合原始碼分析DefaultMQPushConsumer的處理流程。
DefaultMQPushConsumer主要功能實現在DefaultMQPushConsumerImpl中,訊息處理邏輯是在pullMessage()方法的PullCallback回撥中。在PullCallback回撥中有個switch語句,根據Broker返回的訊息型別做響應的處理,具體邏輯看原始碼:
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
PullCallback是一個訊息拉取回調,Consumer從Broker拉取訊息會,會根據拉取狀態回撥對應的onSuccess或onException方法。在onSuccess()的處理中,會根據不同的PullStatus進行不同的處理,PullStatus的狀態有:
- PullStatus.FOUND:成功拉取訊息
- PullStatus.NO_NEW_MSG:沒有新的訊息可被拉取
- PullStatus.NO_MATCHED_MSG:過濾結果不匹配
- PullStatus.OFFSET_ILLEGAL:offset非法
DefaultMQPushConsumer中有很多PullRequest的方法,如executePullRequestImmediately(),之所以在PushConsumer中也使用PullRequest的方式拉取訊息,是因為RocketMQ通過長輪詢的方式來實現Push和Pull兩種模式,長輪詢可以即有Pull的優點,又兼具Push的實時性。
Push方式是Broker端接收到訊息後,主動把訊息推給Consumer端,實時性高。對於一個提供訊息佇列服務的Server來說,用Push方式會有很多弊端:首先是訊息的流量難以控制,當Push的訊息過多時會加大Server的工作量,進而影響Server的效能;其次,Client的處理能力各不相同,且Client的狀態不受Server控制,如果Client不能及時處理Server推送過來的訊息,在造成訊息堆積等各種潛在的問題。
Pull方式是Client端迴圈地從Server端拉取訊息,主動權在Client手裡,自己拉取到一定量的訊息後,妥當處理完畢再繼續拉取。Pull方式的問題是迴圈拉取的時間間隔不好設定,間隔太短就會處於”忙等”的狀態,浪費資源;間隔太長又可能導致Server端有訊息到來時沒有及時被處理。
“長輪詢”方式通過Client端和Server端的配合,達到既擁有Pull的優點,又保證實時性的目的。
“長輪詢”的核心是,Broker端HOLD住客戶端的請求一小段時間,如果在這段時間內有訊息到達,就利用現有的連結立刻返回訊息給Consumer。”長輪詢”的主動權還是掌握在Consumer手上,即使Broker有大量的訊息積壓,也不會主動推送給Consumer。
長輪詢方式的侷限性在於,HOLD住Consumer端請求時,需要佔用資源,它適合用在訊息佇列這種客戶端連線數可控的場景中。
四. DefaultMQPullConsumer的處理流程
使用DefaultMQPullConsumer和使用DefaultMQPushConsumer一樣需要設定各種引數,寫處理訊息的回撥方法。此外,還需要進行額外的處理。下面給出使用的示例程式碼:
package william.rmq.consumer.pull; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.common.message.MessageQueue; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import william.rmq.common.constant.RocketMQConstant; import javax.annotation.PostConstruct; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * @Auther: ZhangShenao * @Date: 2018/9/15 09:25 * @Description:使用DefaultMQPullConsumer拉取訊息 */ @Service @Slf4j public class PullMessageConsumer { /**記錄每個MessageQueue的消費位點offset,可以持久化到DB或快取Redis,這裡作為演示就儲存在程式中*/ private static final Map<MessageQueue,Long> OFFSET_TABLE = new ConcurrentHashMap<>(); @Value("${spring.rocketmq.namesrvAddr}") private String namesrvAddr; /**使用DefaultMQPullConsumer實現拉取訊息模式*/ private DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("DefaultPullConsumer"); /**每次拉取訊息的最大數量*/ private static final int MAX_PULL_SIZE_EACH_TIME = 32; @PostConstruct public void start() { try { //設定namesrv地址 consumer.setNamesrvAddr(namesrvAddr); //啟動消費端 consumer.start(); System.err.println("Order Message Consumer Start..."); //從指定的Topic pull訊息 Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(RocketMQConstant.TEST_TOPIC_NAME); //遍歷MessageQueue,獲取Message for (MessageQueue messageQueue : messageQueues){ //獲取該MessageQueue的消費位點 long offset = consumer.fetchConsumeOffset(messageQueue, true); System.err.println("Consumer From Queue: " + messageQueue + ",from offset: " + offset); while (true){ try { //拉取訊息 PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, getMessageQueueOffset(messageQueue), MAX_PULL_SIZE_EACH_TIME); System.err.println("Pull Message Result: " + pullResult); //記錄offset saveMessageQueueOffset(messageQueue,pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()){ //拉取到訊息 case FOUND: break; //沒有匹配的訊息 case NO_MATCHED_MSG: break; //暫時沒有新訊息 case NO_NEW_MSG: continue; //offset非法 case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e){ log.error("Pull Message Error!!",e); } } } //關閉Consumer consumer.shutdown(); } catch (Exception e) { throw new RuntimeException(e); } } private long getMessageQueueOffset(MessageQueue messageQueue){ Long offset = OFFSET_TABLE.get(messageQueue); return (offset != null ? offset : 0); } private void saveMessageQueueOffset(MessageQueue messageQueue,long offset){ OFFSET_TABLE.put(messageQueue,offset); } }
分別啟動生產端和消費端程式,可看到消費端控制檯列印如下:
Order Message Consumer Start... Consumer From Queue: MessageQueue [topic=DefaultCluster, brokerName=zhangshenao, queueId=0],from offset: 0 Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=32, minOffset=0, maxOffset=57, msgFoundList=32] Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=57, minOffset=0, maxOffset=57, msgFoundList=25] Pull Message Result: PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=57, minOffset=0, maxOffset=57, msgFoundList=0] Pull Message Result: PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=57, minOffset=0, maxOffset=57, msgFoundList=0] Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=58, minOffset=0, maxOffset=58, msgFoundList=1] Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=59, minOffset=0, maxOffset=59, msgFoundList=1] Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=60, minOffset=0, maxOffset=60, msgFoundList=1] Pull Message Result: PullResult [pullStatus=FOUND, nextBeginOffset=61, minOffset=0, maxOffset=61, msgFoundList=1]
示例程式碼的處理邏輯是遍歷指定Topic下的所有MessageQueue,然後從中pull訊息,並記錄消費的offset。主要包括下面三件事:
獲取MessageQueue並遍歷
一個Topic包含多個MessageQueue,如果這個Consumer需要獲取Topic下的所有訊息,就要遍歷所有的MessageQueue。如果有特殊情況,也可以選擇某些指定的MessageQueue來消費。
維護Offset
從一個MessageQueue中拉取訊息時,要傳入Offset引數,隨著不斷讀取訊息,offset不斷增加,這個時候需要使用者把offset儲存下來,根據具體情況可以儲存在記憶體中、寫到磁碟或資料庫等。
根據不同的拉取狀態做不同的處理
拉取訊息的請求發出後,會返回FOUND、NO_NEW_MSG、NO_MATCHED_MSG和OFFSET_ILLEGAL四種狀態。需要根據每個狀態做不同的處理。比較重要的兩個狀態是FOUND和NO_NEW_MSG,分別表示拉取到新的訊息和沒有新的訊息。
五. Consumer的啟動、關閉流程
- Consumer分為Push和Pull兩種模式,對於DefaultMQPullConsumer來說,使用者主動權很高,可以根據實際需要啟動、暫停和停止消費過程。需要特別注意的是offset的儲存,要在程式的異常處理部分考慮把offset寫入磁碟的處理,準確記錄每個MessageQueue消費的offset,才能保證消費的準確性。
- DefaultMQPushConsumer的退出,要顯式呼叫shutdown()方法,以便釋放資源、儲存offset等。這個呼叫要加到Consumer所在應用的退出邏輯中。DefaultMQPushConsumer在啟動時,會檢查各種配置,然後連線NameServer獲取Topic資訊。如果啟動時出現異常,如無法連線NameServer,程式仍然可以正常啟動不報錯(日誌會列印Warn資訊)。在單機情況下,可以故意寫錯NameServer的地址模擬這種異常。
- 之所以DefaultMQPushConsumer在無法連線NameServer時仍然能正常啟動,是考慮到分散式系統的設計。RocketMQ叢集可以有多個NameServer和Broker,某個節點出現異常後整個叢集仍然可用。所以DefaultMQPushConsumer在出現連線異常時不是立刻退出,而是不斷嘗試重連。