RocketMq-02-特性詳解&場景介紹
阿新 • • 發佈:2020-10-11
一、基本概念
1、訊息模型
RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產訊息,Consumer 負責消費訊息,Broker 負責儲存訊息。
2、訊息生產者(producer)
負責生產訊息,一般由業務系統負責生產訊息。一個訊息生產者會把業務應用系統裡產生的訊息傳送到broker伺服器。RocketMQ提供多種傳送方式,同步傳送、非同步傳送、順序傳送、單向傳送。同步和非同步方式均需要Broker返回確認資訊,單向傳送不需要。
3、訊息消費者(Consumer)
負責消費訊息,一般是後臺系統負責非同步消費。一個訊息消費者會從Broker伺服器拉取訊息、並將其提供給應用程式。從使用者應用的角度而言提供了兩種消費形式:拉取式消費(pull consumer)、推動式消費(push consumer)。
4、主題(Topic)
表示一類訊息的集合,每個主題包含若干條訊息,每條訊息只能屬於一個主題,是RocketMQ進行訊息訂閱的基本單位。
5、代理伺服器(Broker Server)
訊息中轉角色,負責儲存訊息、轉發訊息。代理伺服器在RocketMQ系統中負責接收從生產者傳送來的訊息並存儲、同時為消費者的拉取請求作準備。代理伺服器也儲存訊息相關的元資料,包括消費者組、消費進度偏移和主題和佇列訊息等。
6、名字服務(Name Server)
名稱服務充當路由訊息的提供者。生產者或消費者能夠通過名字服務查詢各主題相應的Broker IP列表。多個Namesrv例項組成叢集,但相互獨立,沒有資訊交換。
7、拉取式消費(Pull Consumer)
Consumer消費的一種型別,應用通常主動呼叫Consumer的拉訊息方法從Broker伺服器拉訊息、主動權由應用控制。一旦獲取了批量訊息,應用就會啟動消費過程。
8 、推動式消費(Push Consumer)
Consumer消費的一種型別,該模式下Broker收到資料後會主動推送給消費端,該消費模式一般實時性較高。
9、生產者組(Producer Group)
同一類Producer的集合,這類Producer傳送同一類訊息且傳送邏輯一致。如果傳送的是事物訊息且原始生產者在傳送之後崩潰,則Broker伺服器會聯絡同一生產者組的其他生產者例項以提交或回溯消費。
10 、消費者組(Consumer Group) 同一類Consumer的集合,這類Consumer通常消費同一類訊息且消費邏輯一致。消費者組使得在訊息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者例項必須訂閱完全相同的Topic。RocketMQ 支援兩種訊息模式:叢集消費(Clustering)和廣播消費(Broadcasting)。
11、叢集消費(Clustering)
叢集消費模式下,相同Consumer Group的每個Consumer例項平均分攤訊息。
12 、廣播消費(Broadcasting)
廣播消費模式下,相同Consumer Group的每個Consumer例項都接收全量的訊息。
13、普通順序訊息(Normal Ordered Message)
普通順序消費模式下,消費者通過同一個消費佇列收到的訊息是有順序的,不同訊息佇列收到的訊息則可能是無順序的。
14、嚴格順序訊息(Strictly Ordered Message)
嚴格順序訊息模式下,消費者收到的所有訊息均是有順序的。
15 、訊息(Message)
訊息系統所傳輸資訊的物理載體,生產和消費資料的最小單位,每條訊息必須屬於一個主題。RocketMQ中每個訊息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message ID和Key查詢訊息的功能。
16 、標籤(Tag)
為訊息設定的標誌,用於同一主題下區分不同型別的訊息。來自同一業務單元的訊息,可以根據不同業務目的在同一主題下設定不同標籤。標籤能夠有效地保持程式碼的清晰度和連貫性,並優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴充套件性。
二、整體架構
三、RocketMQ特性
Producer
傳送方式
package org.apache.rocketmq.client.impl;
public enum CommunicationMode {
SYNC,
ASYNC,
ONEWAY,
}
Sync:
同步的傳送方式,會等待發送結果後才返回
Async:
非同步的傳送方式,傳送完後,立刻返回。Client 在拿到 Broker 的響應結果後,會
回撥指定的 callback. 這個 API 也可以指定 Timeout,不指定也是預設的 3000ms.
Oneway:
比較簡單,發出去後,什麼都不管直接返回。
傳送狀態
package org.apache.rocketmq.client.producer;
public enum SendStatus {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
SLAVE_NOT_AVAILABLE,
}
SEND_OK
訊息傳送成功。要注意的是訊息傳送成功也不意味著它是可靠的。要確保不會丟失任何
訊息,還應啟用同步Master伺服器或同步刷盤,即SYNC_MASTER或
SYNC_FLUSH
。
FLUSH_DISK_TIMEOUT
訊息傳送成功但是伺服器刷盤超時。此時訊息已經進入伺服器佇列(記憶體),只有服務
器宕機,訊息才會丟失。訊息儲存配置引數中可以設定刷盤方式和同步刷盤時間長度,如果
Broker伺服器設定了刷盤方式為同步刷盤,即FlushDiskType=SYNC_FLUSH(預設為非同步
刷盤方式),當Broker伺服器未在同步刷盤時間內(預設為5s)完成刷盤,則將返回該狀
態——刷盤超時。
FLUSH_SLAVE_TIMEOUT
訊息傳送成功,但是伺服器同步到Slave時超時。此時訊息已經進入伺服器佇列,只有
伺服器宕機,訊息才會丟失。如果Broker伺服器的角色是同步Master,即
SYNC_MASTER(預設是非同步Master即ASYNC_MASTER),並且從Broker伺服器未在同
步刷盤時間(預設為5秒)內完成與主伺服器的同步,則將返回該狀態——資料同步到
Slave伺服器超時。
SLAVE_NOT_AVAILABLE
訊息傳送成功,但是此時Slave不可用。如果Broker伺服器的角色是同步Master,即
SYNC_MASTER(預設是非同步Master伺服器即ASYNC_MASTER),但沒有配置slave
Broker伺服器,則將返回該狀態——無Slave伺服器可用。
順序訊息
訊息有序指的是可以按照訊息的傳送順序來消費(FIFO)。RocketMQ可以嚴格的保證消
息有序,可以分為分割槽有序或者全域性有序。
順序消費的原理解析,在預設的情況下訊息傳送會採取Round Robin輪詢方式把訊息
傳送到不同的queue(分割槽佇列);而消費訊息的時候從多個queue上拉取訊息,這種情況發
送和消費是不能保證順序。但是如果控制傳送的順序訊息只依次傳送到同一個queue中,消
費的時候只從這個queue上依次拉取,則就保證了順序。當傳送和消費參與的queue只有一
個,則是全域性有序;如果多個queue參與,則為分割槽有序,即相對每個queue,訊息都是有
序的。
下面用訂單進行分割槽有序的示例。一個訂單的順序流程是:建立、付款、推送、完成。
訂單號相同的訊息會被先後傳送到同一個佇列中,消費時,同一個OrderId獲取到的肯定是
同一個佇列。
訊息狀態
public enum ConsumeOrderlyStatus {
/**
*消費成功
* Success consumption
*/
SUCCESS,
/**
*不能跳過訊息,等待一下
* Suspend current queue a moment
*/
SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
延時訊息
定時訊息是指訊息發到 Broker 後,不能立刻被 Consumer 消費,要到特定的時間點
或者等待特定的時間後才能被消費。
使用場景
:
如電商裡,提交了一個訂單就可以傳送一個延時訊息,1h後去檢查這個訂單的
狀態,如果還是未付款就取消訂單釋放庫存。
延時機制
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
現在RocketMq並不支援任意時間的延時,需要設定幾個固定的延時等級,從1s到2h
分別對應著等級1到18 ,訊息傳送時間與設定的延時等級
和重試次數有關。
批量訊息
批量傳送訊息能顯著提高傳遞小訊息的效能。限制是這些批量訊息應該有相同的
topic,相同的waitStoreMsgOK,而且不能是延時訊息。此外,這一批訊息的總大小不應
超過4MB。rocketmq建議每次批量訊息大小大概在1MB。
當訊息大小超過4MB時,需要將訊息進行分割
過濾訊息
大多數情況下,可以通過TAG來選擇您想要的訊息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
使用Filter功能,需要在啟動配置檔案當中配置以下選項
enablePropertyFilter=true
消費者將接收包含TAGA或TAGB或TAGC的訊息。但是限制是一個訊息只能有一個標
籤,這對於複雜的場景可能不起作用。在這種情況下,可以使用SQL表示式篩選訊息。SQL
特性可以通過傳送訊息時的屬性來進行計算。在RocketMQ定義的語法下,可以實現一些
簡單的邏輯。下面是一個例子
‐‐‐‐‐‐‐‐‐‐‐‐
2 | message |
3 |‐‐‐‐‐‐‐‐‐‐| a > 5 AND b = 'abc'
4 | a = 10 | ‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐> Gotten
5 | b = 'abc'|
6 | c = true |
7 ‐‐‐‐‐‐‐‐‐‐‐‐
8 ‐‐‐‐‐‐‐‐‐‐‐‐
9 | message |
10 |‐‐‐‐‐‐‐‐‐‐| a > 5 AND b = 'abc'
11 | a = 1 | ‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐> Missed
12 | b = 'abc'|
13 | c = true |
14 ‐‐‐‐‐‐‐‐‐‐‐‐
基本語法:
RocketMQ只定義了一些基本語法來支援這個特性。你也可以很容易地擴充套件它。
2 數值比較,比如:>,>=,<,<=,BETWEEN,=;
3 字元比較,比如:=,<>,IN;
4 IS NULL 或者 IS NOT NULL;
5 邏輯符號 AND,OR,NOT;
6 常量支援型別
7 數值,比如:123,3.1415;
8 字元,比如:'abc',必須用單引號包裹起來;
9 NULL,特殊的常量
10 布林值,TRUE 或 FALSE
11 只有使用push模式的消費者才能用使用SQL92標準的sql語句,介面如下:
12 public void subscribe(final String topic, final MessageSelector messageS
elector)
事務訊息
概念:
- 事務訊息:訊息佇列 MQ 提供類似 X/Open XA 的分散式事務功能,通過訊息隊 列 MQ 事務訊息能達到分散式事務的最終一致。
- 半事務訊息:暫不能投遞的訊息,傳送方已經成功地將訊息傳送到了訊息佇列 MQ 服務端,但是服務端未收到生產者對該訊息的二次確認,此時該訊息被標記 成 “ 暫不能投遞 ” 狀態,處於該種狀態下的訊息即半事務訊息。
- 訊息回查:由於網路閃斷、生產者應用重啟等原因,導致某條事務訊息的二次確 認丟失,訊息佇列 MQ 服務端通過掃描發現某條訊息長期處於 “ 半事務訊息 ” 時,需要 主動向訊息生產者詢問該訊息的最終狀態( Commit 或是 Rollback ),該詢問過程即 訊息回查。