1. 程式人生 > >RocketMQ 消費端

RocketMQ 消費端

原文地址:http://adamswanglin.com/rocketmq/rocketmq-consumer/

關於rocketmq-client包

RocketMQ將producer,consumer和admin相關程式碼都放到rocketmq-client jar包裡;RocketMQ的採用客戶端拉的方式消費訊息(PUSH也是通過客戶端拉來實現的),拉取的時候要考慮負載均衡(rebalance),考慮訊息至少消費一次(offset管理);等等這些導致了consumer的複雜度是client裡面最高的。

Consumer三個重要部分

負載均衡

負載均衡狀態

負載均衡

假設上面是執行一段時間的狀態,C_ORD消費組有兩個節點node1和node2,訂閱了topic:TP_PAY,Broker為TP_PAY建立了四個Queue,node1消費Queue1和Queue2,node2消費Queue3和Queue4。

Rebalance過程

客戶端每20s會啟動rebalance,節點rebalance過程是:

負載均衡2

需要說明的是以上過程,各個節點都是分別進行的。

節點變動

實際中節點會上下線,節點數量會發生變動,Topic配置的queue數量也可能變動(Broker配置變更或者上下線)。這時候Rebalance過程中各個消費節點消費的queue就會發生變動。

負載均衡3

考慮上面平衡狀態下,C_ORD消費組中新增了節點node3:

node3啟動的時候會Rebalance,發現自己應該訂閱Queue4;因為node3從沒有訂閱到訂閱了Queue4,訂閱內容有變,所以會node3立即傳送心跳給Broker並且其中subVersion是當前時間戳;Broker發現心跳中的時間戳有更新會立即傳送NOTIFY_CONSUMER_IDS_CHANGED的指令給node1和node2;node1和node2收到指令會立即Rebalance。只要同一個消費組的訂閱資訊一致,分配演算法一致,最終queue會被同一消費組的節點平均分配。最終變化如下:

負載均衡4

負載均衡演算法有:

AllocateMessageQueueAveragely

AllocateMessageQueueAveragelyByCircle

AllocateMessageQueueConsistentHash

AllocateMessageQueueByMachineRoom

AllocateMessageQueueByConfig

預設是AllocateMessageQueueAveragely;例如5個queue分給3個節點,5/3=1,則平均消費1個;5%3=2,頭兩個節點再額外加一個;最後的結果是{1,2} {3,4} {5}。

訊息拉取

PULL OR PUSH

consumer分pull consumer

push consumer

  • pull:應用自己拉取,訊息延時較大。
  • push:傳送非同步拉取訊息的請求給Broker;如果當時有未消費的訊息Broker立即返回未消費訊息,如果沒有未消費的訊息,Broker在有新訊息的時候返回新訊息。push型別的consumer實現中的非同步拉取訊息的請求實際也是客戶端pull訊息;使用這種實現方式減輕了Broker的負擔(Broker通過請求就知道新訊息發給誰),但也增加了客戶端的複雜度(負載均衡在客戶端實現)。

我們使用的都是pushConsumer,一下只考慮pushConsumer的啟動。

BROADCASTING OR CLUSTERING

consumer的消費模型分為BROADCASTING(廣播模式)和CLUSTERING(叢集模式)。

  • 廣播模式:對於某一條訊息,同一消費組裡每個節點都會收到。

  • 叢集模式:對於某一條訊息,同一消費組裡只會有一個節點收到。

廣播模式和叢集模式只針對單個消費組;不同消費組之間的訊息訂閱是互不影響的。

Offset管理

為了確定訊息是否到達,現有訊息佇列實現裡都有ack機制。

例如在ActiveMQ裡,一條訊息從producer端發出之後,一旦被consumer消費,consumer會返回ACK,broker端會刪除這條已消費的訊息。這樣每一條訊息消費都要傳送一個ACK訊息,Broker端也要根據ACK做相應操作。

RocketMQ用Offset機制來實現ACK,它類似一種批量的ACK:

  • 在Broker端,訊息的Offset是遞增的;

  • Client端拉取的時候也是按順序拉取的,比如第一次拉取offset 0開始的訊息,拉取了10條,第二次就從上次最後一個節點offset+1的位置拉取;

  • Client消費一批訊息後將消費完成的Offset傳送給Broker。

