分散式訊息佇列RocketMQ原始碼分析之3 -- Consumer負載均衡機制 -- Rebalance
同Kafka一樣,RocketMQ也需要探討一個問題:如何把一個topic的多個queue分攤給不同的consumer,也就是負載均衡問題。
有興趣朋友可以關注公眾號“架構之道與術”, 獲取最新文章。
或掃描如下二維碼:
在討論這個問題之前,我們先看一下Client的整體架構。
Producer與Consumer類體系
從下圖可以看出以下幾點:
(1)Producer與Consumer的共同邏輯,封裝在MQClientInstance,MQClientAPIImpl, MQAdminImpl這3個藍色的類裡面。所謂共同的邏輯,比如定期更新NameServer地址列表,定期更新TopicRoute,傳送網路請求等。
(2)Consumer有2種,Pull和Push。下面會詳細講述這2者的區別。
Consumer Group的Clustering模式與Pub/Sub模式
預設的,RocketMQ和Kafka採用的是同樣的策略:同1個Consumer Group的多個Consumer,是分攤,也就是負載均衡;多個Consumer Group之間,是Pub/Sub模式。
但RocketMQ對此還做了擴充套件,允許同1個Consumer Group內部,也可以是廣播模式。具體到程式碼裡面,就是:
*/
public enum MessageModel {
/**
* broadcast
*/
BROADCASTING("BROADCASTING"),
/**
* clustering
*/
CLUSTERING("CLUSTERING"); //預設取值
...
}
預設的,Consumer的MessageModel就是CLUSTERING模式,也就是同1個Consumer Group內部,多個Consumer分攤同1個topic的多個queue,也就是負載均衡。
如果你把MessageModel改成BROADCAST模式,那同1個Consumer Group內部也變成了廣播,此時ConsumerGroup其實就沒有區分的意義了。此時,不管是1個Consumer Group,還是多個Consumer Group,對同1個topic的訊息,都變成了廣播。
Pull Consumer 與 Push Consumer
Push的負載均衡
下面我們先看一下pull和push的最基本用法:
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); //指定Consumer Group
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1"); //獲取一個topic的所有MessageQueue
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: " + mq + "%n");
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); //遍歷所有queue,挨個呼叫pull
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); //指定Consumer Group
consumer.subscribe("Jodie_topic_1023", "*"); //指定要消費的topic
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() { //該topic的任何一個queue有新訊息,該回調回被呼叫
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
從上面的程式碼我們可以看出,pull和push用法上的基本差別就是:pull是客戶端主動去拉取訊息,push是註冊了一個回撥,當有新訊息,該回調被呼叫。
但這還不是2者的最大區別,最大區別是:在pull裡面,所有MessageQueue是向我們暴露的,我們需要自己去手動遍歷所有的queue;而在push裡面,我們只指定了訂閱的topic,而MessageQueue是向我們隱藏的,在其內部做了“負載均衡”。
而上面的pull的程式碼,我們手動遍歷了所有的queue,沒有負載均衡!!!
那對於Pull模式,如何做負載均衡呢??
Pull的負載均衡
在MQPullConsumer這個類裡面,有一個MessageQueueListener,它的目的就是當queue發生變化的時候,通知Consumer。也正是這個藉口,幫助我們在Pull模式裡面,實現負載均衡。
注意,這個介面在MQPushConsumer裡面是沒有的,那裡面有的是上面程式碼裡的MessageListener。
void registerMessageQueueListener(final String topic, final MessageQueueListener listener);
public interface MessageQueueListener {
void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
final Set<MessageQueue> mqDivided);
}
有了這個Listener,我們就可以動態的知道當前的Consumer分攤到了幾個MessageQueue。然後對這些MessageQueue,我們可以開個執行緒池來消費。
MQPullConsumerScheduleService
幸運的是,RocketMQ給我們提供了一個工具類,MQPullConsumerScheduleService,幫助我們在pull模式下,實現負載均衡。
類似Push模式,在這個程式碼裡面,我們也只指定了topic,而不像上面簡陋的pull版本,要自己遍歷所有的messageQueue。其內部幫我們做了負載均衡。
其使用程式碼如下:
public static void main(String[] args) throws MQClientException {
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
scheduleService.setMessageModel(MessageModel.CLUSTERING);
scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() {
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
try {
long offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0)
offset = 0;
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
break;
default:
break;
}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
context.setPullNextDelayTimeMillis(100);
} catch (Exception e) {
e.printStackTrace();
}
}
});
scheduleService.start();
}