1. 程式人生 > 其它 >RocketMQ:(5) 訊息過濾機制

RocketMQ:(5) 訊息過濾機制

  RocketMQ支援表示式過濾與類過濾兩種模式,其中表達式又分為TAG和SQL92。類過濾模式允許提交一個過濾類到FilterServer,訊息消費者從FilterServer拉取訊息,訊息經過FilterServer時會執行過濾邏輯。

基於表示式的訊息過濾

  訊息傳送者在訊息傳送時如果設定了訊息的tags屬性,儲存在訊息屬性中,先儲存在CommitLog檔案中,然後轉發到訊息消費佇列,訊息消費佇列會用8個位元組儲存訊息tag的hashcode,之所以不直接儲存tag字串,是因為將ConumeQueue設計為定長結構,加快訊息消費的載入效能。
  RocketMQ基於表示式的訊息過濾是在訂閱時做過濾。在Broker端拉取訊息時,遍歷ConsumeQueue,只對比訊息tag的hashcode

,如果匹配則返回,否則忽略該訊息。Consume在收到訊息後,同樣需要先對訊息進行過濾,只是此時比較的是訊息tag的值而不再是hashcode

Step1:消費者訂閱訊息主題與訊息過濾表示式。構建訂閱資訊subscriptionData並加入到RebalanceImpl進行訊息佇列負載。
  subscriptionData的核心屬性:
  1)String SUB_ALL:過濾模式,預設為全匹配。
  2)boolean classFilterMode:是否是類過濾模式,預設為false。
  3)String topic:訊息主題名稱。
  4)String subString:訊息過濾表示式,多個用雙豎線隔開,例如“TAGA||TAGB”。


  5)Set<String> tagsSet:訊息過濾tag集合,消費端過濾時進行訊息過濾的依據。
  6)Set<String> codeSet:訊息過濾tag hashcode集合。
  7)String expressionType:過濾型別,TAG或SQL92。
Step2:根據訂閱訊息構建訊息拉取標記。根據主題、訊息過濾表示式構建訂閱訊息實體。構建訊息過濾物件。
Step3:根據偏移量拉取訊息後,首先根據ConsumeQueue條目進行訊息過濾,如果不匹配則直接跳過該條訊息,繼續拉取下一條訊息。
Step4:如果訊息根據ConsumeQueue條目通過過濾,則需要從CommitLog檔案中載入整個訊息體,然後根據屬性進行過濾。基於TAG模式,根據ConsumeQueue進行訊息過濾時只對比tag的hashcode,所以基於TAG模式訊息過濾,還需要在訊息消費端對訊息tag進行精確匹配。

  從訊息拉取流程知道,訊息拉取執行緒PullMessageService預設會使用非同步方式從伺服器拉取訊息,如果訊息過濾模式為TAG模式,並且訂閱TAG集合不為空,則對訊息的tag進行判斷,如果集合中包含訊息的TAG則返回給消費者消費,否則跳過。