RocketMq消費者拉取訊息服務PullMessageService
阿新 • • 發佈:2020-08-13
RocketMq消費者拉取訊息服務PullMessageService
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { //從請求佇列裡獲取一個請求 PullRequest pullRequest = this.pullRequestQueue.take(); if (pullRequest != null) { //拉取資料 this.pullMessage(pullRequest); } } catch (InterruptedException e) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
PullMessageService#pullMessage
private void pullMessage(final PullRequest pullRequest) { //根據組名獲取對應的消費者 一個mqClientInstance裡一個consumerGroup只有一個消費者對應 final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; //實際是根據組名獲取對應的消費者來發起訊息拉取 impl.pullMessage(pullRequest); } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }
DefaultMQPushConsumerImpl#pullMessage