1. 程式人生 > >RocketMQ客戶端載入流程

RocketMQ客戶端載入流程

 這節介紹RocketMQ客戶端的啟動流程,即Consumer和Producer的啟動流程。

1. 客戶端demo

 首先先看下客戶端的demo

Producer:

public class SyncProducer {

    public static void main (String[] args) throws Exception {
        // 例項化訊息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer ("GroupTest");
        // 設定NameServer的地址
        producer.setNamesrvAddr ("localhost:9876");
        // 啟動Producer例項
        producer.start ();
        for (int i = 0; i < 100; i++) {
            // 建立訊息,並指定Topic,Tag和訊息體
            Message msg = new Message ("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes (RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 傳送訊息到一個Broker
            SendResult sendResult = producer.send (msg);
            // 通過sendResult返回訊息是否成功送達
            System.out.printf ("%s%n", sendResult);
        }
        // 如果不再發送訊息,關閉Producer例項。
        producer.shutdown ();
    }
}

Consumer:

public class Consumer {

    public static void main (String[] args) throws InterruptedException, MQClientException {

        // 例項化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("GroupTest");

        // 設定NameServer的地址
        consumer.setNamesrvAddr ("localhost:9876");

        // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的訊息
        consumer.subscribe ("TopicTest", "*");
        // 註冊回撥實現類來處理從broker拉取回來的訊息
        consumer.registerMessageListener (new MessageListenerConcurrently () {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf ("%s Receive New Messages: %s %n", Thread.currentThread ().getName (), msgs);
                // 標記該訊息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者例項
        consumer.start ();
        System.out.printf ("Consumer Started.%n");
    }
}

Producer和Consumer的啟動類似,在初始化然後進行必要設定(主要是客戶端所屬的Group和NameServer地址)後,執行start方法啟動後臺監聽服務,事實上Producer和Consumer都是呼叫同一個類MQClientInstance的start方法,下圖為繼承關係:

DefaultMQproducer和DefaultMQPushConsumer都繼承自ClientConfig,顧名思義ClientConfig表示客戶端的配置,包括NameServer地址、客戶端地址、客戶端例項名等。由於Producer和Consumer都需要同Broker和NameServer互動,所以配置上有很多相同,這兩個將主要功能的實現都委託給了對應的Impl(DefaultMQProducerImpl和DefaultMQPushConsumerImpl)。Impl內部呼叫了MQClientInstance來完成客戶端同遠端互動的主要功能,而Producer和Consumer則封裝自己相關的行為,MQClientInstance內部又委託忒了MQClientAPIImpl。

2. Producer的啟動

 DefaultMQProducer的啟動如下:

DefaultMQProducer將start委託給了DefaultMQProducerImpl來完成,主要過程為:

  • DefaultMQProducerImpl先標記客戶端當前狀態為START_FAILED(初始狀態為CREATE_JUST)
  • 呼叫MQClientManager的getAndCreateMQClientInstance方法獲取MQClientInstance,每個客戶端例項都會對應一個MQClientInstance,並由MQClientManager管理。MQClientManager內部使用一個Map維護各客戶端的關係,key為clientId(格式為ip@instName,instName為pid),value為MQClientInstance例項。當key不存在時則會初始化一個例項,在初始化時連帶初始化MQClientAPIImpl、NettyRemoteClient等。
  • 呼叫MQClientInstance的registerProducer方法,註冊當前客戶端自身。實現上是客戶端放入client例項快取中,定時器定時上報,後面會說。
  • 呼叫MQClientInstance的start方法,啟動客戶端的後臺任務,該方法是重點,後面會介紹。
  • 標記客戶端當前狀態為RUNNING
  • 呼叫MQClientInstance的sendHeartbeatToAllBrokerWithLock方法,向所有Broker上報心跳

3. Consumer的啟動

 DefaultMQPushConsumer的啟動如下:

DefaultMQPushConsumer同樣將start委託給了DefaultMQPushConsumerImpl來完成,流程上也相似。但相比DefaultMQProducer多了很多其他元件來輔助消費過程,如rebalance、offset管理等,主要過程為:

  • DefaultMQPushConsumerImpl先標記客戶端當前狀態為START_FAILED(初始狀態為CREATE_JUST)
  • 同步設定RebalanceImpl的topic(Map</*topic*/String,/*sub expression*/String>)資訊
  • 同DefaultMQProducer一致,呼叫MQClientManager的getAndCreateMQClientInstance方法獲取MQClientInstance,每個客戶端例項都會對應一個MQClientInstance,並由MQClientManager管理。MQClientManager內部使用一個Map維護各客戶端的關係,key為clientId(格式為ip@instName),value為MQClientInstance例項。當key不存在時則會初始化一個例項,在初始化時連帶初始化MQClientAPIImpl、NettyRemoteClient等。這裡需要說明的是,RocketMQ中Consumer的消費模式分為CLUSTERING和BROADCASTING,即叢集消費和廣播消費。區別在於叢集消費時,一條訊息只會被一個例項消費,即各例項會平分所有的訊息;而廣播消費時所有例項都會收到同一條訊息。體現在clientId的是,叢集模式下instName為pid,而廣播模式instName為DEFAULT。
  • 設定RebalanceImpl屬性,包括所在Group、消費模式、訊息分配策略(平均分配q的策略)
  • 初始化PlullAPIWrapper,設定訊息過濾器鉤子列表
  • 初始化OffsetStore,設定offset的儲存模式,廣播模式使用本地儲存;叢集模式使用遠端儲存
  • 初始化ConsumeMessageService,根據監聽器型別設定訊息消費模式(順序消費/並行消費),pull模式需要自己指定offset,push不需要設定。
  • 啟動ConsumeMessageService
  • 同DefaultMQProducer一致,呼叫MQClientInstance的registerProducer方法,註冊當前客戶端自身。實現上是客戶端放入client例項快取中,定時器定時上報,後面會說。
  • 呼叫MQClientInstance的start方法,啟動客戶端的後臺任務,該方法是重點,後面會介紹。
  • 標記客戶端當前狀態為RUNNING
  • 判斷監聽資訊是否發生改變,從namesrv更新topic的路由資訊
  • 呼叫MQClientInstance的checkClientInBroker方法,確認該例項已經在broker註冊成功,否則拋異常
  • 呼叫MQClientInstance的sendHeartbeatToAllBrokerWithLock方法,向所有Broker上報心跳
  • 呼叫MQClientInstance的rebalanceImmediately方法,觸發一次rebalance

 DefaultMQPushConsumer為推模式,RocketMQ還提供了拉模式來消費訊息,實現類為DefaultMQPullConsumer,啟動過程類似,推模式是用拉模式來實現的,重點實現都在MQClientInstace中。

4. MQClientInstance

 MQClientInstance為一個門戶類,組合了各功能,如下,包括Rebalance、消費資料統計、生產訊息、消費訊息等,這些都有對應的實現。

 上面說過,Producer和Consumer在啟動的時候,都會在內部先初始化一個MQClientInstance物件,然後呼叫其start方法啟動對應的後臺程式,如下:

MQClientInstance的start方法除了呼叫自身進行準備工作外,也呼叫了其他元件的start方法開始它們的準備工作,主要流程為:

  1. 先標記客戶端當前狀態為START_FAILED(初始狀態為CREATE_JUST)
  • 若沒有指定nameserver地址,則呼叫MQClientAPIImpl同步獲取一次(通過設定的Http endpoint同步)
  • 呼叫MQClientAPIImpl的start方法,主要是初始化Netty客戶端,啟動netty client初始化任務,連線的建立發生在第一次請求時
  • 開啟MQClientInstance的定時任務,包括:
    1. 如果沒有指定nameserver地址,每兩分鐘從配置的endpoint處同步nameserver地址
    • 定時從namesrv同步topic路由資訊
    • 定時清除下線的broker資訊;傳送心跳
    • 定時持久化消費者消費的offset資訊
    • 每1分鐘調整執行緒池的大小
  • 呼叫PullMessageService的start方法,啟動拉取訊息執行緒
  • 呼叫RebalanceService的start方法,啟動rebalance執行緒
  • 呼叫內部Producer(CLIENT_INNER_PRODUCER)的start方法
  • 標記客戶端當前狀態為RUNNING

下面詳細介紹下各個過程。

4.2. MQClientAPIImpl.fetchNameServerAddr

 該方法用於更新NameServer地址,該方法會從http://xxx:port/rocketmq/yyy,預設8080埠(如果xxx中沒有:,即不帶埠時)中獲取NameServer地址(xxx為域名,由系統配置項rocketmq.namesrv.domain控制,預設為jmenv.tbsite.net;yyy為訪問路徑,由系統配置項rocketmq.namesrv.domain.subgroup控制,預設為nsaddr)。該地址要求返回結果為一個ip列表,以;隔開,如果獲取回來的地址跟現有的地址不一致則會更新快取的NameServer地址列表。解析出來的地址列表用於根據NettyRemotingClient內部持有的變數:

private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
4.3. MQClientAPIImpl.start

 該方法在內部呼叫了NettyRemotingClient的start方法,用於初始化Netty客戶端。NettyRemotingClient是基於Netty實現的tcp協議客戶端,主要流程為:

  • 初始化客戶端bootstrap連線池
  • 設定處理鏈:編碼、解碼、空閒處理、連線管理(服務端)、請求分發
  • 每3秒清除超時的請求(netty主執行緒不處理邏輯)
  • 啟動客戶端的事件處理器,處理IDLE、CLOSE、CONNECT、EXCEPTION事件

關於NettyRemotingClient後面會專門進行講解,這裡只介紹在客戶端啟動時其做了哪些動作。

4.4.2. MQClientInstance.updateTopicRouteInfoFromNameServer

 該方法用於根據客戶端例項關注的所有topic的路由資訊,包括客戶端監聽的topic以及producer生產的topic。首先會遍歷從MQClientInstance內部的consumerTable和consumerTable的客戶端例項,拿到所有的topic資訊,然後挨個更新topic的路由。

 同步topic路由時,會通過NettyRemotingClient選擇一個NameServer獲取topic路由資訊,然後判斷topic資訊是否發生了更改,主要比較topic所對應的Queue和Broker是否發生了更改。若路由資訊發生了更改則會同步topic所在的broker地址列表,即內部的brokerAddrTable屬性;接著同步produer關注的topic路由資訊,即producerTable屬性;接著同步consumer訂閱的topic路由資訊,即consumerTable屬性;最後更新本地topic資訊,即topicRouteTable屬性。

4.4.3. MQClientInstance.sendHeartbeatToAllBrokerWithLock

 該方法會遍歷MQClient所持有的各個producer和consumer,將客戶端資訊構造為HeartbeatData物件,然後呼叫MQClientAPIImpl的sendHearbeat方法,向所有的broker上報心跳資料。心跳內容包括:

  • Consumer:所有Consumer的Group、消費型別、訊息模式、消費起始offset、訂閱訊息的篩選型別等
  • Producer:所有Producer的group
4.4.4. MQClientInstance.persistAllConsumerOffset

 該方法會遍歷consumerTable裡的所有MQConsumer物件,獲取每個佇列處理的MessageQueue,然後呼叫OffsetStore持久化所有的MessageQueue。OffsetStore後面會專門進行講解。

4.4.5. MQClientInstance.adjustThreadPool

 該方法主要是動態調整DefaultMQPushConsumerImpl(推模式)客戶端消費執行緒池的大小。前面說過推模式是通過包裝拉模式來實現的,內部都依賴PullAPIWrapper。實現上推模式多了一個ConsumeMessageService定時使用拉模式消費訊息,該實現需要一個執行緒池,adjustThreadPool就是動態調整該執行緒池的大小。關於客戶端消費訊息的過程,後面也會專門進行講解。

4.5. PullMessageService.start

 PullMessageService用於封裝拉模式以實現推模式。它會迴圈從內部的LinkedBlockingQueue<PullRequest>中拿出PullRequest物件(消費q訊息封裝的物件),選取一個可用的客戶端例項DefaultMQPushConsumerImpl,呼叫其pullMessage方法.該方法會判斷消費進度,決定是立即消費還是延遲消費,如果是延遲消費則再放回LinkedBlockingQueue中等待消費;如果是直接消費,則呼叫PullMessageService(拉模式)的executePullRequestImmediately消費訊息.

 PullMessageService的基礎關係如下:

PullMessageService.start內部主要是啟動執行緒,該執行緒會迴圈執行執行任務,具體實現會在後續介紹訊息消費的時候提及。

4.6. RebalanceService.start

 該方法用於啟動rebalance任務。RebalanceService同PullMessageService相同,都繼承自ServiceThread類,,並實現了run方法。RebalanceService在run方法中等待一定時間(預設20S,可以通過rocketmq.client.rebalance.waitInterval配置具體時間)後會呼叫MQClientInstance.doRebalance執行具體的動作。具體實現會在後續介紹rebalance實現的時候提及。

4.7. DefaultMQPushConsumerImpl.start

 在上面2.時有提及該流程,這裡的DefaultMQPushConsumerImpl物件是Group為CLIENT_INNER_PRODUCER的內部物件。

 客戶端的啟動過程就如上面介紹,下面附上該部分當時原始碼閱讀過程做的筆記簡圖,該圖描述了客戶端啟動過程的大致過程:

更多原創內容請搜尋微信公眾號:啊駝(doubaotaizi)