1. 程式人生 > 實用技巧 >RocketMq-02-特性詳解&場景介紹

RocketMq-02-特性詳解&場景介紹

一、基本概念 1、訊息模型 RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產訊息,Consumer 負責消費訊息,Broker 負責儲存訊息。 2、訊息生產者(producer) 負責生產訊息,一般由業務系統負責生產訊息。一個訊息生產者會把業務應用系統裡產生的訊息傳送到broker伺服器。RocketMQ提供多種傳送方式,同步傳送、非同步傳送、順序傳送、單向傳送。同步和非同步方式均需要Broker返回確認資訊,單向傳送不需要。 3、訊息消費者(Consumer) 負責消費訊息,一般是後臺系統負責非同步消費。一個訊息消費者會從Broker伺服器拉取訊息、並將其提供給應用程式。從使用者應用的角度而言提供了兩種消費形式:拉取式消費(pull consumer)、推動式消費(push consumer)。 4、主題(Topic)
表示一類訊息的集合,每個主題包含若干條訊息,每條訊息只能屬於一個主題,是RocketMQ進行訊息訂閱的基本單位。 5、代理伺服器(Broker Server) 訊息中轉角色,負責儲存訊息、轉發訊息。代理伺服器在RocketMQ系統中負責接收從生產者傳送來的訊息並存儲、同時為消費者的拉取請求作準備。代理伺服器也儲存訊息相關的元資料,包括消費者組、消費進度偏移和主題和佇列訊息等。 6、名字服務(Name Server) 名稱服務充當路由訊息的提供者。生產者或消費者能夠通過名字服務查詢各主題相應的Broker IP列表。多個Namesrv例項組成叢集,但相互獨立,沒有資訊交換。 7、拉取式消費(Pull Consumer)
Consumer消費的一種型別,應用通常主動呼叫Consumer的拉訊息方法從Broker伺服器拉訊息、主動權由應用控制。一旦獲取了批量訊息,應用就會啟動消費過程。 8 、推動式消費(Push Consumer) Consumer消費的一種型別,該模式下Broker收到資料後會主動推送給消費端,該消費模式一般實時性較高。 9、生產者組(Producer Group) 同一類Producer的集合,這類Producer傳送同一類訊息且傳送邏輯一致。如果傳送的是事物訊息且原始生產者在傳送之後崩潰,則Broker伺服器會聯絡同一生產者組的其他生產者例項以提交或回溯消費。 10 、消費者組(Consumer Group)
同一類Consumer的集合,這類Consumer通常消費同一類訊息且消費邏輯一致。消費者組使得在訊息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者例項必須訂閱完全相同的Topic。RocketMQ 支援兩種訊息模式:叢集消費(Clustering)和廣播消費(Broadcasting)。 11、叢集消費(Clustering) 叢集消費模式下,相同Consumer Group的每個Consumer例項平均分攤訊息。 12 、廣播消費(Broadcasting) 廣播消費模式下,相同Consumer Group的每個Consumer例項都接收全量的訊息。 13、普通順序訊息(Normal Ordered Message) 普通順序消費模式下,消費者通過同一個消費佇列收到的訊息是有順序的,不同訊息佇列收到的訊息則可能是無順序的。 14、嚴格順序訊息(Strictly Ordered Message) 嚴格順序訊息模式下,消費者收到的所有訊息均是有順序的。 15 、訊息(Message) 訊息系統所傳輸資訊的物理載體,生產和消費資料的最小單位,每條訊息必須屬於一個主題。RocketMQ中每個訊息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message ID和Key查詢訊息的功能。 16 、標籤(Tag) 為訊息設定的標誌,用於同一主題下區分不同型別的訊息。來自同一業務單元的訊息,可以根據不同業務目的在同一主題下設定不同標籤。標籤能夠有效地保持程式碼的清晰度和連貫性,並優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴充套件性。 二、整體架構 三、RocketMQ特性 Producer 傳送方式 package org.apache.rocketmq.client.impl; public enum CommunicationMode { SYNC, ASYNC, ONEWAY, } Sync: 同步的傳送方式,會等待發送結果後才返回 Async: 非同步的傳送方式,傳送完後,立刻返回。Client 在拿到 Broker 的響應結果後,會 回撥指定的 callback. 這個 API 也可以指定 Timeout,不指定也是預設的 3000ms. Oneway: 比較簡單,發出去後,什麼都不管直接返回。 傳送狀態 package org.apache.rocketmq.client.producer; public enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE, } SEND_OK 訊息傳送成功。要注意的是訊息傳送成功也不意味著它是可靠的。要確保不會丟失任何 訊息,還應啟用同步Master伺服器或同步刷盤,即SYNC_MASTER或 SYNC_FLUSH FLUSH_DISK_TIMEOUT 訊息傳送成功但是伺服器刷盤超時。此時訊息已經進入伺服器佇列(記憶體),只有服務 器宕機,訊息才會丟失。訊息儲存配置引數中可以設定刷盤方式和同步刷盤時間長度,如果 Broker伺服器設定了刷盤方式為同步刷盤,即FlushDiskType=SYNC_FLUSH(預設為非同步 刷盤方式),當Broker伺服器未在同步刷盤時間內(預設為5s)完成刷盤,則將返回該狀 態——刷盤超時。 FLUSH_SLAVE_TIMEOUT 訊息傳送成功,但是伺服器同步到Slave時超時。此時訊息已經進入伺服器佇列,只有 伺服器宕機,訊息才會丟失。如果Broker伺服器的角色是同步Master,即 SYNC_MASTER(預設是非同步Master即ASYNC_MASTER),並且從Broker伺服器未在同 步刷盤時間(預設為5秒)內完成與主伺服器的同步,則將返回該狀態——資料同步到 Slave伺服器超時。 SLAVE_NOT_AVAILABLE 訊息傳送成功,但是此時Slave不可用。如果Broker伺服器的角色是同步Master,即 SYNC_MASTER(預設是非同步Master伺服器即ASYNC_MASTER),但沒有配置slave Broker伺服器,則將返回該狀態——無Slave伺服器可用。 順序訊息 訊息有序指的是可以按照訊息的傳送順序來消費(FIFO)。RocketMQ可以嚴格的保證消 息有序,可以分為分割槽有序或者全域性有序。 順序消費的原理解析,在預設的情況下訊息傳送會採取Round Robin輪詢方式把訊息 傳送到不同的queue(分割槽佇列);而消費訊息的時候從多個queue上拉取訊息,這種情況發 送和消費是不能保證順序。但是如果控制傳送的順序訊息只依次傳送到同一個queue中,消 費的時候只從這個queue上依次拉取,則就保證了順序。當傳送和消費參與的queue只有一 個,則是全域性有序;如果多個queue參與,則為分割槽有序,即相對每個queue,訊息都是有 序的。 下面用訂單進行分割槽有序的示例。一個訂單的順序流程是:建立、付款、推送、完成。 訂單號相同的訊息會被先後傳送到同一個佇列中,消費時,同一個OrderId獲取到的肯定是 同一個佇列。 訊息狀態 public enum ConsumeOrderlyStatus { /** *消費成功 * Success consumption */ SUCCESS, /** *不能跳過訊息,等待一下 * Suspend current queue a moment */ SUSPEND_CURRENT_QUEUE_A_MOMENT; } 延時訊息 定時訊息是指訊息發到 Broker 後,不能立刻被 Consumer 消費,要到特定的時間點 或者等待特定的時間後才能被消費。 使用場景 如電商裡,提交了一個訂單就可以傳送一個延時訊息,1h後去檢查這個訂單的 狀態,如果還是未付款就取消訂單釋放庫存。 延時機制 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; 現在RocketMq並不支援任意時間的延時,需要設定幾個固定的延時等級,從1s到2h 分別對應著等級1到18 ,訊息傳送時間與設定的延時等級 和重試次數有關。 批量訊息 批量傳送訊息能顯著提高傳遞小訊息的效能。限制是這些批量訊息應該有相同的 topic,相同的waitStoreMsgOK,而且不能是延時訊息。此外,這一批訊息的總大小不應 超過4MB。rocketmq建議每次批量訊息大小大概在1MB。 當訊息大小超過4MB時,需要將訊息進行分割 過濾訊息 大多數情況下,可以通過TAG來選擇您想要的訊息 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC"); 使用Filter功能,需要在啟動配置檔案當中配置以下選項 enablePropertyFilter=true 消費者將接收包含TAGA或TAGB或TAGC的訊息。但是限制是一個訊息只能有一個標 籤,這對於複雜的場景可能不起作用。在這種情況下,可以使用SQL表示式篩選訊息。SQL 特性可以通過傳送訊息時的屬性來進行計算。在RocketMQ定義的語法下,可以實現一些 簡單的邏輯。下面是一個例子 ‐‐‐‐‐‐‐‐‐‐‐‐ 2 | message | 3 |‐‐‐‐‐‐‐‐‐‐| a > 5 AND b = 'abc' 4 | a = 10 | ‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐> Gotten 5 | b = 'abc'| 6 | c = true | 7 ‐‐‐‐‐‐‐‐‐‐‐‐ 8 ‐‐‐‐‐‐‐‐‐‐‐‐ 9 | message | 10 |‐‐‐‐‐‐‐‐‐‐| a > 5 AND b = 'abc' 11 | a = 1 | ‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐> Missed 12 | b = 'abc'| 13 | c = true | 14 ‐‐‐‐‐‐‐‐‐‐‐‐ 基本語法: RocketMQ只定義了一些基本語法來支援這個特性。你也可以很容易地擴充套件它。 2 數值比較,比如:>,>=,<,<=,BETWEEN,=; 3 字元比較,比如:=,<>,IN; 4 IS NULL 或者 IS NOT NULL; 5 邏輯符號 AND,OR,NOT; 6 常量支援型別 7 數值,比如:123,3.1415; 8 字元,比如:'abc',必須用單引號包裹起來; 9 NULL,特殊的常量 10 布林值,TRUE 或 FALSE 11 只有使用push模式的消費者才能用使用SQL92標準的sql語句,介面如下: 12 public void subscribe(final String topic, final MessageSelector messageS elector) 事務訊息 概念:
  • 事務訊息:訊息佇列 MQ 提供類似 X/Open XA 的分散式事務功能,通過訊息隊 MQ 事務訊息能達到分散式事務的最終一致。
  • 半事務訊息:暫不能投遞的訊息,傳送方已經成功地將訊息傳送到了訊息佇列 MQ 服務端,但是服務端未收到生產者對該訊息的二次確認,此時該訊息被標記 暫不能投遞 狀態,處於該種狀態下的訊息即半事務訊息。
  • 訊息回查:由於網路閃斷、生產者應用重啟等原因,導致某條事務訊息的二次確 認丟失,訊息佇列 MQ 服務端通過掃描發現某條訊息長期處於 半事務訊息 時,需要 主動向訊息生產者詢問該訊息的最終狀態( Commit 或是 Rollback ),該詢問過程即 訊息回查。
