1. 程式人生 > >Pulsar Consumer實現介紹

Pulsar Consumer實現介紹

Pulsar-Consumer

“Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.”

Pulsar是pub-sub模式的分散式訊息平臺,擁有靈活的訊息模型和直觀的客戶端API。

Pulsar由雅虎開發並開源的下一代訊息系統,目前是Apache軟體基金會的孵化器專案。

本片文章簡單介紹Pulsar的Consumer,包含以下內容:

  • Consumer的體系
  • 消費邏輯的實現

1. Consumer體系

A consumer is a process that attaches to a topic via a subscription and then receives messages.

Consumer通過訂閱關係繫結到Topic(和Producer類似,都是繫結到一個Topic上),並接收訊息。

Consumer支援:

  • 同步接收訊息:阻塞使用者執行緒等待訊息
  • 非同步接收訊息:非同步等待訊息(通過Future返回訊息)
  • 通過MessageListener返回訊息:接收訊息後回撥使用者的MessageListener

Consumer提供了三類獲取訊息的方式,其中非同步的方式包含通過Future非同步等待訊息和通過MessageListener被動接收訊息。MessageListener和另外兩種方式是互斥的,一旦Consumer註冊了MessageListener介面,則必須通過MessageListener處理訊息,主動觸發receive獲取訊息將丟擲異常。

Consumer的繼承關係:

  • Consumer:定義了消費者相關的介面
  • ConsumerBase:介面中基礎方法的實現,抽象類
  • ConsumerImpl:在ConsumerBase基礎上的Consumer具體實現
  • MultiTopicsConsumerImpl:組合多個ConsumerImpl完成對多Topic/Partition的消費

Consumer的設計和Producer是一致的,通過介面定義行為,基礎類實現基本能力,在通過組合的方式來實現消費多個Topic/Partition(Producer則是像多個Topic傳送訊息)。

1.1 消費進度提交

Consumer處理訊息後需要傳送acknowledgement到Broker,這樣Broker可以丟棄訊息(應該是移動消費offset的操作,類似RocketMQ,並不是真正的刪除訊息)。支援單挑訊息提交或者批量提交,批量提交則以最後一條訊息的offset為準。(只是記錄一個offset比較某個位置之前的訊息都已經被Consumer處理,所以批量提交其實只是把最大的offset提交)

1.2 訂閱模型

訂閱模型決定了訊息時如何被投遞給Consumer的。在Pulsar中,訂閱模型有: exclusive、shared、 failover。

Exclusive

只能有一個Consumer繫結到訂閱關係上,其他Consumer嘗試繫結到訂閱關係上時會報錯(Exclusive是預設的訂閱模型)。

Shared

在Shared模型中,多個Consumer可以繫結到一個訂閱關係上。訊息按照round-robin模式被投遞給各個Consumer。若某個Consumer宕機,被投遞給該Consumer的未被ACK(沒有acknowledgement)的訊息將被重新投遞給其他的Consumer進行消費。

Shared模式帶來的限制:

  1. 訊息時按照round-robin模式投遞給各個Consumer的,所以訊息順序無法得到保障
  2. 同樣因為round-robin模式,無法使用批量提交acknowledgement的功能(如上圖Consumer C-3如果提交了m4會導致m3被標記為已經消費,但實際Consumer C1可能還沒處理m3)

failover

在Failover模型中,多個Consumer可以繫結到一個訂閱關係上,但是隻有一個稱為Master Consumer的消費者能消費訊息。對多個Consumer按照name進行排序,第一個Consumer則為Master Consumer。

在Master Consumer失效(比如斷開連線)後,Master Consumer未提交的訊息和後續的訊息會提交給後續的Consumer。

2. 消費邏輯的實現

Consumer獲取訊息的核心API有以下兩個,分別實現同步獲取訊息和非同步獲取訊息:

/**
     * Receives a single message.
     * <p>
     * This calls blocks until a message is available.
     *
     * @return the received message
     * @throws PulsarClientException.AlreadyClosedException
     *             if the consumer was already closed
     * @throws PulsarClientException.InvalidConfigurationException
     *             if a message listener was defined in the configuration
     */
    Message<T> receive() throws PulsarClientException;

    /**
     * Receive a single message
     * <p>
     * Retrieves a message when it will be available and completes {@link CompletableFuture} with received message.
     * </p>
     * <p>
     * {@code receiveAsync()} should be called subsequently once returned {@code CompletableFuture} gets complete with
     * received message. Else it creates <i> backlog of receive requests </i> in the application.
     * </p>
     *
     * @return {@link CompletableFuture}<{@link Message}> will be completed when message is available
     */
    CompletableFuture<Message<T>> receiveAsync();

MessageListener則通過ConsumerBuilder介面進行設定並傳入到Consumer的構造方法中。

這三個API都由ConsumerImpl#messageReceived觸發,即Consumer接收到訊息後根據請求的型別來決定:

  • 同步獲取訊息的,將訊息放入記憶體佇列,被掛起的執行緒會從佇列中獲取訊息
  • 非同步獲取訊息的,執行callback將訊息放入future
  • 通過MessageListener處理訊息的,通過ListenerExecutor執行邏輯

可見Pulsar在消費模式上處理是統一的,即無論客戶端採用何種方式進行訊息的接收,訊息統一由服務端進行“推送”,而在Consumer內部根據使用者請求的型別進行處理。

通過ConsumerImpl#messageReceived的實現可以發現Pulsar的訊息消費是一種“推”的模型,這和RocketMQ的“拉”的模型差異是很大的(RocketMQ採用一種Long-Polling的方式,由Consumer主動發起請求從服務端獲取資料,若服務端有需要處理的訊息,請求立即返回;如果沒有訊息,這個請求會在服務單阻塞一段時間,直到新訊息到達或者請求即將超時,返回給客戶端)。

Consumer獲取訊息的模型

具體看Pulsar-Consumer獲取訊息的程式碼實現會發現它也不是一種純粹的,類似淘寶Notify的推的模式,而是一種推拉結合的方式,示意如下:

  1. Consumer向Broker傳送FLOW請求,通知Broker可以推送訊息給Consumer
  2. Broker將訊息通過MESSAGE請求將訊息推送給Consumer

這是一個反覆的過程,每次Consumer接收訊息處理後都會繼續傳送FLOW請求給Broker。

這是在RocketMQ或者Kafka的設計中都沒有的一種方式,這種方式進行一定的拓展則可以實現類似akka的Dynamic Push/Pull模式(詳見公眾號歷史文章:《Push or Pull?》)。

在閱讀Pulsar Consumer部分程式碼的時候還發現非常有趣的一點,當你搜索“Consumer”時會出現一個Consumer介面和一個Consumer類:

  • 介面: org.apache.pulsar.client.api.Consumer
  • 類: org.apache.pulsar.broker.service.Consumer

Consumer介面是Client模組定義Consumer行為的,為什麼在Broker模組會有一個Consumer類?

實際在Broker端會給連結上來的Consumer構造一個對應的Consumer物件,維護遠端的Consumer的連結等資訊。所有對遠端的Consumer的操作會封裝在Broker端的Consumer中。這樣可以更好的實現程式碼的可插拔性,降低耦合,提升程式碼的可測試性。比如在測試Broker端的邏輯時,只需要Mock一個Consumer類來模擬各種正常和網路異常的情況,而不需要真正的啟動一個Consumer。

總結

本文主要是介紹一下Pulsar Consumer模組的相關概念和一些模型,沒有深入的解讀程式碼實現。Pulsar Consumer的實現方式還是非常有趣的,和大家比較熟悉的RocketMQ的實現方式差異比較大,值得一讀。