RocketMQ專題2:三種常用生產消費方式(順序、廣播、定時)以及順序消費源碼探究
順序、廣播、定時任務
前插
在進行常用的三種消息類型例子展示的時候,我們先來說一說RocketMQ的幾個重要概念:
- PullConsumer與PushConsumer:主要區別在於Pull與Push的區別。對於PullConsumer,消費者會主動從broker中拉取消息進行消費。而對於PushConsumer,會封裝包含消息獲取、消息處理以及其他相關操作的接口給程序調用
- Tag: Tag可以看做是一個子主題(sub-topic),可以進一步細化主題下的相關子業務。提高程序的靈活性和可擴展性
- Broker:RocketMQ的核心組件之一。用來從生產者處接收消息,存儲消息以及將消息推送給消費者。同時RocketMQ的broker也用來存儲消息相關的數據,比如消費者組、消費處理的偏移量、主題以及消息隊列等
- Name Server: 可以看做是一個信息路由器。生產者和消費者從NameServer中查找對應的主題以及相應的broker
實例
這裏我們不玩虛的,直接將三個類型的生產者,消費者代碼實例給出(在官網給出的例子上做了些許改動和註釋說明):
生產者代碼
/** * 多種類型組合消息測試 * @author ziyuqi * */ public class MultiTypeProducer { public static void main(String[] args) throws Exception { // 順序消息生產者 FIFO OrderedProducer orderedProducer = new OrderedProducer(); orderedProducer.produce(); // 廣播消息生產者 /*BroadcastProducer broadcastProducer = new BroadcastProducer(); broadcastProducer.produce();*/ // 定時任務消息生產者 /*ScheduledProducer scheduledProducer = new ScheduledProducer(); scheduledProducer.produce();*/ } } /** * 按順序發送消息的生產者 * @author ziyuqi * */ class OrderedProducer { public void produce() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("GroupD"); producer.setNamesrvAddr("localhost:9876"); producer.start(); String[] tags = new String[] {"tagA", "tagB", "tagC", "tagD", "tagE"}; for (int i=0; i<50; i++) { Message message = new Message("OrderedTopic", tags[i % tags.length], "KEY" + i, ("Ordered Msg:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(message, new MessageQueueSelector() { /** * 所謂的順序,只能保證同一MessageQueue放入的消息滿足FIFO。該方法返回應該將消息放入那個MessageQueue,最後一個參數為send傳入的最後一個參數 * 如果需要全局保持FIFO,則所有消息應該依次放入同一隊列中去mqs隊列中的同一下標 */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 消息被分開放入多個隊列,每個隊列中的消息保證按順序被消費FIFO /*int index = (Integer) arg % mqs.size(); System.out.println("QueueSize:" + mqs.size()); return mqs.get(index);*/ // 消息全部放入同一隊列,全局保持順序性 return mqs.get(0); } }, i); System.out.println(sendResult); } producer.shutdown(); } } /** * 廣播生產者 * @author ziyuqi * */ class BroadcastProducer { public void produce() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("GroupA"); // 也必須設置nameServer producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i=0; i<50; i++) { Message message = new Message("BroadcastTopic", "tagA", "OrderID188", ("Ordered Msg:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(message); System.out.println(sendResult); } producer.shutdown(); } } /** * 
定時消息發送者 * @author ziyuqi * */ class ScheduledProducer { public void produce() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("GroupA"); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i=0; i<50; i++) { Message message = new Message("scheduledTopic", ("Message:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 設置投遞的延遲時間 message.setDelayTimeLevel(3); SendResult sendResult = producer.send(message); System.out.println(sendResult); } producer.shutdown(); } }
消費者代碼
public class MultiTypeConsumer { public static void main(String[] args) throws Exception { // 按順序消費者 OrderedConsumer orderedConsumer = new OrderedConsumer(); orderedConsumer.consume(); // 廣播消費者 /*BroadcastConsumer broadcastConsumer = new BroadcastConsumer(); broadcastConsumer.consume();*/ // 定時任務消費者 /*ScheduledConsumer scheduledConsumer = new ScheduledConsumer(); scheduledConsumer.consume();*/ } } /** * 按順序的消費者 * @author ziyuqi * */ class OrderedConsumer { public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupD"); /* * 設置從哪裏開始消費 : * 當設置為: ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setNamesrvAddr("localhost:9876"); // 設置定於的主題和tag(必須顯示指定tag) consumer.subscribe("OrderedTopic", "tagA || tagB || tagC || tagD || tagE"); consumer.setMessageListener(new MessageListenerOrderly() { AtomicLong num = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { /** * 設置是否自動提交: 默認自動提交,提交之後消息就不能夠被再次消費。 * 非自動提交時,消息可能會被重復消費 */ context.setAutoCommit(false); this.num.incrementAndGet(); try { for (MessageExt msg : msgs) { System.out.println("Received:num=" + this.num.get() +", queueId=" + msg.getQueueId() + ", Keys=" + msg.getKeys() + ", value=" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET)); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } /*try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }*/ if (this.num.get() % 3 == 0) { // return ConsumeOrderlyStatus.ROLLBACK; } else if (this.num.get() % 4 == 0) { return ConsumeOrderlyStatus.COMMIT; } else if (this.num.get() % 5 == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } // 非主動提交的時候,SUCCESS不會導致隊列消息提交,消息未提交就可以被循環消費 return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); } } class 
BroadcastConsumer { public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 即使是廣播形式下,nameServer還是要設置 consumer.setNamesrvAddr("localhost:9876"); // 設置消費的消息類型為廣播類消息 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("BroadcastTopic", "tagA || tagB || tagC"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { System.out.println("Received:" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET)); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } } /** * 定時任務消費者 * @author ziyuqi * */ class ScheduledConsumer { public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("scheduledTopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { System.out.println("Received:[" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET) + "]" + (System.currentTimeMillis() - msg.getStoreTimestamp()) + " ms later!"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
源碼與實例分析
結合我上面的測試代碼,以及我在測試中主要針對順序消費的疑惑和源碼調試。我這裏簡單分析下順序消費者的相關執行過程,大致的執行步驟如下:
消費者啟動
我們知道每次consumer創建之後,都會調用consumer.start()
方法來啟動消費者。跟進代碼嵌套,不難發現最終會進入DefaultMQPushConsumerImpl
的start
方法中,該方法的主要代碼如下:
/**
 * Starts this push consumer.
 *
 * In the CREATE_JUST state it runs the full start-up sequence: config check, subscription copy,
 * client-factory lookup, rebalance and pull-wrapper wiring, offset-store selection, creation of
 * the orderly or concurrent consume service, registration with the client factory, and finally
 * starting the factory. After the switch it refreshes topic subscription info, checks the client
 * in the broker, sends a heartbeat and triggers an immediate rebalance.
 *
 * @throws MQClientException if the consumer group is already registered with the client
 *         factory, or if the service state is RUNNING, START_FAILED or SHUTDOWN_ALREADY
 */
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
// Only a consumer whose state is CREATE_JUST goes through the start-up sequence
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
// Set START_FAILED up front; it is only replaced by RUNNING once every step below succeeded
this.serviceState = ServiceState.START_FAILED;
// Configuration check
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// Hand group, message model, allocation strategy and client factory to the rebalance implementation
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// Use the offset store configured on the consumer if there is one; otherwise pick one by message model
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
// The type of the registered listener decides between orderly and concurrent consumption
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
// Registration failed: restore CREATE_JUST and stop the consume service before reporting the error
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// The key call: start the MQ client factory, which is what begins pulling messages
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
?
MQClient啟動
上一段源碼我們發現最終調用了mQClientFactory.start()
。我們繼續跟進該方法,發現實際調用的是MQClientInstance.start():
/**
 * Starts the client instance.
 *
 * In the CREATE_JUST state it fetches the name server address when none is configured, then
 * starts the request-response channel, the scheduled tasks, the pull service, the rebalance
 * service and the internal producer, and finally marks the instance RUNNING. RUNNING and
 * SHUTDOWN_ALREADY are no-ops.
 *
 * @throws MQClientException if the state is START_FAILED
 */
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
// Set START_FAILED up front; it is only replaced by RUNNING after all services started
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service -- the key point: this calls pullMessageService.start()
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
消息拉取
根據上一段代碼的註釋,我們進入到核心的消息拉取代碼PullMessageService
的start
方法(實際上PullMessageService繼承自ServiceThread,start會啟動內部線程,最終執行的是run方法):
/**
 * Service loop: until the service is stopped, blocks on the pull-request queue and hands each
 * request to {@code pullMessage}. Logs when the loop starts and ends.
 */
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// take() blocks until a pull request is available
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest); // the focus moves to this method for the actual handling
} catch (InterruptedException ignored) {
// deliberately ignored; the loop condition re-checks isStopped()
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
/**
 * Looks up the consumer registered for the request's consumer group and delegates the pull to
 * it. If no consumer matches, the request is dropped with a warning.
 *
 * @param pullRequest the pull request taken from the queue
 */
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
// The selected consumer is cast unconditionally to the push-consumer implementation
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest); // delegate to the default push-consumer implementation
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
我們繼續跟進DefaultMQPushConsumerImpl
的pullMessage
方法:
/**
 * Issues an asynchronous pull for the given request and wires up the callback that processes
 * the result.
 *
 * This is an excerpt: the elided sections define locals used below (e.g. processQueue,
 * subscriptionData, subExpression, sysFlag, commitOffsetValue).
 *
 * @param pullRequest the request describing which queue and offset to pull from
 */
public void pullMessage(final PullRequest pullRequest) {
// ... omitted
final long beginTimestamp = System.currentTimeMillis();
// This callback is where the pulled messages are actually handed over for consumption
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
// Remember the offset that was requested, then advance the request to the next begin offset
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
// Nothing left in the found list: pull again right away
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// Submit a consume-request task to the consume thread pool
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// Schedule the next pull: delayed by the configured pull interval if positive, otherwise immediately
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
// Sanity check: neither the next begin offset nor the first message offset should be behind the requested offset
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
// Mark the process queue dropped, then fix and persist the offset in a delayed task
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
// Failures on topics starting with the retry-group prefix are not logged; every failure is retried later
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
// ... omitted
try {
this.pullAPIWrapper.pullKernelImpl( // passes the parameters of the core pull: communication mode, callback, etc.; per the article, the messages are requested remotely via Netty and the callback runs once the request succeeds
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
// Retry the same pull request after the exception delay
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}
以上代碼註釋有三個重點的地方,具體的處理流程大致是這樣。首先this.pullAPIWrapper.pullKernelImpl
這個方法定義了具體的消息拉取策略,內部實現其實會根據消息類型去拉取消息。對於默認的集群消息模式,實際會調用Netty進行消息拉取,拉取結束後會調用註釋中的回調函數進行處理。最終實際會進入DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest
,而實際上對於順序消息消費會進入ConsumeMessageOrderlyService
的submitConsumeRequest
方法。該方法直接向消費線程池中放入一個消費請求任務。
消費請求任務
? 我們繼續跟進ConsumeRequest
消費請求任務的具體實現:
/**
 * Consume-request task for orderly consumption.
 *
 * While holding the per-message-queue lock object, it repeatedly takes a batch of messages from
 * the process queue, invokes the registered orderly listener, and passes the returned status to
 * {@code processConsumeResult}, whose return value decides whether the loop continues. In
 * CLUSTERING mode consumption only proceeds while the process queue is locked and the lock has
 * not expired; otherwise a lock-and-reconsume attempt is scheduled for later.
 */
@Override
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// One lock object per message queue: consume requests for the same queue are serialized
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
// BROADCASTING needs no process-queue lock; otherwise the lock must be held and not expired
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// Bound the time one task spends consuming continuously; past the limit, resubmit and exit
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
// Build the hook context and run the "before" hooks only when hooks are registered
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps(new HashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
// Invoke the registered listener to consume the messages and capture its result
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
if (null == status
|| ConsumeOrderlyStatus.ROLLBACK == status
|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
// Classify the outcome for the hook context
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
// NOTE(review): the * 60 * 1000 suggests getConsumeTimeout() is in minutes -- confirm
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeOrderlyStatus.SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
// A null status (listener threw or returned null) is treated as "suspend the queue for a moment"
if (null == status) {
status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
// Process the listener's result; its return value decides whether the loop continues
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
// No messages left to take: end the loop
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// Lock not held or expired: try to lock and reconsume later
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
可以看出我們開始會調用我們實現的MessageListener對拉取到的消息進行消費,消費完成之後我們會拿到消費結果,並對消費結果進行處理。
消費結果處理(COMMIT ROLLBACK)
我們直接跟進消費結果處理代碼:
/**
 * Applies the status returned by the orderly listener to the process queue.
 *
 * With auto-commit, SUCCESS (and COMMIT/ROLLBACK, which are logged as illegal and then treated
 * like SUCCESS) commits the process queue. Without auto-commit, SUCCESS only updates statistics,
 * COMMIT commits, and ROLLBACK rolls the process queue back and resubmits the request later.
 * In both modes SUSPEND_CURRENT_QUEUE_A_MOMENT re-queues the messages and resubmits later while
 * {@code checkReconsumeTimes} allows it. When a commit produced a non-negative offset and the
 * process queue is not dropped, the offset store is updated.
 *
 * @param msgs the messages that were just handed to the listener
 * @param status the status returned by the listener
 * @param context the consume context (auto-commit flag, suspend time)
 * @param consumeRequest the request being processed
 * @return {@code true} if the caller should keep consuming, {@code false} if consumption was
 *         handed off to a later request
 */
public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
) {
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) { // auto-commit case
switch (status) {
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
consumeRequest.getMessageQueue());
// no break: falls through and is handled like SUCCESS
case SUCCESS:
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
}
} else {
switch (status) { // manual commit: each returned status is handled differently
case SUCCESS:
// Statistics only; nothing is committed
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case COMMIT:
commitOffset = consumeRequest.getProcessQueue().commit();
break;
case ROLLBACK:
consumeRequest.getProcessQueue().rollback();
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
}
break;
default:
break;
}
}
// Only a successful commit (offset >= 0) on a live process queue updates the offset store
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
return continueConsume;
}
因為我們例子中寫的是非自動提交,我們就來看看非自動提交下ROLLBACK和COMMIT的具體實現(對應ProcessQueue
的相關方法):
/**
 * Moves every entry of {@code consumingMsgOrderlyTreeMap} back into {@code msgTreeMap} and
 * clears the former, under the write lock. An interrupt while waiting for the lock is logged
 * and the rollback is skipped.
 */
public void rollback() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
/**
 * Walk-through with the article's example: when KEY2 is consumed, num == 3, so rollback()
 * is entered. At that point:
 * this.msgTreeMap holds all messages not yet consumed, here KEY3 --- KEY49
 * this.consumingMsgOrderlyTreeMap holds all messages consumed in order so far, KEY0 --- KEY2
 * So a rollback does not only put the message currently being consumed back into the queue
 * for re-consumption: all previously processed messages are put back as well. This explains
 * the repeated consumption observed earlier once auto-commit was set to false.
 */
this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
this.consumingMsgOrderlyTreeMap.clear();
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("rollback exception", e);
}
}
/**
 * Commits the messages currently held in {@code consumingMsgOrderlyTreeMap}: reduces the
 * message count and size counters accordingly and clears the map, under the write lock.
 *
 * @return the last key of the consuming map plus one, or -1 if no offset was obtained or the
 *         thread was interrupted while waiting for the lock
 */
public long commit() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
// Offset of the last message in the consumed-in-order (not yet committed) map
// NOTE(review): if this map is a java.util.TreeMap, lastKey() throws NoSuchElementException
// on an empty map instead of returning null, so the null check below would not cover that
// case -- confirm against the field declaration
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
// Message count minus the number of consumed-but-uncommitted messages = messages still left to consume
msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
// Subtract the body length of every message being committed from the total message size
for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
msgSize.addAndGet(0 - msg.getBody().length);
}
// Clear the consumed-but-uncommitted map
this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
return offset + 1;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("commit exception", e);
}
return -1;
}
至此,整個簡單的消費流程分析完成。
消費流程源碼分析總結
- Pull OR Push:即使是Push模式的Consumer,其最終實現還是通過Pull的方式來進行的
- Netty:集群模式的遠程消息獲取是通過Netty來實現的
總結
RocketMQ的常用三種消息生產消費模式到現在我們就基本分析完了。個人認為順序消息消費給需要順序執行的流程異步實現提供了強有力的支持。這一點特別適用於阿里當前的相關領域。當然RocketMQ也不是盡善盡美的,我個人在測試的時候發現順序消息消費的性能不算特別高,當然具體什麽原因只有留到後續分析了。還有,因為這個項目開始是阿里內部研發的,可能源碼註釋上相比於其他開源項目還是要少一些,也沒有那麽清楚。以至於consumer.setConsumeFromWhere
這個的不同設值的具體區別在哪我還沒有探究出來(想想Spring的事務隔離級別以及傳遞特性相關常量的註釋基本一看就懂了),限於篇幅還有我趕緊趕去上班,就不再繼續深究了(後面繼續)。
參考鏈接
http://rocketmq.apache.org/docs/
RocketMQ專題2:三種常用生產消費方式(順序、廣播、定時)以及順序消費源碼探究