RocketMQ這樣做之後提升了效率:Offset更新頻率相比單條更新小,Broker端只用儲存某個消費組對某個Queue的消費進度而不用在每個訊息上存某個消費組是否消費了該訊息。

但同時也帶來了一個問題:更大機率的重複消費。消費組消費了offset =2到offset=10的訊息,但是offset=1的訊息消費的比較慢;如果更新offset=10可能會導致offset=1的訊息未成功消費Broker卻認為成功,所以RocketMQ的做法是消費端更新的Offset都是未消費訊息的最小offset;如果這時候消費端down機,別的消費組消費的時候會從offset=1的訊息開始拉取消費,這樣offset=2和offset=10的訊息就會重複消費。所以RocketMQ不保證訊息不重複,當然這只是造成訊息重複消費的一個原因。

Consumer啟動

幾個重要的類關係

類關係

PullMessageService

啟動拉取訊息執行緒類

RebalanceService

啟動定時任務Rebalance類

MQClientInstance

一般一個應用一個

DefaultMQPushConsumerImpl

一個consumerGroup一個,和MQClientInstance是多對一關係

RebalanceImpl

和pull/push模式有關,所以具體的rebalance實現掛在DefaultMQPushConsumerImpl下面。

啟動詳細流程

啟動DefaultMQPushConsumerImpl

啟動流程

獲取/建立MQClientInstance

MQClientInstance和clientId一一對應,一般一個應用只會有MQClientInstance,規則是{ip}:{pid}。

建立MQClientAPI

封裝了遠端呼叫Broker和NameServer的API。包括推送訊息,拉取訊息,在Broker上建立消費組等。

建立PullMessageService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class PullMessageService extends ServiceThread {
    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            } catch (InterruptedException e) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
    }
}

其中ServiceThread是對執行緒的封裝,RocketMQ中很多XXService類都繼承自ServiceThread。

ServiceTask實現程式開啟,停止,等待特定時長執行,中途任意時間喚醒等。

擷取部分程式碼如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public abstract class ServiceThread implements Runnable {
	protected final Thread thread;
  	//和juc CountDownLatch 區別 增加了 reset方法
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);

    public void start() {
        this.thread.start();
    }
    //執行緒等待程式碼
  	protected void waitForRunning(long interval) {
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }

        //entry to wait
        waitPoint.reset();

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }
  //執行緒中途喚醒程式碼
  	public void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
    }
}

能看到PullMessageService的作用是啟動一個執行緒,不停的從queue里拉取請求並執行pullMessage方法。

建立RebalanceService
1
2
3
4
5
6
7
8
9
public class RebalanceService extends ServiceThread {
    @Override
    public void run() {
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    }
}

RebalanceService的作用就是啟動一個執行緒,定時呼叫doRebalance方法。

建立PullApiWrapper

拉取訊息請求和響應的wrapper;主要作用:請求的時候封裝RPC請求體,響應的時候二次過濾tag。

說二次過濾是因為Broker會過濾一次tag,但是為了效率broker過濾tag是按存放的tag的hashCode做比較的,不保證tag字串一致。

PullApiWrapper做二次過濾保證獲取的的tag和訂閱的tag字串匹配。

啟動ConsumeMessageConcurrentlyService

訊息消費處理類,這裡是併發處理訊息;對應的還有ConsumeMessageOrderlyService,順序處理訊息類。

除了處理下訊息外,還負責啟動執行緒定時清除消費開始15分鐘還未處理完成的訊息(傳送回Broker重試)。

啟動MQClientInstance

啟動netty客戶端;啟動拉取訊息服務;啟動rebalance服務;啟動定時任務:定時向Broker傳送心跳,定時拉取路由資訊,定時傳送offset到Broker,定時調整消費執行緒池大小。

registerConsumer

DefaultMQPushConsumerImpl是一個consumerGroup一個例項,MQClientInstance一個應用一個例項;兩者一對多,註冊指DefaultMQPushConsumerImpl放入MQClientInstance中的ConcurrentMap<String/* group */, MQConsumerInner>中。

updateTopicRouteInfoFromNameServer

立即更新一次訂閱的topic的路由資訊。

checkClientInBroker

