RocketMQ: Starting the Pull Consumer Client
public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
this.consumerGroup = consumerGroup;
defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
}
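The constructor only records consumerGroup, the name of the group this consumer belongs to, and creates a DefaultMQPullConsumerImpl instance. The constructor of DefaultMQPullConsumerImpl is equally simple: it binds the DefaultMQPullConsumer and stores the rpcHook that was passed in.
DefaultMQPullConsumer wraps DefaultMQPullConsumerImpl and also holds some configuration. Among other things, it maintains the set of topics the consumer subscribes to: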
private Set<String> registerTopics = new HashSet<String>();
The whole consumer client is started by calling DefaultMQPullConsumer's start() method, which delegates directly to DefaultMQPullConsumerImpl's start() method. That method is declared synchronized.
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            this.checkConfig();
            this.copySubscription();
            if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPullConsumer.changeInstanceNameToPID();
            }
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
            if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPullConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            mQClientFactory.start();
            log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
}
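The initial serviceState is naturally CREATE_JUST, so checkConfig() is called. It first validates the consumer group: the name must be non-empty, valid (it matches the allowed pattern and does not exceed the configured maximum length), and different from the default group name, which prevents consumer group names from colliding. It then checks that the message model and the queue allocation strategy are non-null and valid.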
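The state handling in start() can be illustrated with a minimal sketch. It is not RocketMQ code: LifecycleSketch, its State enum, and the failInit flag are hypothetical names introduced here. The sketch only shows the pattern visible in the method above, where the state is set to START_FAILED before initialization begins, so that an exception during initialization leaves the object in a failed state and any later start() call is rejected.

```java
class LifecycleSketch {
    // Hypothetical stand-in for ServiceState.
    enum State { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;
    private final boolean failInit; // hypothetical switch that simulates a failing checkConfig()

    LifecycleSketch(boolean failInit) {
        this.failInit = failInit;
    }

    synchronized void start() {
        switch (state) {
            case CREATE_JUST:
                // Assume failure first; only a fully completed start flips the state to RUNNING.
                state = State.START_FAILED;
                if (failInit) {
                    throw new IllegalStateException("initialization failed");
                }
                state = State.RUNNING;
                break;
            default:
                // RUNNING, START_FAILED and SHUTDOWN_ALREADY all reject a second start.
                throw new IllegalStateException("service state not OK: " + state);
        }
    }

    State state() {
        return state;
    }
}
```

With this shape, a consumer whose configuration check throws cannot be started again by accident; the caller has to create a new instance.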
There are two message models, BROADCASTING and CLUSTERING; the default, CLUSTERING, is configured in DefaultMQPullConsumer. DefaultMQPullConsumer also sets a default allocation strategy, allocateMessageQueueStrategy = new AllocateMessageQueueAveragely(); (average allocation). This class implements the AllocateMessageQueueStrategy interface; the method to focus on is allocate().
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
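The arguments are the consumer group, the current consumer's id, the list of all message queues, and the list of all consumer ids. After some simple null and empty checks, the method uses the number of consumers and the number of queues to compute, by averaging, which queues the current consumer should be assigned. The logic is not difficult. RocketMQ also provides other strategies, such as circular averaging, consistent hashing, and allocation by configuration; average allocation is the default here.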
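To make the index arithmetic concrete, here is a small self-contained sketch that repeats the same computation as allocate() above, with plain integers standing in for MessageQueue objects. The class name AverageAllocationSketch and the simplified signature (queue count, consumer count, index of the current consumer) are assumptions made for illustration and are not part of RocketMQ.

```java
import java.util.ArrayList;
import java.util.List;

class AverageAllocationSketch {
    // Returns the queue indexes assigned to the consumer at position `index`
    // in the sorted consumer list, using the same arithmetic as allocate().
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        int mod = queueCount % consumerCount;
        // The first `mod` consumers take one extra queue when the division is uneven.
        int averageSize = queueCount <= consumerCount
            ? 1
            : (mod > 0 && index < mod ? queueCount / consumerCount + 1 : queueCount / consumerCount);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // A negative range means there are more consumers than queues and this one gets nothing.
        int range = Math.min(averageSize, queueCount - startIndex);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < range; i++) {
            result.add((startIndex + i) % queueCount);
        }
        return result;
    }

    public static void main(String[] args) {
        // 8 queues shared by 3 consumers.
        for (int i = 0; i < 3; i++) {
            System.out.println("consumer " + i + " -> " + allocate(8, 3, i));
        }
    }
}
```

With 8 queues and 3 consumers, the consumers receive queues [0, 1, 2], [3, 4, 5] and [6, 7]. With 2 queues and 3 consumers, the third consumer receives an empty list.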
Back in DefaultMQPullConsumerImpl's start() method: after checkConfig(), it calls copySubscription(), which builds a SubscriptionData object for each topic configured in DefaultMQPullConsumer and stores it in rebalanceImpl's subscriptionInner map, with the topic as the key and the SubscriptionData as the value.
private void copySubscription() throws MQClientException {
try {
Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
if (registerTopics != null) {
for (final String topic : registerTopics) {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
topic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
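Next, the MQClientInstance is obtained from MQClientManager; this step is the same as in the producer client.
After that comes the configuration of rebalanceImpl, which deserves a closer look. It is constructed directly as a field of DefaultMQPullConsumerImpl, private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);, so it is created when DefaultMQPullConsumerImpl is initialized. start() then sets its consumer group name, message model (clustering by default), queue allocation strategy (average allocation by default), and client instance. All of these values are taken from DefaultMQPullConsumer.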
public abstract class RebalanceImpl {
protected static final Logger log = ClientLogger.getLog();
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
new ConcurrentHashMap<String, Set<MessageQueue>>();
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
new ConcurrentHashMap<String, SubscriptionData>();
protected String consumerGroup;
protected MessageModel messageModel;
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
protected MQClientInstance mQClientFactory;
Next, a PullAPIWrapper is constructed. Only its constructor is called, and it does nothing more than store a few settings:
public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) {
this.mQClientFactory = mQClientFactory;
this.consumerGroup = consumerGroup;
this.unitMode = unitMode;
}
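The consumer's offsetStore is initialized next. An offset can be understood as consumption progress, and the storage strategy depends on the message model. In broadcasting mode every consumer should receive all subscribed messages, so each consumer only needs to track the progress of the queues it consumes itself; the progress is therefore stored locally, using LocalFileOffsetStore. In clustering mode, by contrast, the consumers in the group share the message queues among themselves, so the progress is stored remotely, using RemoteBrokerOffsetStore. The chosen store's load() method is then called.
After the current consumer has been registered with the MQ client instance, MQClientInstance's start() method is called to start the consumer client.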
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed."
, null);
default:
break;
}
}
}
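This should look familiar, since it is the same code the producer client runs. It resolves the name server address if none is configured, starts the Netty client, and starts the scheduled tasks (periodically refreshing local route information from the name server, sending heartbeats, and adjusting thread counts). It then starts the pull service, the rebalance service, and the push service, and finally sets serviceState to RUNNING to indicate that the client has started.
Here we focus on the startup of RebalanceService. Its run() method is shown below.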
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
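As long as this thread has not been stopped (that is, the client has not been shut down), it keeps looping: it waits in waitForRunning(waitInterval) and then calls the client's doRebalance() method.
public void doRebalance() {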
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
MQClientInstance iterates over consumerTable (at registration, the consumer client DefaultMQPullConsumerImpl was stored there with consumerGroup as the key) and calls doRebalance() on each entry. So let us look at DefaultMQPullConsumerImpl's doRebalance() method.
@Override
public void doRebalance() {
if (this.rebalanceImpl != null) {
this.rebalanceImpl.doRebalance(false);
}
}
It simply calls rebalanceImpl's doRebalance() method:
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
The method first obtains subTable, that is, subscriptionInner, the map of SubscriptionData objects built earlier from each configured topic. It iterates over the map and calls rebalanceByTopic() for each topic.
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
strategy.getName(), e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
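We focus on clustering mode first. The method reads the topic's local route information, then uses the topic and this consumer's group name to make a synchronous network call, through the Netty client, to a broker serving the topic. The broker returns the ids of the consumers that are connected to it, belong to the given consumer group, and subscribe to the given topic. The default allocation strategy's allocate() then divides the queues evenly among those consumers. Finally, updateProcessQueueTableInRebalance() is called to determine whether the queue assignment has changed.
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,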
final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
this.dispatchPullRequest(pullRequestList);
return changed;
}
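The method first iterates over processQueueTable and checks whether each message queue under the topic should still be processed. After a new allocation the assigned queues may have changed, so a queue that was previously processed may no longer be needed, and such queues are removed. There may also be newly assigned queues, which need a mapping to a ProcessQueue. For these, computePullFromWhere() is called to get the position from which the queue will next be pulled (the implementation in RebalancePullImpl simply returns 0), a ProcessQueue is created for the queue, and the pair is put into processQueueTable. If anything was added or removed, changed is set to true. (This part is explained only loosely here. It is the key to the difference between pull and push on the client: push is in fact just a layer of wrapping on top of pull, and we will come back to analyze it later.)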
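The two passes described above amount to reconciling a table against a newly assigned set. The sketch below shows only that reconciliation, under simplifying assumptions: queues are represented by plain strings, the table maps a queue to its next pull offset rather than to a ProcessQueue, and the per-topic filter, locking, and expiry handling of the real method are left out. QueueTableDiffSketch and its update() method are hypothetical names.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

class QueueTableDiffSketch {
    // Reconciles `table` with the newly assigned queues and reports whether anything changed.
    static boolean update(Map<String, Long> table, Set<String> assigned) {
        boolean changed = false;

        // Pass 1: drop queues that are no longer assigned to this consumer.
        Iterator<Map.Entry<String, Long>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            if (!assigned.contains(it.next().getKey())) {
                it.remove();
                changed = true;
            }
        }

        // Pass 2: add newly assigned queues, starting at offset 0
        // (the value the text says RebalancePullImpl returns).
        for (String queue : assigned) {
            if (table.putIfAbsent(queue, 0L) == null) {
                changed = true;
            }
        }
        return changed;
    }
}
```

For example, if the table holds q0 and q1 and the new assignment is {q1, q2}, the call removes q0, keeps q1 with its existing offset, adds q2 at offset 0, and returns true. Calling it again with the same assignment returns false.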
After the method returns, if changed is true, messageQueueChanged() is called to notify the messageQueueListener configured in DefaultMQPullConsumer. The implementation in RebalancePullImpl is shown below.
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();
if (messageQueueListener != null) {
try {
messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
} catch (Throwable e) {
log.error("messageQueueChanged exception", e);
}
}
}
Broadcasting mode is simpler: because every consumer has to process all queues, the queue allocation step is skipped.