場景: 通過購物車進行下單的流程中,使用者入口在購物車系統,交易下單入口在交易系統,兩 個系統之間的資料需要保持最終一致,這時可以通過事務訊息進行處理。交易系統下單之 後,傳送一條交易下單的訊息到訊息佇列 MQ ,購物車系統訂閱訊息佇列 MQ 的交易下單消 息,做相應的業務處理,更新購物車資料 事務訊息狀態 package org.apache.rocketmq.client.producer; public enum LocalTransactionState { // 提交事務,它允許消費者消費此訊息。 COMMIT_MESSAGE, //回滾事務,它代表該訊息將被刪除,不允許被消費 ROLLBACK_MESSAGE, // 中間狀態,它代表需要檢查訊息佇列來確定狀態 UNKNOW, } 互動流程 事務訊息限制 1、事務訊息不支援延時訊息和批量訊息。 2、為了避免單個訊息被檢查太多次而導致半佇列訊息累積,我們預設將單個訊息的檢查次數限製為 15 次,但是使用者可以通過 Broker 配置檔案的 transactionCheckMax引數來修改 此限制。如果已經檢查某條訊息超過 N 次的話( N = transactionCheckMax ) 則 Broker丟棄此訊息,並在預設情況下同時列印錯誤日誌。使用者可以通過重寫AbstractTransactionCheckListener類來修改這個行為。 3、事務訊息將在 Broker 配置檔案中的引數 transactionMsgTimeout 這樣的特定時間長度之後被檢查。當傳送事務訊息時,使用者還可以通過設定使用者屬性 CHECK_IMMUNITY_TIME_IN_SECONDS來改變這個限制,該引數優先於 transactionMsgTimeout 引數。 4、事務性訊息可能不止一次被檢查或消費。 5、提交給使用者的目標主題訊息可能會失敗,目前這依日誌的記錄而定。它的高可用性通過RocketMQ 本身的高可用性機制來保證,如果希望確保事務訊息不丟失、並且事務完整性 得到保證,建議使用同步的雙重寫入機制。 6、事務訊息的生產者 ID 不能與其他型別訊息的生產者 ID 共享。與其他型別的訊息不同,事務訊息允許反向查詢、MQ伺服器能通過它們的生產者 ID 查詢到消費者。 消費模型 consumer 有兩種消費模型: public enum MessageModel { /** * broadcast廣播消費,相同Consumer Group的每個Consumer例項都接收全量的訊息 */ BROADCASTING("BROADCASTING"), /** * clustering叢集消費,相同Consumer Group的每個Consumer例項平均分攤訊息 */ CLUSTERING("CLUSTERING"); private String modeCN; MessageModel(String modeCN) { this.modeCN = modeCN; } public String getModeCN() { return modeCN; } } 消費點位 當建立一個新的消費者組時,需要決定是否需要消費已經存在於 Broker 中的歷史訊息 package org.apache.rocketmq.common.consumer; public enum ConsumeFromWhere { CONSUME_FROM_LAST_OFFSET, //將會忽略歷史訊息,並消費之後生成的任何訊息。 CONSUME_FROM_FIRST_OFFSET,//將會消費每個存在於 Broker 中的資訊 CONSUME_FROM_TIMESTAMP, //消費在指定時間戳後產生的訊息 } 訊息重複冪等 RocketMQ無法避免訊息重複,所以如果業務對消費重複非常敏感,務必要在業務層面去重 冪等令牌是生產者和消費者兩者中的既定協議,在業務中通常是具備唯一業務標識的字元 串,如:訂單號、流水號等。且一般由生產者端生成並傳遞給消費者端。