隨機選擇一個Broker,傳送檢查客戶端配置配置的請求。

sendHeartbeatToAllBrokerWithLock

立即向所有相關Broker(訂閱的topic的路由到的Broker)的master節點發送心跳。

rebalanceImmediately

立即執行一次rebalance。

PushConsumerRebalance

Consumer拉取訊息

拉取訊息的流程示意

拉取

圖中小人代表有特定執行緒處理任務;黃色箭頭代表PullRequest的流向。

RebalanceService初始化PullRequest

client啟動

RebalanceService確定consumer拉取的queue。

  • 為需要拉取的queue生成一個ProcessQueue用來儲存正在/等待處理的資訊,放入processQueueTable中。
  • 為需要拉取的queue生成一個PullRequest,放入PullRequestQueue中;其中,拉取訊息的位置從nextOffset從Broker遠端拉取。
定時Rebalance
  • 如果發現有新訂閱的queue,也會為每個新增訂閱的queue生成一個PullRequest,放入PullRequestQueue中;其中,拉取訊息的位置從nextOffset從Broker遠端拉取。
  • 如果發現有queue已經不訂閱了,更新offset到Broker,將ProcessQueue設為dropped並從processQueueTable中移除。

PullMessageService取出PullRequest

PullMessageService啟動一個執行緒不停的從PullRequestQueue裡取出PullRequest。如果取出的PullRequest是已失效的(ProcessQueue是否dropped),丟棄;如果未失效,執行下面步驟。

PullMessageService傳送非同步請求

PullMessageService取出PullRequest後,根據其中的queue定位Broker,併發送非同步拉取請求。同時將PullRequest封裝在PullCallback裡,PullCallback封裝在ResponseFuture裡;並以自增的請求id為鍵,ResponseFuture為值放入ResponseTable中。

Broker傳送非同步響應

Broker收到請求,如果offset之後有新的訊息會立即傳送非同步響應;否則等待直到producer有新的訊息傳送後返回或者超時。

如果通訊異常或者Broker超時未返回響應,nettyClient會定時清理超時的請求,釋放PullRequest回到PullRequestQueue。

NettyClient處理響應

根據響應id從ResponseTable中取出ResponseFuture;從響應裡取出最新的offset和批量拉取到的訊息。

用最新的offset更新ResponseFuture裡的PullRequest並推送給PullRequestQueue裡以進行下一次拉取。

批量拉取到的訊息分批推給consumeExecutor執行緒處理。

拉取訊息的詳細流程

消費流程

幾點說明

  • 拉取到的訊息分批用consumerExecutor執行緒池執行,如果執行緒池滿了5s後重試。

  • 每批訊息裡消費失敗的訊息會重發給broker的重試佇列,重發也失敗的訊息5s後用consumerExecutor重新消費。

  • 每批消費完成後更新offset到Broker。

  • 非同步收到訊息後,訊息會分queue放到ProcessQueue中,ProcessQueue裡的msgTreeMap:TreeMap<Long/offset/, MessageExt>存放訊息;收到的訊息處理成功或者處理失敗已發回重試會從treemap裡移除。幾點需要注意:

    1.每次更新offset到Broker都是從treemap裡取第一條(最小offset),某條訊息消費超時會導致Broker的offset無法更新;當某條消費超過15分鐘還未消費完成,會發回Broker嘗試重試。

    2.treemap裡堆積的訊息超過1000(可配)條或者最小最大offset相差超過2000(可配)會觸發控流,延遲拉取訊息。

  • 關於訊息重複:

    RocketMQ不保證訊息不重複,如果你的業務需要保證嚴格的不重複訊息,需要你自己在業務端去重。

  • 關於訊息消費順序:

    RocketMQ有嚴格順序消費的實現。但是有序消費會影響訊息並行處理效率,消費端吞吐量下降;而且單條訊息阻塞會阻塞這個消費端;所以我們並沒有使用訊息順序消費。

