1. 程式人生 > >RocketMQ生產者和消費者關鍵概念總結

RocketMQ生產者和消費者關鍵概念總結

  1. 每隔30S向Broker所有節點發送心跳,傳送groupName,消費模式(叢集,廣播),獲取模式(push,pull),consumeFromWhere,SubscriptionData等資訊
  2. 每隔30S從Namesrv獲取最新的TopicRouteData,提取所有的MessageQueue,存入RebalanceImpl的topicSubscribeInfoTable中,以備負載均衡時使用
  3. 每隔30S根據獲取到的最新TopicRouteData,更新Broker資訊,移除下線的Broker
  4. 每隔5S向Broker Master持久化進度,若Master宕機則向Slave持久化
  5. 每隔20S進行一次負載均衡,隨機從一個Broker上獲取指定Group的所有Consumer的id集合(叢集消費模式下),將Client(消費者)和Master的MessageQueue排序,然後均分佇列,不夠的按ClientId序號分配餘數。在Broker和Consumer不變的情況下,每個Client消費的MessageQueue不會變化,因為每次都是這些資料,排序後重新分配的結果和原來一樣。當Broker和Consumer有變化後,才可能會改變原先消費的MessageQueue位置。
  6. RebalanceService獲取到最新的Topic路由資訊,當Broker或者Consumer有變化時,,更新Client持有的消費佇列,將那些被不再屬於自身的消費佇列提交最新的消費進度,然後移除出消費請求列表,對於新增的消費佇列,根據消費策略(consumeFromWhere)獲取此佇列相應的消費進度,生成訊息拉取請求,馬上執行訊息拉取服務。
  7. 在消費者啟動時,會立即執行一次負載均衡,分配當前消費者訂閱的MessageQueue,同時獲取這些MessageQueue的相應消費進度,將這些資訊封裝成一個PullRequest,將其放入PullMessageService#pullRequestQueue中,pullRequestQueue是一個阻塞佇列LinkedBlockingQueue,當有PullRequest放入pullRequestQueue中,PullMessageService會立即從pullRequestQueue 彈出 PullRequest來執行。當第一次負載均衡結束後,put一個PullRequest進pullRequestQueue,PullMessageService就馬上執行拉取資訊請求了。
  8. 每一個消費佇列的第一次拉取請求都是由RebalanceService在執行完負載均衡後產生,之後的請求由上次拉取請求返回後生成,當負載均衡後若指定的消費佇列不屬於此Client,那麼會設定其為丟棄的佇列,此舉會中斷這種迴圈式的生成請求,也就是中斷對不屬於此Cilent的消費佇列的請求。
  9. RocketMQ在拉取到訊息後,會存入ProcessQueue的一個TreeMap型別的屬性中,key為ConsumeQueue的Offset,value為MessageExt,訊息在其中按Offset有序排列
  10. 消費者在拉取到訊息後,根據consumeMessageBatchMaxSize的大小指定每個執行緒消費訊息的數量來拆分訊息填入不同的消費者執行緒,然後提交到消費者執行緒池,執行消費
  11. 消費者在消費完訊息後(無論訊息是否消費成功),會根據訊息的Offset,去TreeMap中移除相應的鍵值對。RocketMQ會每隔15分鐘清除TreeMap中消費時間超過15分鐘的訊息(將訊息發回Broker,同時從TreeMap中移除),每15分鐘最多移除16條過期訊息
  12. 併發式的消費者預設每次消費1條資訊,當返回null,RECONSUME_LATER,或者丟擲異常時,預設消費失敗,消費者會把當前訊息發回給Broker,若發回失敗,則在5S後重新消費此訊息
  13. 無論訊息是否消費成功,均會更新到本地以及Broker上的進度表,因為消費失敗的訊息會被髮回給Broker,存入%RETRY%+consumeGroup 的Topic中,消費者在啟動時預設訂閱重試Topic,發回給Broker的訊息會根據delayLevel來判斷重投遞時間
  14. 正常消費情形下更新進度只能增長,也就是多個執行緒併發更新進度時,只能將進度更新為當前之後,在下一次請求時會附帶上當前消費的進度。
  15. 在拉取訊息時,如果是從Master處拉取,則在拉取訊息時會附帶更新Broker上的消費進度,消費者內有一個定時任務,每隔5S將消費進度持久化到Broker上,因為消費進度只會更得越來越大,所以兩種更新形式執行時間晚的會覆蓋之前的值。
  16. 併發式的訊息拉取,每次拉取的最大數量為32條訊息,每個執行緒預設消費數量為1條,因此當一次拉去後,最多會生成32個消費者執行緒併發消費,消費者的消費執行緒池最小執行緒數為20,最大為64
  17. 在拉取請求時要指定ConsumeQueue的起始位置,拉取的位置始終 >= 提交的消費進度位置 ,因為消費者可能還有執行緒在消費之前拉取的訊息
  18. 當拉取到訊息後,將訊息放進TreeMap中,然後提交消費請求(執行緒)到消費者執行緒池(如果消費者執行緒池已經滿了,會阻塞放入訊息,直到有空閒的執行緒),提交了之後就會馬上傳送下一次拉取請求,而不必等到拉取的訊息消費完。如果Broker上沒有新的訊息,就會掛起此請求,直到生產者提交了新的訊息,返回下一批訊息
  19. 基於以上原理,當Consumer突然宕機時,會丟失當前正在消費或者以消費完但是未提交的消費進度,Consumer恢復時會重複拉取這部分訊息,所以要在消費者端做訊息去重,或者保證消費的冪等性。