原始碼分析RocketMQ訊息拉取拉模式PULL
消費者 與 訊息儲存方Broker一般有兩種通訊機制:推(PUSH)、拉(PULL)
推模式:訊息傳送者將訊息傳送到Broker,然後Broker主動推送給訂閱了該訊息的消費者。
拉模式:訊息傳送者將訊息傳送到Broker上,然後由訊息消費者自發的向Broker拉取訊息。
RocketMQ推拉機制實現:
嚴格意義上來講,RocketMQ並沒有實現PUSH模式,而是對拉模式進行一層包裝,在消費端開啟一個執行緒PullMessageService迴圈向Broker拉取訊息,一次拉取任務結束後馬上又發起另一次拉取操作,實現準實時自動拉取,,PUSH模式的實現請參考如下博文:
推模式訊息拉取機制:
推模式訊息佇列負載機制:http://blog.csdn.net/prestigeding/article/details/78927447
本文重點在討論RocketMQ拉模式DefaultMQPullConsumer實現。
RocketMQ拉模式,RocketMQ消費者不自動向訊息伺服器拉取訊息,而是將控制權移交給應用程式,RocketMQ消費者只是提供拉取訊息API。
為了對RocketMQ 拉模式有一個直觀的瞭解,我們先大概瀏覽一下MQPullConsumer介面:
從上面我們可以看到除了啟動、關閉,註冊訊息監聽器,其他的就是針對MessageQueue拉取訊息,特別值得留意的是每一個拉取pull方法,都是直接針對訊息消費佇列。PUSH模式可以說基於訂閱與釋出模式,而PULL模式可以說是基於訊息佇列模式。
特別說明:PULL模式根據主題註冊訊息監聽器,這裡的訊息監聽器,不是用來訊息消費的,而是在該主題的佇列負載發生變化時,做一下通知。
下文,我們應該帶著我們對PUSH模式的相關知識來認識一下PULL模式,對比學習:
PUSH模式主要知識點:
1)訊息拉取機制:PullMessageServer執行緒 根據PullRequest拉取任務迴圈拉取。
2)訊息佇列負載機制,按照消費組,對主題下的訊息佇列,結合當前消費組內消費者數量動態負載。
按照上面API的描述,PULL模式應該無需考慮上面兩個情形,我們帶著上述疑問,開始我們今天的學習。
1、DefaultMQPullConsumer 核心屬性
/**
* Do the same thing for the same Group, the application must be set,and
* guarantee Globally unique
*/
private String consumerGroup;
/**
* Long polling mode, the Consumer connection max suspend time, it is not
* recommended to modify
*/
private long brokerSuspendMaxTimeMillis = 1000 * 20;
/**
* Long polling mode, the Consumer connection timeout(must greater than
* brokerSuspendMaxTimeMillis), it is not recommended to modify
*/
private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
/**
* The socket timeout in milliseconds
*/
private long consumerPullTimeoutMillis = 1000 * 10;
/**
* Consumption pattern,default is clustering
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
* Message queue listener
*/
private MessageQueueListener messageQueueListener;
/**
* Offset Storage
*/
private OffsetStore offsetStore;
/**
* Topic set you want to register
*/
private Set<String> registerTopics = new HashSet<String>();
/**
* Queue allocation algorithm
*/
private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
/**
* Whether the unit of subscription group
*/
private boolean unitMode = false;
private int maxReconsumeTimes = 16;
consumerGroup : 消費組名稱
brokerSuspendMaxTimeMillis :長輪詢模式下掛起的最大超時時間,在Broker端根據偏移量從儲存
檔案中查詢訊息時如果返回PULL_NOT_FOUND時,不理解返回給拉取客戶端,而是交給
PullRequestHoldService執行緒,每隔5秒再去拉取一次訊息,如果找到則返回給訊息拉取客
戶端,否則超時。
consumerTimeoutMillisWhenSuspend : 整個訊息拉取過程中,拉取客戶端等待伺服器響應結果的超時時間,預設30S
consumerPullTimeoutMillis :預設10s,拉訊息時建立網路連線的超時時間
messageModel :消費模式,廣播、叢集
messageQueueListener : 業務訊息監聽器
OffsetStore :訊息消費進度管理器
registerTopics :註冊主題數
allocateMessageQueueStrategy :佇列分配器
maxReconsumeTimes :最大訊息重試次數,預設16次
2、訊息消費者啟動流程分析,DefaultMQPullConsumerImpl#start
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription(); // @1
if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPullConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook); // @2
this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); // @3
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); // @:4
if (this.defaultMQPullConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
} else {
switch (this.defaultMQPullConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPullConsumer.setOffsetStore(this.offsetStore); // @5
}
this.offsetStore.load();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this); // @6
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start(); // @7
log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}
程式碼@1:根據註冊的主題,構建訂閱資訊,放入到RebalanceImpl的訂閱表中。PS:DefaultMQPullConsumerImpl 可以註冊多個主題,但多個主題使用同一個訊息處理監聽器。
程式碼@2:建立MQClientInstance,每一個clientConfig一個MqClientInstance物件。
程式碼@3:填充rebalanceImpl 物件的消費組、訊息佇列分配器、消費模式。這裡的作用是什麼?既然無需負載訊息佇列,為什麼需要這一步???
程式碼@4:構建PullAPIWrapper物件,該物件封裝了具體拉取訊息的邏輯,PULL,PUSH模式最終都會呼叫PullAPIWrapper類的方法從Broker拉取訊息。
程式碼@5:根據叢集消費模式(廣播、叢集)初始化訊息進度管理器offsetStore。
程式碼@6:將該消費者加入到MQClientInstance消費者列表中。
程式碼@7:啟動MQClientInstance。該方法我們在講解DefaultMQPushConsumer時相信講解過,我們再簡單瀏覽一下該方法:
既然Pull模式無需自動拉取訊息,但PullMessageService執行緒(訊息拉取)+ RebalanceService執行緒(訊息佇列負載)這個兩個執行緒就沒必要啟動,這裡啟動了,會不會帶來問題?
答案是不會,因為雖然PullMessageService執行緒啟動,但是一開始會在獲取拉取任務(PullRequest)
PullRequest是有RebalanceService產生,它根據主題訊息佇列個數和當前消費組內消費者個數進行負載,然後產生對應的PullRequest物件,再將這些物件放入到PullMessageService的pullRequestQueue佇列。具體放入邏輯呼叫:RebalanceImpl#dispatchPullRequest(final List pullRequestList);
我們來看一下RebalanceImpl的子類RebalancePullImpl的dispatchPullRequest方法:
再對比一下RebalancePushImpl的dispatchPullRequest,
再結合PullMessageService被喚醒後,執行的pullMessage方法:
我們可以得出結論,PullMessageService 只為PUSH模式服務,ReblanceService進行路由重新分佈時,如果是RebalancePullImpl,並不會產PullRequest,從而喚醒PullMessageService,PullMessageService被 喚醒後,也是執行DefaultMQPushConsumerImpl的pullMessage方法。
ReblanceService執行緒預設每20S進行一次訊息佇列重新負載,判斷訊息佇列是否需要進行重新分佈(如果消費者個數和主題的佇列數沒有發生改變),則繼續保持原樣。對於PULL模型,如果消費者需要監聽某些主題佇列發生事件,註冊訊息佇列變更事件方法,則RebalanceService會將訊息佇列負載變化事件通知消費者。
至於PULL模式那些根據訊息佇列拉取訊息的方法,與PUSH模式走的邏輯是一樣的,唯一的區別是PULL模式是需要應用程式收到觸發訊息拉取動作。
通過上述分析,我們總結一下RocketMQ,PUSH,PULL模式區別:
PUSH: 消費者訂閱主題,然後自動進行叢集內訊息佇列的動態負載,自動拉取訊息。準實時。
PULL:消費者無需訂閱主題,由業務方(應用程式)直接根據MessageQueue拉取訊息。
專案中一般採用PUSH模式。