Consumer關閉

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//DefaultMQPushConsumerImpl - shutdown
public synchronized void shutdown() {
  switch (this.serviceState) {
    case CREATE_JUST:
      break;
    case RUNNING:
      //關閉訊息消費執行緒池
      this.consumeMessageService.shutdown();
      //消費進度同步到Broker
      this.persistConsumerOffset();
//在Broker裡取消註冊
      this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
      this.mQClientFactory.shutdown();
      log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
      //關閉ProcessQueue
      this.rebalanceImpl.destroy();
      this.serviceState = ServiceState.SHUTDOWN_ALREADY;
      break;
    case SHUTDOWN_ALREADY:
      break;
    default:
      break;
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
synchronized (this) {
  switch (this.serviceState) {
    case CREATE_JUST:
      break;
    case RUNNING:
      //關閉生產者
      this.defaultMQProducer.getDefaultMQProducerImpl().shutdown(false);
      this.serviceState = ServiceState.SHUTDOWN_ALREADY;
      //關閉拉取訊息執行緒
      this.pullMessageService.shutdown(true);
      //關閉定時任務--heartbeat updateRouteInfo persistAllConsumerOffset等
      this.scheduledExecutorService.shutdown();
      //關閉netty服務
      this.mQClientAPIImpl.shutdown();
      //關閉rebalance執行緒
      this.rebalanceService.shutdown();

      if (this.datagramSocket != null) {
        this.datagramSocket.close();
        this.datagramSocket = null;
      }
      MQClientManager.getInstance().removeClientFactory(this.clientId);
      log.info("the client factory [{}] shutdown OK", this.clientId);
      break;
    case SHUTDOWN_ALREADY:
      break;
    default:
      break;
  }
}


原文地址:http://adamswanglin.com/rocketmq/rocketmq-consumer/

關於rocketmq-client包

RocketMQ將producer,consumer和admin相關程式碼都放到rocketmq-client jar包裡;RocketMQ的採用客戶端拉的方式消費訊息(PUSH也是通過客戶端拉來實現的),拉取的時候要考慮負載均衡(rebalance),考慮訊息至少消費一次(offset管理);等等這些導致了consumer的複雜度是client裡面最高的。

Consumer三個重要部分

相關推薦

RocketMQ 消費

原文地址:http://adamswanglin.com/rocketmq/rocketmq-consumer/關於rocketmq-client包RocketMQ將producer,consumer和admin相關程式碼都放到rocketmq-client jar包裡;Ro

RocketMQ原理解析-consumer 2.消費負載均衡

消費端負載均衡 消費端會通過RebalanceService執行緒,10秒鐘做一次基於topic下的所有佇列負載 消費端遍歷自己的所有topic,依次調rebalanceByTopic  根據topic獲取此topic下的所有queue  選擇一臺broker獲取基

dubbo學習總結三 消費

註意 服務端 註意點 發送 blog dubbo tro http ref 消費端跟服務端類似 註意點是dubbo:reference 和服務端的dubbo:service做區分 消費端主要是處理發送過來的請求 dubbo學習總結三 消費端

spring cloud eureka 消費

eurekaEurekaClient @Qualifier("eurekaClient") @Autowired private EurekaClient eurekaClient; public String dataServiceUrlByEurekaClient() { InstanceInf

服務消費泛化調用與異步調用

null 當前 ndt obj 技術分享 tap 參數設置 簡單的 pub 本文借用dubbo.learn的Dubbo API方式來解釋原理。 服務消費端泛化調用 前面我們講解到,基於Spring和基於Dubbo API方式搭建簡單的分布式系統時,服務消費端引入了一個SDK

spring cloud多個消費重複定義feign client,多模組掃描

問題連線:點選開啟連結  嘗試將FeignClient單獨建立了一個模組G,將對各個模組的FeignClient呼叫介面集中在模組G中管理,A,B,C,D,E,F模組互調時,只需要在pom中引入G模組即可。但一直失敗,對於該問題網上大都是 加@ComponentScan(basePack

dubbo原始碼分析-消費啟動初始化過程-筆記

消費端的程式碼解析是從下面這段程式碼開始的 <dubbo:reference id="xxxService" interface="xxx.xxx.Service"/> ReferenceBean(afterPropertiesSet) ->getObject() ->ge

SpringCloud-----Rest服務提供【安全訪問】以及【消費訪問】

1、服務提供端如果沒有安全訪問機制,會出現什麼問題? 把這些介面放在Internet伺服器上,無異於裸奔,所有資訊都容易被洩露; 任何使用者只要得到介面,那我們的程式將毫無祕密可言。 2、Spring-boot-security提供安全訪問機制 服務提供端匯入依賴包: pom

dubbo多網絡卡時,服務提供者的錯誤IP註冊到註冊中心導致消費連線不上

使用了虛擬機器之後,啟動了dubbo服務提供者應用,又連了正式環境的註冊中心; 一旦dubbo獲取的ip錯誤後, 這種情況即使提供者服務停掉,目前dubbo沒有能力清除這類錯誤的提供者; (需要修改原始碼測試,需要客戶端重新更細包,因為清除動作client端,我司就是這麼幹的)  規

Linux下RocketMQ服務和客服安裝

Linux下RocketMQ服務端和客服端安裝 安裝條件 jdk 1.8+ Maven 3.0.5 .64位Linux系統 下載和安裝   使用xshell將下載的安裝包上傳到linux系統      

springcloud學習筆記三:利用Feign編寫客戶消費

Feign 在上一篇筆記中,我們在消費者端使用restemplate的方式遠端消費服務,這樣寫雖然可以,但是卻與我們其它的程式碼有點格格不入,那麼通過feign,一個http客戶端庫,可以做到使用HTTP請求遠端服務時能與呼叫本地方法一樣的編碼體驗,與我們的客戶端程式碼看起來融為一體。

SpringCloud學習筆記一,(服務, 消費), 硬編碼級別

1. 建立maven父工程microcloud, 管理依賴,     建立microcloud-api模組, 存放entity類和sql,      建立microcloud-provider-dept-8001模組,服務端,      建立microcloud-

SpringCloud-----搭建註冊中心+服務+消費

1、服務註冊中心 pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schema

rocketmq消費模式機制

1. RocketMQ 支援兩種訊息模式: Clustering 和Broadcasting 從程式碼可以看出: 2. CLUSTERING 同組裡的每個Consumer 只消費所訂閱訊息的一部分內容。 3. BROADCASTING 同組裡的每個Consum

Dubbo消費呼叫服務過程分析

呼叫鏈的整體流程圖 下面藍色部分是消費端的呼叫過程,大致過程分為Proxy–>Filter–>Invoker–>Directory–>LoadBalance–>Filter

Dubbo剖析-服務消費泛化呼叫

一、前言 前面我們講解基於Spring和基於dubbo api方式搭建一個簡單的分散式系統時候服務消費端是引入了一個sdk的,這個SDK是個二方包,裡面存放了服務提供端提供的所有介面類以及介面使用的入參和出參的pojo類,服務消費端則使用JDK代理對介面進行代理。 泛化介面呼叫方式主要用於服務

Dubbo剖析-服務消費非同步呼叫

一、前言 前面我們講解的無論是正常呼叫還是泛化呼叫也好,都是進行同步呼叫的,也就是服務消費方發起一個遠端呼叫後,呼叫執行緒要被阻塞掛起,直到服務提供方返回。本節來講解下非同步呼叫,非同步呼叫是指服務消費方發起一個遠端呼叫後,不等服務提供方返回結果,呼叫方法就返回了,也就是當前執行緒不會被阻塞,這就允許呼叫方

Spring配置中心(Spring Cloud Config)在配置了context-path後消費無法通過eureka獲取配置的解決

最近研究微服務,使用Eureka搭建了註冊中心,並且使用SpringCloudConfig做配置中心 當配置中心註冊到註冊中心以後,消費端在bootstrap.yml通過以下配置即可獲取到配置資料 server: port: 8888 servlet: context-pat

RocketMQ消費批拉超過32不生效

說明 由於一些原因,我需要RocketMQ消費的時候,一批拉400條,一批處理400條。設定如下: 為了簡單驗證是否正確,消費如下: 直接通過列印msgs.size()觀察情況即可。 現象 實驗的topic裡面的訊息數量很多很多,但是啟動消費端,消費端的日

IDEA專案搭建十三——服務消費與生產通訊實現

一、簡介 之前已經完成了EurekaClient的服務生產者和Feign的服務消費者模組的搭建,現在實現統一的通訊約定 (1) 統一Request結構 (2) 統一Response結構 (3) 統一Error通知 二、程式碼 1、建立統一請求物件ServiceRequest<>實際引數就是這個泛型