1. 程式人生 > >RocketMQ 簡介 -- Producer And Consumer

RocketMQ 簡介 -- Producer And Consumer

RocketMQ Producer And Consumer

Producer

  • Producer例項都有的Producer Group Name,此Name在事務訊息時起作用。

  • Producer傳送訊息時,會嘗試從NameServer獲取TopicRoute資訊。

  • Producer傳送訊息時,TopicPublishInfo中的每個MessageQueue代表一個Master的Broker。

  • Producer傳送訊息時,更具Topic選擇Broker時的策略有延遲傳送策略以及預設策略:

    1. 延遲傳送策略:每次連線異常、或其他異常都會將Broker作為隔離broker設定禁用時間為3000;每次操作完成都會將目標broker的請求總時間儲存;當每次請求時根據brokerName獲取可用的(當前系統時間大於broker的啟用時間)的Broker。

    2. 預設策略:在不使用上次使用的Broker前提下,通過ThreadLocal持有的Integer取模得到Broker。

      以上策略由於使用到了ThreadLocal,所以可能出現多個執行緒同時向同一個Broker傳送的情況。

  • Producer傳送的結果只能保證送達到Broker,無法確認是否被消費,即SEND_OK。

  • Producer傳送的訊息只能到Master Broker上,不能向Slave傳送。

  • Producer傳送的訊息方式有:onway、sync、async三種。

  • Producer在傳送訊息時,如果訊息體長度大於預設的4k,則進行壓縮後傳送。

TransactionProducer

  • RocketMQ使用的分散式事務相對於兩段式提交是最終一致性的(弱一致性),即A成功後B才會執行,且不能保證B何時執行。
  • Producer傳送事務訊息時使用sendMessageInTransaction。
  • 使用SendMessageRequestHeader訊息體頭中的properties中的TRAN_MSG屬性標識為事務訊息,以及PGROUP屬性標識ProducerGroupName。
  • 事務訊息處理流程:
    1. Producer傳送TRAN_MSG為true、包含PGROUP屬性、sysflag為100或101的訊息給Broker,作為事務起始標識,此訊息訊息碼為普通訊息碼。
    2. Producer接收到Broker返回的成功訊息後,執行本地事務,然後傳送本地事務執行結果到Broker(END_TRANSACTION)。
    3. 如果最終本地執行成功(END_TRANSACTION訊息是Commit),那麼Comsumer才能看到並消費事務訊息。所以第一步傳送的Prepare和Rollback的訊息是對Consumer不可見的

Consumer

  • Consumer例項都有的Consumer Group Name,此Name在CLUSTERING方式消費訊息時起作用。

  • Consumer預設可以通過Tag來進行Topic下一層的訊息過濾。每個Consumer對每個Topic只能有一個訂閱。

  • Consumer有兩種訊息消費方式:一種是BROADCASTING,另一種是CLUSTERING。預設使用CLUSTERING方式。

    1. BROADCASTING,是廣播方式。 即各個Consumer都會消費訊息。

      使用本地檔案(userHome/.rocketmq_offsets/clientID/groupName/offsets.json或加.bak檔案)儲存消費偏移量(offsetStore)。

      再平衡時,直接刪除本地儲存的MessageQueue和對應的ProcessQueue。同時將新的MessageQueue新增到本地。且不會用到AllocateMessageQueueStrategy介面的平衡策略。

    2. CLUSTERING,類似於P2P的方式。即同一Consumer Group Name下的Consumer分攤消費訊息。使用遠端(Broker)儲存消費偏移量(offsetStore)。

      再平衡時,先從Broker獲取統一ConsumerGroup下的ConsumerID列表;然後根據MessageQueue列表、consumerGroupName、當前consumerID、所有的ConsumerID,通過AllocateMessageQueueStrategy的策略進行再平衡,然後同廣播方式類似,但是會向Broker註冊偏移量。

  • 對於CLUSTERING方式消費的Consumer,內部會有多一個%RETRY%GroupName的Topic訂閱。

  • Consumer在再平衡時通過AllocateMessageQueueStrategy介面的實現類分配MessageQueue,預設為:AllocateMessageQueueAveragely,可選策略有:

    1. AllocateMessageQueueAveragely:分佈策略。7個Broker,5個Consumer時,再平衡後結果為:
    ConsumerIndex MessageQueue
    0 [0, 1]
    1 [2, 3]
    2 [4]
    3 [5]
    4 [6]
    1. AllocateMessageQueueAveragelyByCircle:輪詢分佈策略。7個Broker,5個Consumer時,再平衡後結果為:
    ConsumerIndex MessageQueue
    0 [0, 5]
    1 [1, 6]
    2 [2]
    3 [3]
    4 [4]
    1. AllocateMessageQueueByConfig:根據此策略中的配置,直接返回配置的MessageQueue集合,無演算法。

    2. AllocateMessageQueueByMachineRoom:機房策略,每個Consumer自身持有一部分consumeids,在這部分ids內部平衡。

    3. AllocateMessageQueueConsistentHash:一致性Hash策略,引入虛擬節點平衡。

  • Consumer消費訊息時的監聽器(訊息到達Consumer後執行邏輯所處的回撥類),分為兩種:MessageListenerConcurrently和MessageListenerOrderly:

    1. MessageListenerConcurrently用於併發消費訊息。

      啟動時會啟動一個週期執行緒池清理過期訊息(通過CONSUMER_SEND_MSG_BACK將訊息返回給Broker並從本地佇列刪除)。

    2. MessageListenerOrderly用於按順序、單佇列、單執行緒的消費訊息。

      啟動時會啟動一個週期執行緒池定期鎖定訊息佇列(向Broker傳送LOCK_BATCH_MQ訊息)。

  • 無論是使用Push方式還是Pull方式消費訊息,本質上都是使用Pull的方式從Broker拉取訊息,然後本地消費。區別在於使用DefaultMQPullConsumerImpl是主動呼叫API的方式從Broker獲取訊息並消費,而DefaultMQPushConsumerImpl是通過後臺執行緒從Broker拉取訊息,然後回撥監聽器進行消費。

  • 使用Push方式時,Consumer拉取訊息是從RebalanceService的再平衡開始觸發(RebalancePushImpl.dispatchPullRequest),通過向Broker傳送(PULL_MESSAGE)拉取訊息,並回調訊息返回值進行消費。

Common

公共部分指Producer、Consumer啟動都會啟動的服務(MQClientInstance.start()方法中),但是對於Producer或Consumer可能無意義:

  • 獲取NameServer地址,從環境變數或web服務。
  • 啟動NettyRemotingClient服務,只有NettyRemotingServer才有registerDefaultProcessor,NettyRemotingClient不存在registerDefaultProcessor。
  • 啟動週期執行任務,包括獲取NameServer資訊、從NameServer獲取Topic資訊、從topicRouteTable清空無效Broker、向各個Broker傳送心跳、將消費偏移量持久化(本地或Broker)、調整執行緒池(目前沒有實現)
  • 啟動PullMessageService,通過PULL_MESSAGE從Broker拉取訊息訊息。
  • 啟動RebalanceService,CLUSTERING方式消費訊息時通過AllocateMessageQueueStrategy介面的策略再平衡,所謂的再平衡就是重新構建本地訂閱的Broker數量、地址。