Source Code Analysis of PullConsumer Startup in RocketMQ
DefaultMQPullConsumer is the default implementation. Its startup process is very similar to the Producer's, but somewhat more complex.
[Source Code Analysis of Producer Startup in RocketMQ]
The DefaultMQPullConsumer constructor:

    public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
        this.consumerGroup = consumerGroup;
        defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
    }

The constructor wraps a DefaultMQPullConsumerImpl, which plays the same role as DefaultMQProducerImpl does for the Producer.
DefaultMQPullConsumerImpl:

    public class DefaultMQPullConsumerImpl implements MQConsumerInner {
        private final InternalLogger log = ClientLogger.getLog();
        private final DefaultMQPullConsumer defaultMQPullConsumer;
        private final long consumerStartTimestamp = System.currentTimeMillis();
        private final RPCHook rpcHook;
        private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
        private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
        private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
        private MQClientInstance mQClientFactory;
        private PullAPIWrapper pullAPIWrapper;
        private OffsetStore offsetStore;
        private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);

        public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
            this.defaultMQPullConsumer = defaultMQPullConsumer;
            this.rpcHook = rpcHook;
        }
        ......
    }

These are the fields it holds; each is described in detail when it comes up later.
The start method of DefaultMQPullConsumer simply delegates to the start method of DefaultMQPullConsumerImpl.
The start method of DefaultMQPullConsumerImpl:

    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                this.copySubscription();

                if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPullConsumer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPullConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
                }

                this.offsetStore.load();

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;

                    throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

    }

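The state handling in start can be reduced to a small sketch. This is not RocketMQ code: the class LifecycleSketch and its Runnable parameter are hypothetical stand-ins, and the duplicate-group branch that resets the state to CREATE_JUST is left out. It only illustrates why the state is set to START_FAILED before any work is done: if an initialization step throws, the object stays in a state from which start cannot be called again.

```java
// Illustrative stand-in for the serviceState guard; not the RocketMQ class.
class LifecycleSketch {
    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;

    // initSteps stands in for checkConfig, copySubscription, registerConsumer, etc.
    public synchronized void start(Runnable initSteps) {
        switch (serviceState) {
            case CREATE_JUST:
                // Assume failure first; only a fully completed start flips to RUNNING.
                serviceState = ServiceState.START_FAILED;
                initSteps.run(); // if this throws, the state remains START_FAILED
                serviceState = ServiceState.RUNNING;
                break;
            default:
                // RUNNING, START_FAILED and SHUTDOWN_ALREADY all reject a second start.
                throw new IllegalStateException("service state not OK: " + serviceState);
        }
    }

    public ServiceState state() {
        return serviceState;
    }
}
```

A second call to start on the same object is rejected whether the first call succeeded or failed.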
First, the checkConfig method validates the configuration.
Next comes the copySubscription method:

    private void copySubscription() throws MQClientException {
        try {
            Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
            if (registerTopics != null) {
                for (final String topic : registerTopics) {
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
                        topic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

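Here registerTopics is the set of topics the user registered by calling setRegisterTopics.
Each topic in the set is wrapped in a SubscriptionData (with the subscription expression SUB_ALL) and stored in the subscriptionInner map of rebalanceImpl.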
SubscriptionData:

    public class SubscriptionData implements Comparable<SubscriptionData> {
        public final static String SUB_ALL = "*";
        private boolean classFilterMode = false;
        private String topic;
        private String subString;
        private Set<String> tagsSet = new HashSet<String>();
        private Set<Integer> codeSet = new HashSet<Integer>();
        private long subVersion = System.currentTimeMillis();
        private String expressionType = ExpressionType.TAG;
        ......
    }

RebalanceImpl:

    public abstract class RebalanceImpl {
        protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
        protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
            new ConcurrentHashMap<String, Set<MessageQueue>>();
        protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
            new ConcurrentHashMap<String, SubscriptionData>();
        protected String consumerGroup;
        protected MessageModel messageModel;
        protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
        protected MQClientInstance mQClientFactory;
        ......
    }

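Back in the start method: as in the Producer, an MQClientInstance is obtained through MQClientManager.
The properties of rebalanceImpl are then filled in.
Next, a PullAPIWrapper is instantiated and the filter hooks are registered with it. This object is covered in detail later, in the analysis of message pulling.
After that, the message model decides which kind of OffsetStore is used.

    public enum MessageModel {
        /**
         * broadcast
         */
        BROADCASTING("BROADCASTING"),
        /**
         * clustering
         */
        CLUSTERING("CLUSTERING");
        ......
    }
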
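These are broadcasting mode and clustering mode.
Broadcasting mode (BROADCASTING): every Consumer in the same ConsumerGroup receives all messages of the subscribed Topic, so each message is delivered multiple times and consumed by multiple Consumers.
Clustering mode (CLUSTERING): each Consumer in the same ConsumerGroup consumes only part of the subscribed messages; only the union of what all Consumers in the group consume makes up the whole Topic.
In broadcasting mode the consumer's progress (offset) is stored locally; in clustering mode it is stored remotely, on the broker.
Hence broadcasting mode uses LocalFileOffsetStore and clustering mode uses RemoteBrokerOffsetStore.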
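The selection rule can be sketched on its own. The types below (OffsetStoreChoiceSketch, LocalFileStore, RemoteBrokerStore, describe) are hypothetical stand-ins, not the RocketMQ classes. One deliberate difference: the start method quoted earlier leaves offsetStore unset in its default branch, whereas this sketch throws, purely to keep the example total.

```java
// Illustrative stand-in for the OffsetStore selection in start(); not RocketMQ code.
class OffsetStoreChoiceSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    interface OffsetStore {
        String describe();
    }

    static final class LocalFileStore implements OffsetStore {
        public String describe() { return "local file"; }
    }

    static final class RemoteBrokerStore implements OffsetStore {
        public String describe() { return "remote broker"; }
    }

    // A store supplied by the user always wins; otherwise the message model decides.
    static OffsetStore choose(OffsetStore userSupplied, MessageModel model) {
        if (userSupplied != null) {
            return userSupplied;
        }
        switch (model) {
            case BROADCASTING:
                return new LocalFileStore();   // each consumer tracks its own progress
            case CLUSTERING:
                return new RemoteBrokerStore(); // progress is shared through the broker
            default:
                throw new IllegalArgumentException("unknown model: " + model);
        }
    }
}
```

Progress that only one consumer cares about can live on that consumer's disk, while progress the whole group shares has to live somewhere every member can reach.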
In broadcasting mode, i.e. with LocalFileOffsetStore, calling load reads the offsets.json file; for RemoteBrokerOffsetStore, load does nothing meaningful.
The load method of LocalFileOffsetStore:

    public void load() throws MQClientException {
        OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
        if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
            offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());

            for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
                AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
                log.info("load consumer's offset, {} {} {}",
                    this.groupName,
                    mq,
                    offset.get());
            }
        }
    }

The readLocalOffset method converts the JSON string in offsets.json into an OffsetSerializeWrapper object.

    public class OffsetSerializeWrapper extends RemotingSerializable {
        private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
            new ConcurrentHashMap<MessageQueue, AtomicLong>();

        public ConcurrentMap<MessageQueue, AtomicLong> getOffsetTable() {
            return offsetTable;
        }

        public void setOffsetTable(ConcurrentMap<MessageQueue, AtomicLong> offsetTable) {
            this.offsetTable = offsetTable;
        }
    }

This gives a rough idea of what the JSON file contains: each AtomicLong is the offset of the corresponding MessageQueue.
The load method then copies this map into the offsetTable of LocalFileOffsetStore.
Next, the start method of mQClientFactory is called. This method was analyzed in [Source Code Analysis of Producer Startup in RocketMQ].

    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

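First, if no NameServer address has been set, fetchNameServerAddr is called to look one up automatically; see the Producer startup article.
Then the start method of mQClientAPIImpl sets up the Netty client; see the Producer startup article.
The startScheduledTask method sets up five scheduled tasks:
① If the name server address namesrvAddr is not set, periodically call the fetchNameServerAddr method above to refresh the name server address
② Periodically update the route information of each Topic
③ Periodically remove offline Brokers and send heartbeats to the Brokers currently online
(For the above, see the Producer startup article)
④ Periodically persist the consumption progress of the consumer's queues
The implementation in DefaultMQPullConsumerImpl:

    public void persistConsumerOffset() {
        try {
            this.makeSureStateOK();
            Set<MessageQueue> mqs = new HashSet<MessageQueue>();
            Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
            mqs.addAll(allocateMq);
            this.offsetStore.persistAll(mqs);
        } catch (Exception e) {
            log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
        }
    }
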
It first takes from rebalanceImpl the set of all MessageQueues currently being processed,
then passes that set to the persistAll method of offsetStore.
Because there are two modes, broadcasting and clustering, there are two implementations.
The persistAll method of LocalFileOffsetStore (broadcasting mode):

    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;

        OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            if (mqs.contains(entry.getKey())) {
                AtomicLong offset = entry.getValue();
                offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
            }
        }

        String jsonString = offsetSerializeWrapper.toJson(true);
        if (jsonString != null) {
            try {
                MixAll.string2File(jsonString, this.storePath);
            } catch (IOException e) {
                log.error("persistAll consumer offset Exception, " + this.storePath, e);
            }
        }
    }

This is the reverse of the load method above: the offsets of the given MessageQueues replace the previous contents of the JSON file.
That completes the periodic persistence of consumption progress in broadcasting mode.
The persistAll method of RemoteBrokerOffsetStore (clustering mode):

    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;

        final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
        if (!mqs.isEmpty()) {
            for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
                MessageQueue mq = entry.getKey();
                AtomicLong offset = entry.getValue();
                if (offset != null) {
                    if (mqs.contains(mq)) {
                        try {
                            this.updateConsumeOffsetToBroker(mq, offset.get());
                            log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                                this.groupName,
                                this.mQClientFactory.getClientId(),
                                mq,
                                offset.get());
                        } catch (Exception e) {
                            log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                        }
                    } else {
                        unusedMQ.add(mq);
                    }
                }
            }
        }

        if (!unusedMQ.isEmpty()) {
            for (MessageQueue mq : unusedMQ) {
                this.offsetTable.remove(mq);
                log.info("remove unused mq, {}, {}", mq, this.groupName);
            }
        }
    }

As above, it iterates over offsetTable, but instead of saving the offsets locally it sends them to the Broker through updateConsumeOffsetToBroker. Entries for queues that are no longer assigned are removed from offsetTable.
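The filtering and cleanup done by the clustering-mode persistAll can be shown with plain collections. This is a simplified sketch, not RocketMQ code: queues are represented by String names, and the returned map stands in for the offsets that would be sent to the Broker. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative stand-in for the persistAll filtering logic; not RocketMQ code.
class PersistSketch {
    // Returns the offsets that would be reported, and drops entries for
    // queues that are no longer assigned to this consumer.
    static Map<String, Long> persistAll(ConcurrentMap<String, AtomicLong> offsetTable,
                                        Set<String> assigned) {
        Map<String, Long> reported = new TreeMap<>();
        if (assigned == null || assigned.isEmpty()) {
            return reported; // nothing assigned: leave the table untouched
        }
        List<String> unused = new ArrayList<>();
        for (Map.Entry<String, AtomicLong> entry : offsetTable.entrySet()) {
            if (assigned.contains(entry.getKey())) {
                reported.put(entry.getKey(), entry.getValue().get());
            } else {
                unused.add(entry.getKey());
            }
        }
        for (String queue : unused) {
            offsetTable.remove(queue); // stale entry from an earlier assignment
        }
        return reported;
    }
}
```

Collecting the stale keys first and removing them afterwards mirrors the two-pass structure of the original method.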
The updateConsumeOffsetToBroker method:

    private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException {
        updateConsumeOffsetToBroker(mq, offset, true);
    }

    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        if (null == findBrokerResult) {

            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        }

        if (findBrokerResult != null) {
            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setConsumerGroup(this.groupName);
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setCommitOffset(offset);

            if (isOneway) {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            } else {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            }
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    }

It first looks up the Broker's address by BrokerName:

    public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;

        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            for (Map.Entry<Long, String> entry : map.entrySet()) {
                Long id = entry.getKey();
                brokerAddr = entry.getValue();
                if (brokerAddr != null) {
                    found = true;
                    if (MixAll.MASTER_ID == id) {
                        slave = false;
                    } else {
                        slave = true;
                    }
                    break;

                }
            } // end of for
        }

        if (found) {
            return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
        }

        return null;
    }

The broker addresses in brokerAddrTable are kept up to date by scheduled task ② (periodically updating the route information of each Topic). As soon as one address is found for the broker name in brokerAddrTable, it is wrapped in a FindBrokerResult and returned.
If nothing is found, updateTopicRouteInfoFromNameServer is executed, which is the same method the scheduled task runs, so the route information is refreshed immediately; findBrokerAddressInAdmin is then called again.
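The look-up, refresh once, look-up again pattern described here is sketched below. It is not RocketMQ code: BrokerLookupSketch is hypothetical, the name server is replaced by an in-memory map, and the address used in the usage note is made up.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the lookup-with-one-refresh pattern; not RocketMQ code.
class BrokerLookupSketch {
    private final Map<String, String> brokerAddrTable = new HashMap<>(); // local cache
    private final Map<String, String> nameServerView;                    // stand-in for the NameServer
    int refreshCount = 0;

    BrokerLookupSketch(Map<String, String> nameServerView) {
        this.nameServerView = nameServerView;
    }

    // Stand-in for updateTopicRouteInfoFromNameServer: pull everything into the cache.
    private void refreshFromNameServer() {
        refreshCount++;
        brokerAddrTable.putAll(nameServerView);
    }

    String findBrokerAddr(String brokerName) {
        String addr = brokerAddrTable.get(brokerName);
        if (addr == null) {
            refreshFromNameServer();               // cache miss: refresh once
            addr = brokerAddrTable.get(brokerName); // then retry exactly once
        }
        if (addr == null) {
            throw new IllegalStateException("The broker[" + brokerName + "] not exist");
        }
        return addr;
    }
}
```

With a name server view containing broker-a at 10.0.0.1:10911, the first lookup of broker-a triggers one refresh and later lookups are served from the cache. A broker the name server does not know still fails after the single retry.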
Once the address is found, an UpdateConsumerOffsetRequestHeader is instantiated and filled with the relevant information. Because Oneway mode is used here, updateConsumerOffsetOneway is called to send it to the Broker through Netty.

    public void updateConsumerOffsetOneway(
        final String addr,
        final UpdateConsumerOffsetRequestHeader requestHeader,
        final long timeoutMillis
    ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
        InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);

        this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
    }

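This simply calls invokeOneway to send the request to the Broker one way, without waiting for a response.
[Source Code Analysis of Producer Message Sending in RocketMQ]
When Oneway is not used, the request is sent synchronously.
In this way, in clustering mode the consumption progress is handed over to the Broker, and the subsequent load balancing builds on it.
⑤ Periodically adjust the size of the consumer-side thread pool
This applies to PushConsumer and will be covered in a later post.
For PullConsumer, starting the rebalanceService is the most important step.
RebalanceService:

    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }
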
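The waitForRunning call here is similar to the one used by the Broker's flush and master-slave replication services: it blocks with a timeout (20 s by default) and can also be woken early when the Broker sends a NOTIFY_CONSUMER_IDS_CHANGED request. After it returns, doRebalance is called.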
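The behaviour of "block until a timeout, unless someone wakes the thread first" can be sketched with a plain monitor. This is an assumption-laden illustration rather than the mechanism RocketMQ's service thread actually uses; WakeableWaitSketch and its boolean return value are hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Illustrative timed wait with early wakeup; not the RocketMQ implementation.
class WakeableWaitSketch {
    private final Object lock = new Object();
    private boolean notified = false;

    // Called by another thread, e.g. when a "consumer ids changed" notice arrives.
    void wakeup() {
        synchronized (lock) {
            notified = true;
            lock.notifyAll();
        }
    }

    // Returns true if woken by wakeup(), false if the interval elapsed
    // (or the thread was interrupted).
    boolean waitForRunning(long intervalMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMillis);
        synchronized (lock) {
            try {
                while (!notified) {
                    long remaining = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                    if (remaining <= 0) {
                        break; // timed out; also avoids wait(0), which would block forever
                    }
                    lock.wait(remaining);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            }
            boolean woken = notified;
            notified = false; // reset so the next round waits again
            return woken;
        }
    }
}
```

A loop like the run method above would call waitForRunning and then rebalance regardless of the return value. The wakeup only shortens the wait so that a membership change is handled sooner.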
The doRebalance method of RebalanceImpl:

    public void doRebalance(final boolean isOrder) {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }

This fetches the subscribed Topics that were stored by copySubscription. The queue information for these Topics, used below, is refreshed from the NameServer by scheduled task ②.
Each subscribed Topic is processed by rebalanceByTopic:

    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

Broadcasting mode and clustering mode are handled differently here.
Broadcasting mode:
First, the set of all message queues for the Topic is obtained.
It is then processed by updateProcessQueueTableInRebalance:

    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        boolean changed = false;

        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            MessageQueue mq = next.getKey();
            ProcessQueue pq = next.getValue();

            if (mq.getTopic().equals(topic)) {
                if (!mqSet.contains(mq)) {
                    pq.setDropped(true);
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                    }
                } else if (pq.isPullExpired()) {
                    switch (this.consumeType()) {
                        case CONSUME_ACTIVELY:
                            break;
                        case CONSUME_PASSIVELY:
                            pq.setDropped(true);
                            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                it.remove();
                                changed = true;
                                log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                    consumerGroup, mq);
                            }
                            break;
                        default:
                            break;
                    }
                }
            }
        }

        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }

                this.removeDirtyOffset(mq);
                ProcessQueue pq = new ProcessQueue();
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }

        this.dispatchPullRequest(pullRequestList);

        return changed;
    }

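If the message queues have changed, the while loop first removes entries from the process queue table that are no longer needed.
The for loop then adds new entries to processQueueTable. In PullConsumer, computePullFromWhere returns 0 by default, which becomes nextOffset. A new ProcessQueue is stored in processQueueTable as the record for that queue, and nextOffset is set on the PullRequest as the position of the next pull.
The dispatchPullRequest method that follows matters only for PushConsumer; it has no effect here.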
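The two passes described above (drop what is no longer assigned, add what is newly assigned, and report whether anything changed) can be sketched with plain collections. This is a simplification, not RocketMQ code: queues are String names, the table value is just a starting offset, and locking, pull expiry and per-topic filtering are omitted. ReconcileSketch is a hypothetical name.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in for the reconcile step of a rebalance; not RocketMQ code.
class ReconcileSketch {
    // processTable maps a queue name to its record (here: a starting offset).
    static boolean reconcile(Map<String, Long> processTable, Set<String> assigned) {
        boolean changed = false;

        // Pass 1: remove records for queues that are no longer assigned.
        Iterator<Map.Entry<String, Long>> it = processTable.entrySet().iterator();
        while (it.hasNext()) {
            if (!assigned.contains(it.next().getKey())) {
                it.remove();
                changed = true;
            }
        }

        // Pass 2: add records for newly assigned queues, starting at offset 0.
        for (String queue : assigned) {
            if (processTable.putIfAbsent(queue, 0L) == null) {
                changed = true;
            }
        }
        return changed;
    }
}
```

Running the same reconcile twice with an unchanged assignment returns false the second time, which is why the listener callback fires only when the assignment really moved.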
Back in rebalanceByTopic: if anything changed, the messageQueueChanged method is called:

    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();
        if (messageQueueListener != null) {
            try {
                messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
            } catch (Throwable e) {
                log.error("messageQueueChanged exception", e);
            }
        }
    }

This simply hands the change over to the MessageQueueListener by invoking its messageQueueChanged callback.
Clustering mode:
First, the set of message queues for the Topic is obtained, as before.
Because this is clustering mode, each consumer receives a different share of the messages, so findConsumerIdList is used to get the list of consumer IDs.

    public List<String> findConsumerIdList(final String topic, final String group) {
        String brokerAddr = this.findBrokerAddrByTopic(topic);
        if (null == brokerAddr) {
            this.updateTopicRouteInfoFromNameServer(topic);
            brokerAddr = this.findBrokerAddrByTopic(topic);
        }

        if (null != brokerAddr) {
            try {
                return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
            } catch (Exception e) {
                log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
            }
        }

        return null;
    }

The findBrokerAddrByTopic method picks the address of one Broker in the cluster that hosts the Topic (kept up to date from the NameServer by scheduled task ②). If a master exists it is chosen; otherwise a slave is picked at random.
If no address is found, the route information is requested from the NameServer again and the lookup is retried.
Once the Broker address is available, getConsumerIdListByGroup sends a request to the Broker:

    public List<String> getConsumerIdListByGroup(
        final String addr,
        final String consumerGroup,
        final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
        MQBrokerException, InterruptedException {
        GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
        requestHeader.setConsumerGroup(consumerGroup);
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);

        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                if (response.getBody() != null) {
                    GetConsumerListByGroupResponseBody body =
                        GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
                    return body.getConsumerIdList();
                }
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

This sends a GET_CONSUMER_LIST_BY_GROUP request to the Broker synchronously. When the response arrives, the data it carries, a List of consumer IDs, is returned.
Back in rebalanceByTopic, once the list of consumer IDs is available,
the queues are allocated according to the allocation strategy, which defaults to AllocateMessageQueueAveragely.
Its allocate method is then called to perform the allocation.

    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }

        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

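(This ID was covered in the Producer startup article: it is produced in MQClientManager's getAndCreateMQClientInstance method and is unique to the client.)
Since this is clustering mode, the current Consumer must itself be a member of the group, so the method checks that currentCID is contained in the list.
It then distributes the message queues according to the number of consumers and the number of queues, which is how load balancing on the consumer side is achieved.
The averaging strategy is used here: from the number of queues and the number of consumers, it computes which queues the current consumer should consume.
Besides this one, RocketMQ provides several other allocation strategies that can be used as needed.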
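To make the averaging arithmetic concrete, the sketch below applies the same formulas from the allocate method to queue indices instead of MessageQueue objects. AverageAllocateSketch and its integer parameters are hypothetical simplifications; the computation of mod, averageSize, startIndex and range is copied from the method quoted above.

```java
import java.util.ArrayList;
import java.util.List;

// Index-only rendering of the averaging arithmetic; not the RocketMQ class.
class AverageAllocateSketch {
    // Returns the queue indices assigned to the consumer at position `index`
    // among `consumerCount` sorted consumers, for `queueCount` sorted queues.
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        List<Integer> result = new ArrayList<>();
        if (queueCount <= 0 || consumerCount <= 0 || index < 0 || index >= consumerCount) {
            return result;
        }
        int mod = queueCount % consumerCount;
        // The first `mod` consumers get one extra queue each.
        int averageSize = queueCount <= consumerCount
            ? 1
            : (mod > 0 && index < mod ? queueCount / consumerCount + 1 : queueCount / consumerCount);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // range can be zero or negative when there are more consumers than queues.
        int range = Math.min(averageSize, queueCount - startIndex);
        for (int i = 0; i < range; i++) {
            result.add((startIndex + i) % queueCount);
        }
        return result;
    }
}
```

With 8 queues and 3 consumers, the consumers at positions 0, 1 and 2 receive queues [0, 1, 2], [3, 4, 5] and [6, 7]. With 2 queues and 3 consumers, the third consumer receives nothing. Because every consumer sorts both lists before allocating, each one computes a consistent, non-overlapping share without coordinating with the others.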
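Back in rebalanceByTopic, once the queues have been allocated,
updateProcessQueueTableInRebalance is called to update the message queues and process queues. If anything changed, messageQueueChanged invokes the callback interface to announce the change.
At this point, the startup of PullConsumer is complete.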