訊息佇列RocketMQ應用
一、RocketMQ簡介
RocketMQ
是阿里巴巴開源的分散式訊息中介軟體。支援事務訊息、順序訊息、批量訊息、定時訊息、訊息回溯等。它裡面有幾個區別於標準訊息中件間的概念,如Group
、Topic
、Queue
等。系統組成則由Producer
、Consumer
、Broker
、NameServer
等。
RocketMQ特點:
- 是一個佇列模型的訊息中介軟體,具有高效能、高可靠、高實時、分散式等特點;
-
Producer
、Consumer
、佇列都可以分散式; -
Producer
向一些佇列輪流傳送訊息,佇列集合稱為Topic
,Consumer
如果做廣播消費,則一個Consumer
例項消費這個Topic
Consumer
例項平均消費這個Topic
對應的佇列集合; - 能夠保證嚴格的訊息順序;
- 支援拉(
pull
)和推(push
)兩種訊息模式; - 高效的訂閱者水平擴充套件能力;
- 實時的訊息訂閱機制;
- 億級訊息堆積能力;
- 支援多種訊息協議,如
JMS
、OpenMessaging
等; - 較少的依賴。
二、訊息中介軟體對比
RocketMQ
是阿里巴巴在2012
年開源的分散式訊息中介軟體,目前已經捐贈給Apache
軟體基金會,並於2017年9月25日成為Apache
的頂級專案,撐得住雙11,不過在windows
部署會有一些不穩定,最好還是在linux
上部署。
三、RocketMQ的核心概念
3.1 Name Server、Broker
訊息佇列RocketMQ
在任何一個環境都是可擴充套件的,生產者必須是一個叢集,訊息伺服器必須是一個叢集,消費者也同樣。叢集級別的高可用,是訊息佇列RocketMQ
跟其他的訊息伺服器的主要區別,訊息生產者傳送一條訊息到訊息伺服器,訊息伺服器會隨機的選擇一個消費者,只要這個消費者消費成功就認為是成功了。
注意:文中所提及的訊息佇列RocketMQ
的服務端或者伺服器包含Name Server
、Broker
等。服務端不等同於Broker
。
RocketMQ
主要由Producer
、Broker
、Consumer
三部分組成,其中Producer
負責生產訊息,Consumer
Broker
負責儲存訊息。Broker
在實際部署過程中對應一臺伺服器,每個Broker
可以儲存多個Topic
的訊息,每個Topic
的訊息也可以分片儲存於不同的Broker
。Message Queue
用於儲存訊息的實體地址,每個Topic
中的訊息地址儲存於多個Message Queue
中。ConsumerGroup
由多個Consumer
例項構成。
圖中所涉及到的概念如下所述:
-
Name Server:名稱服務充當路由訊息的提供者。是一個幾乎無狀態節點,可叢集部署,節點之間無任何資訊同步。在訊息佇列
RocketMQ
中提供命名服務,更新和發現Broker
服務。
NameServer
即名稱服務,兩個功能:
- 接收
broker
的請求,註冊broker
的路由資訊 - 接收
client(
producer/consumer)
的請求,根據某個topic
獲取其到broker
的路由資訊
NameServer
沒有狀態,可以橫向擴充套件。每個broker
在啟動的時候會到NameServer
註冊;Producer
在傳送訊息前會根據topic
到NameServer
獲取路由(到broker)資訊;Consumer
也會定時獲取topic
路由資訊。
-
Broker:訊息中轉角色,負責儲存訊息,轉發訊息。可以理解為訊息佇列伺服器,提供了訊息的接收、儲存、拉取和轉發服務。
broker
是RocketMQ
的核心,所以需要保證broker
的高可用。
broker
分為Master Broker
和Slave Broker
,一個Master Broker
可以對應多個Slave Broker
,但是一個Slave Broker
只能對應一個Master Broker
。
Master
與Slave
的對應關係通過指定相同的BrokerName
,不同的BrokerId
來定義,BrokerId
為0
表示Master
,非0
表示Slave
。Master
也可以部署多個。
每個Broker
與Name Server
叢集中的所有節點建立長連線,定時註冊Topic
資訊到所有Name Server
。Broker
啟動後需要完成一次將自己註冊至Name Server
的操作;隨後每隔30s
定期向Name Server
上報Topic
路由資訊。 -
Producer:負責生產訊息,一般由業務系統負責生產訊息。與
Name Server
叢集中的其中一個節點(隨機)建立長連結(Keep-alive
),定期從Name Server
讀取Topic
路由資訊,並向提供Topic
服務的Master Broker
建立長連結,且定時向Master Broker
傳送心跳。 -
Consumer:負責消費訊息,一般是後臺系統負責非同步消費。與
Name Server
叢集中的其中一個節點(隨機)建立長連線,定期從Name Server
拉取Topic
路由資訊,並向提供Topic
服務的Master Broker
、Slave Broker
建立長連線,且定時向Master Broker
、Slave Broker
傳送心跳。Consumer
既可以從Master Broker
訂閱訊息,也可以從Slave Broker
訂閱訊息,訂閱規則由Broker
配置決定。
另外,Broker
中還存在一些非常重要的名詞需要說明:
3.2 Topic、Queue、tags
RocketMQ
的Topic/Queue
和JMS
中的Topic/Queue
概念有一定的差異,JMS
中所有消費者都會消費一個Topic
訊息的副本,而Queue
中訊息只會被一個消費者消費;但到了RocketMQ
中Topic
只代表普通的訊息佇列,而Queue
是組成Topic
的更小單元。
Topic:表示訊息的第一級型別,比如一個電商系統的訊息可以分為:交易訊息、物流訊息等。一條訊息必須有一個Topic。
Queue:主題被劃分為一個或多個子主題,稱為“message queues”。一個topic
下,我們可以設定多個queue(訊息佇列)。當我們傳送訊息時,需要要指定該訊息的topic。RocketMQ
會輪詢該topic
下的所有佇列,將訊息傳送出去。
定義:Queue
是Topic
在一個Broker
上的分片,在分片基礎上再等分為若干份(可指定份數)後的其中一份,是負載均衡過程中資源分配的基本單元。叢集消費模式下一個消費者只消費該Topic
中部分Queue
中的訊息,當一個消費者開啟廣播模式時則會消費該Topic
下所有Queue
中的訊息。
先看一張有關Topic
和Queue
的關係圖:
Tags:Tags
是Topic
下的次級訊息型別/二級型別(注:Tags也支援TagA || TagB這樣的表示式),可以在同一個Topic
下基於Tags
進行訊息過濾。Tags
的過濾需要經過兩次比對,首先會在Broker
端通過Tag hashcode
進行一次比對過濾,匹配成功傳到consumer
端後再對具體Tags
進行比對,以防止Tag hashcode
重複的情況。比如交易訊息又可以分為:交易建立訊息,交易完成訊息等,一條訊息可以沒有Tag
。RocketMQ
提供2
級訊息分類,方便大家靈活控制。標籤,換句話說,為使用者提供了額外的靈活性。有了標籤,來自同一個業務模組的不同目的的訊息可能具有相同的主題和不同的標籤。標籤將有助於保持您的程式碼乾淨和連貫,並且標籤還可以為RocketMQ
提供的查詢系統提供幫助。
Queue
中具體的儲存單元結構如下圖,最後面的8
個Byte
儲存Tag
資訊。
3.3 Producer與Producer Group
Producer:表示訊息佇列的生產者。訊息佇列的本質就是實現了publish-subscribe
模式,生產者生產訊息,消費者消費訊息。所以這裡的Producer
就是用來生產和傳送訊息的,一般指業務系統。RocketMQ
提供了傳送:普通訊息(同步、非同步和單向)、定時訊息、延時訊息、事務訊息。
Producer Group:是一類Producer
的集合名稱,這類Producer
通常傳送一類訊息,且傳送邏輯一致。相同角色的生產者被分組在一起。同一生產者組的另一個生產者例項可能被broker
聯絡,以提交或回滾事務,以防原始生產者在交易後崩潰。
警告:考慮提供的生產者在傳送訊息時足夠強大,每個生產者組只允許一個例項,以避免對生產者例項進行不必要的初始化。
3.4 Consumer與Consumer Group
Consumer:訊息消費者,一般由業務後臺系統非同步的消費訊息。
-
Push Consumer:
Consumer
的一種,應用通常向Consumer
物件註冊一個Listener
介面,一旦收到訊息,Consumer
物件立刻回撥Listener
介面方法。 -
Pull Consumer:
Consumer
的一種,應用通常主動呼叫Consumer
的拉訊息方法從Broker
拉訊息,主動權由應用控制。
Consumer Group:Consumer Group
是一類Consumer
的集合名稱,這類Consumer
通常消費一類訊息,且消費邏輯一致(使用相同Group ID
的訂閱者屬於同一個叢集。同一個叢集下的訂閱者消費邏輯必須完全一致(包括Tag
的使用),這些訂閱者在邏輯上可以認為是一個消費節點)。消費者群體是一個偉大的概念,它實現了負載平衡和容錯的目標,在資訊消費方面,是非常容易的。
警告:消費者群體的消費者例項必須訂閱完全相同的主題。
四、元件的關係
4.1 Broker、Producer和Consumer
如果不考慮負載均衡和高可用,最簡單的Broker
,Producer
和Consumer
之間的關係如下圖所示:
4.2 Topic,Topic分片和Queue
Queue是
RocketMQ
中的另一個重要概念。在對該概念進行分析介紹前,我們先來看上面的這張圖:
從本質上來說,RocketMQ
中的Queue是
資料分片的產物。為了更好地理解Queue
的定義,我們還需要引入一個新的概念:Topic
分片。在分散式資料庫和分散式快取領域,分片概念已經有了清晰的定義。同理,對於RocketMQ
,一個Topic
可以分佈在各個Broker
上,我們可以把一個Topic
分佈在一個Broker
上的子集定義為一個Topic
分片。對應上圖,TopicA
有3
個Topic
分片,分佈在Broker1
,Broker2
和Broker3
上,TopicB
有2
個Topic
分片,分佈在Broker1
和Broker2
上,TopicC
有2
個Topic
分片,分佈在Broker2
和Broker3
上。
將Topic
分片再切分為若干等分,其中的一份就是一個Queue
。每個Topic
分片等分的Queue
的數量可以不同,由使用者在建立Topic
時指定。
queue
數量指定方式:
1、程式碼指定:producer.setDefaultTopicQueueNums(8);
2、配置檔案指定
同時設定broker
伺服器的配置檔案broker.properties:defaultTopicQueueNums=16
3、rocket-console
控制檯指定
我們知道,資料分片的主要目的是突破單點的資源(網路頻寬,CPU
,記憶體或檔案儲存)限制從而實現水平擴充套件。RocketMQ
在進行Topic
分片以後,已經達到水平擴充套件的目的了,為什麼還需要進一步切分為Queue
呢?
解答這個問題還需要從負載均衡說起。以訊息消費為例,借用Rocket MQ
官方文件中的Consumer
負載均衡示意圖來說明:
如圖所示,TOPIC_A
在一個Broker
上的Topic
分片有5
個Queue
,一個Consumer Group
內有2
個Consumer
按照叢集消費的方式消費訊息,按照平均分配策略進行負載均衡得到的結果是:第一個Consumer
消費3
個Queue
,第二個Consumer
消費2
個Queue
。如果增加Consumer
,每個Consumer
分配到的Queue
會相應減少。Rocket MQ
的負載均衡策略規定:Consumer
數量應該小於等於Queue
數量,如果Consumer
超過Queue
數量,那麼多餘的Consumer
將不能消費訊息。
在一個Consumer Group
內,Queue
和Consumer
之間的對應關係是一對多的關係:一個Queue
最多隻能分配給一個Consumer
,一個Cosumer
可以分配得到多個Queue
。這樣的分配規則,每個Queue
只有一個消費者,可以避免消費過程中的多執行緒處理和資源鎖定,有效提高各Consumer
消費的並行度和處理效率。
由此,我們可以給出Queue
的定義:Queue
是Topic
在一個Broker
上的分片等分為指定份數後的其中一份,是負載均衡過程中資源分配的基本單元。
五、RocketMQ生產模式
- 訊息傳送方式
RocketMQ提供三種方式可以傳送普通訊息:同步、非同步、和單向傳送。 - 訊息型別
訊息客戶端提供多種SDK:普通、順序、事務、延時訊息
5.1 傳送方式
RocketMQ
傳送普通訊息的三種方式:同步訊息(預設)、非同步訊息和單向訊息。其中前兩種訊息是可靠的,因為會有傳送是否成功的應答。
5.1.1 同步傳送(Sync)
1、原理
同步傳送是指訊息傳送方發出一條訊息後,會在收到服務端返回響應之後才發下一條訊息的通訊方式。
2、應用場景
可靠的同步傳輸應用於廣泛的場景,例如重要通知郵件、報名簡訊通知、營銷簡訊系統等。
3、例項程式碼
SendResult sendResult = producer.send(msg);
5.2.2 非同步傳送(Async)
1、原理
非同步傳送是指傳送方發出一條訊息後,不等服務端返回響應,接著傳送下一條訊息的通訊方式。訊息佇列RocketMQ
版的非同步傳送,需要您實現非同步傳送回撥介面(SendCallback
)。訊息傳送方在傳送了一條訊息後,不需要等待服務端響應即可傳送第二條訊息。傳送方通過回撥介面接收服務端響應,並處理響應結果。
2、應用場景
非同步傳送一般用於鏈路耗時較長,對響應時間較為敏感的業務場景,例如,您視訊上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等。
3、例項程式碼
// 非同步傳送訊息, 傳送結果通過callback返回給客戶端。
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
// 訊息傳送成功。
System.out.println("send message success. topic=" + sendResult.getTopic()
+ ", msgId=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
// 訊息傳送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println("send message failed. topic=" + context.getTopic()
+ ", msgId=" + context.getMessageId());
}
});
5.2.3 單向傳送(Oneway)
由於在oneway
方式傳送訊息時沒有請求應答處理,一旦出現訊息傳送失敗,則會因為沒有重試而導致資料丟失。
若資料不可丟,建議選用可靠同步或可靠非同步傳送方式。
1、原理
傳送方只負責傳送訊息,不等待服務端返回響應且沒有回撥函式觸發,即只發送請求不等待應答。此方式傳送訊息的過程耗時非常短,一般在微秒級別。
2、應用場景
適用於某些耗時非常短,但對可靠性要求並不高的場景,例如日誌收集。
3、例項程式碼
//傳送單向訊息,沒有任何返回結果
producer.sendOneway(msg);
5.2 訊息型別
訊息客戶端提供多種SDK:普通、順序、事務、延時訊息
5.2.1 收發順序訊息
順序訊息(FIFO
訊息)是訊息佇列RocketMQ
版提供的一種嚴格按照順序來發布和消費的訊息型別。
順序訊息分為兩類:
-
全域性順序:對於指定的一個
Topic
,所有訊息按照嚴格的先入先出FIFO
(First In First Out)的順序進行釋出和消費。 -
分割槽順序:對於指定的一個
Topic
,所有訊息根據Sharding Key
進行區塊分割槽。同一個分割槽內的訊息按照嚴格的FIFO
順序進行釋出和消費。Sharding Key
是順序訊息中用來區分不同分割槽的關鍵欄位,和普通訊息的Key
是完全不同的概念。
例項程式碼
傳送訊息
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
訂閱訊息
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
5.2.2 傳送延遲訊息
延時訊息用於指定訊息傳送到訊息佇列RocketMQ
版的服務端後,延時一段時間才被投遞到客戶端進行消費(例如3
秒後才被消費),適用於解決一些訊息生產和消費有時間視窗要求的場景,或者通過訊息觸發延遲任務的場景,類似於延遲佇列。
例項程式碼
// 延時訊息,單位毫秒(ms),在指定延遲時間(當前時間之後)進行投遞,例如訊息在3秒後投遞。
long delayTime = System.currentTimeMillis() + 3000;
// 設定訊息需要被投遞的時間。
msg.setStartDeliverTime(delayTime);
5.2.3 傳送定時訊息
定時訊息可以做到在指定時間戳之後才可被消費者消費,適用於對訊息生產和消費有時間視窗要求,或者利用訊息觸發定時任務的場景。
例項程式碼
// 定時訊息,單位毫秒(ms),在指定時間戳(當前時間之後)進行投遞,例如2021-06-26 16:20:00投遞。
//如果被設定成當前時間戳之前的某個時刻,訊息將立即被投遞給消費者。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-06-26 16:20:00").getTime();
msg.setStartDeliverTime(timeStamp);
5.2.4 傳送延時訊息
使用場景
比如電商裡,提交了一個訂單就可以傳送一個延時訊息,1h
後去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。
延時訊息的使用限制
現在RocketMQ
並不支援任意時間的延時,需要設定幾個固定的延時等級,從1s
到2h
分別對應著等級1
到18
訊息消費失敗會進入延時訊息佇列,訊息傳送時間與設定的延時等級和重試次數有關
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
Message message = new Message("delay", msg.getBytes());
//在建立好message物件之後
message.setDelayTimeLevel(3); //這裡是對應延時等級3對應是10s
5.2.5 傳送事務訊息
訊息佇列RocketMQ
版提供類似XA
或Open XA
的分散式事務功能,通過訊息佇列RocketMQ
版事務訊息,能達到分散式事務的最終一致。
事務訊息互動流程如下圖所示。
例項程式碼
傳送事務訊息包含以下兩個步驟:
- 傳送半事務訊息(
Half Message
)及執行本地事務,
public static void main(String[] args) throws MQClientException {
/**
* 建立事務訊息Producer
*/
TransactionMQProducer transactionMQProducer = new TransactionMQProducer(MqConfig.GROUP_ID, getAclRPCHook());
transactionMQProducer.setNamesrvAddr(MqConfig.NAMESRV_ADDR);
transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
transactionMQProducer.start();
for (int i = 0; i < 10; i++) {
try {
Message message = new Message(MqConfig.TOPIC,
MqConfig.TAG,
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
@Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
System.out.println("開始執行本地事務: " + msg);
return LocalTransactionState.UNKNOW;
}
}, null);
assert sendResult != null;
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 提交事務訊息狀態。
當本地事務執行完成(執行成功或執行失敗),需要通知伺服器當前訊息的事務狀態。通知方式有以下兩種:
- 執行本地事務完成後提交。
- 執行本地事務一直沒提交狀態,等待伺服器回查訊息的事務狀態。
事務狀態有以下三種:
-
TransactionStatus.CommitTransaction
:提交事務,允許訂閱方消費該訊息。 -
TransactionStatus.RollbackTransaction
:回滾事務,訊息將被丟棄不允許消費。 -
TransactionStatus.Unknow
:無法判斷狀態,期待訊息佇列RocketMQ
版的Broker
向傳送方再次詢問該訊息對應的本地事務的狀態。
/**
* MQ傳送事務訊息本地Check介面實現類
*/
public class LocalTransactionCheckerImpl implements TransactionCheckListener {
@Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.println("收到事務訊息的回查請求, MsgId: " + msg.getMsgId());
return LocalTransactionState.COMMIT_MESSAGE;
}
}
事務回查機制說明
-
傳送事務訊息為什麼必須要實現回查
Check
機制?當步驟1中半事務訊息傳送完成,但本地事務返回狀態為TransactionStatus.Unknow
,或者應用退出導致本地事務未提交任何狀態時,從Broker
的角度看,這條Half
狀態的訊息的狀態是未知的。因此Broker
會定期要求傳送方Check
該Half
狀態訊息,並上報其最終狀態。 -
Check
被回撥時,業務邏輯都需要做些什麼?事務訊息的Check
方法裡面,應該寫一些檢查事務一致性的邏輯。訊息佇列RocketMQ版
傳送事務訊息時需要實現LocalTransactionChecker
介面,用來處理Broker主動發起
的本地事務狀態回查請求,因此在事務訊息的Check
方法中,需要完成兩件事情:- 檢查該半事務訊息對應的本地事務的狀態(committed or rollback)。
- 向
Broker
提交該半事務訊息本地事務的狀態。
5.2.6 傳送批量訊息
批量傳送訊息能顯著提高傳遞小訊息的效能。限制是這些批量訊息應該有相同的topic
,相同的waitStoreMsgOK
,而且不能是延時訊息。此外,這一批訊息的總大小不應超過4MB
。
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
//處理error
}
批量傳送訊息
//large batch
String topic = "BatchTest";
List<Message> messages = new ArrayList<>(100 * 1000);
for (int i = 0; i < 100 * 1000; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
//split the large batch into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List<Message> listItem = splitter.next();
producer.send(listItem);
}
class ListSplitter implements Iterator<List<Message>> {
private int sizeLimit = 1000 * 1000;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > sizeLimit) {
//it is unexpected that single message exceeds the sizeLimit
//here just let it go, otherwise it will block the splitting process
if (nextIndex - currIndex == 0) {
//if the next sublist has no element, add this one and then break, otherwise just break
nextIndex++;
}
break;
}
if (tmpSize + totalSize > sizeLimit) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Not allowed to remove");
}
}
5.3 分散式訊息OpenMessaging
除了以上傳送方式外,RocketMQ
還支援面向雲的分散式訊息,請參考OpenMessaging 示例
六、RocketMQ消費模式
6.1 訂閱方式
訊息佇列RocketMQ
版支援以下兩種訂閱方式:
1.叢集訂閱:同一個Group ID
所標識的所有Consumer
平均分攤消費訊息。例如某個Topic
有9
條訊息,一個Group ID
有3
個Consumer
例項,那麼在叢集消費模式下每個例項平均分攤,只消費其中的3
條訊息。設定方式如下所示。
// 叢集訂閱方式設定(不設定的情況下,預設為叢集訂閱方式)。
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
2.廣播訂閱:同一個Group ID
所標識的所有Consumer
都會各自消費某條訊息一次。例如某個Topic
有9
條訊息,一個Group ID
有3
個Consumer
例項,那麼在廣播消費模式下每個例項都會各自消費9
條訊息。設定方式如下所示。
// 廣播訂閱方式設定。
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
說明:請確保同一個
Group ID
下所有Consumer
例項的訂閱關係保持一致。兩種不同的訂閱方式有著不同的功能限制,例如,廣播模式不支援順序訊息、不維護消費進度、不支援重置消費位點等。
6.2 消費方式
訊息佇列RocketMQ
版支援以下兩種訊息獲取方式:
-
Push(預設):訊息由訊息佇列
RocketMQ
版推送至Consumer
。Push
方式下,訊息佇列RocketMQ
版還支援批量消費功能,可以將批量訊息統一推送至Consumer
進行消費。 -
Pull:訊息由
Consumer
主動從訊息佇列RocketMQ
版拉取。
push方式實現
// 程式第一次啟動從訊息佇列頭獲取資料
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 廣播消費
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
// 註冊回撥實現類來處理從broker拉取回來的訊息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 標記該訊息已經被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
pull方式實現
//消費者監聽處理訊息的方法,這裡是抽取的方式
PullResult pullResult = consumer.pull(new MessageQueue("pull-push","*");
for (MessageExt messageExt : pullResult.getMsgFoundList()) {
System.out.println("消費執行緒:"+Thread.currentThread().getName()+
",訊息ID:"+messageExt.getMsgId()+
",訊息內容:"+new String(messageExt.getBody()));
}
消費端的Push
模式是通過長輪詢的模式來實現的,就如同下圖:
Consumer
端每隔一段時間主動向broker
傳送拉訊息請求,broker
在收到Pull
請求後,如果有訊息就立即返回資料,Consumer
端收到返回的訊息後,再回調消費者設定的Listener
方法。如果broker
在收到Pull
請求時,訊息佇列裡沒有資料,broker
端會阻塞請求直到有資料傳遞或超時才返回。
當然,Consumer
端是通過一個執行緒將阻塞佇列LinkedBlockingQueue<PullRequest>
中的PullRequest
傳送到broker
拉取訊息,以防止Consumer
一致被阻塞。而Broker
端,在接收到Consumer
的PullRequest
時,如果發現沒有訊息,就會把PullRequest
扔到ConcurrentHashMap
中快取起來。
broker
在啟動時,會啟動一個執行緒不停的從ConcurrentHashMap
取出PullRequest
檢查,直到有資料返回。
區別:
推送方式:必須消費者線上,當生產者生成了資訊之後,消費者自動獲取
抽取方式:當生成者生成了訊息之後,這時剛剛啟動消費者,自動抽取生成者沒有消費的資訊
注意:如果是
springboot
繼承的話,消費者不會停止啟動,都是使用推送方式
6.3 訊息過濾
生產者建立訊息
// 建立訊息,並指定Topic,Tag和訊息體生成者
Message msg = new Message(
"Topic" ,// Topic
"Tag", // Tag
("hello world").getBytes("utf-8") //Message body
);
6.3.1 Tag標籤過濾
傳送訊息時我們會為每一條訊息設定Tag
標籤,同一大類中的訊息放在一個主題Topic
下,但是如果進行分類我們則可以根據Tag
進行分類,每一類消費者可能不是關係某個主題下的所有訊息,我們就可以通過Tag
進行過濾,訂閱關注的某一類資料。
// 設定標籤 消費者
// 訂閱主題Topic和Tag
consumer.subscribe("Topic", "TagA || TagB");
消費者組訂閱相同的主題不同的
Tag時
,如果訂閱是多個Tag
則通過“||”分割
同一個消費者組訂閱的主題,Tag
必須相同
6.3.2 SQL過濾
SQL92
表示式訊息過濾,是通過訊息的屬性執行SQL
過濾表示式進行條件匹配,訊息傳送時需要設定使用者的屬性putUserProperty
方法設定屬性。
支援的語法:
- 數值比較, 如
>,>=,<,<=,BETWEEN,=
; - 字元比較, 如
=,<>,IN
; -
IS NULL
或者IS NOT NULL
; - 邏輯連線符,
AND,OR,NOT
;
支援的常量型別:
- 數值型,如
123
,3.1415
; - 字元型,如‘abc’(必須用單引號);
- NULL,特殊常數;
- 布林值,
TRUE
或FALSE
;
//訂閱主題Topic和Tag
consumer.subscribe("filterSqlTopic", MessageSelector.bySql("i>5"));
注意:1.只有使用push
模式的消費者才能用使用SQL92
標準的sql
語句
2.SQL92
過濾預設關閉,需要在代理開始前設定conf
資料夾下broker.conf
的引數:
#開啟對filter的支援
enablePropertyFilter=true
優化
拉取訊息時進行過濾的效能很差:
- 堆外到堆,一旦每個消費者訂閱相同的主題拉訊息。
- 解碼訊息屬性,一旦每個消費者訂閱相同的主題拉訊息。
可使用BloomFilter
預計算優化情況,具體請參考:RocketMQ通過SQL92過濾訊息
注意:此優化預設關閉,需要在代理開始開啟時設定一些配置:
- enableCalcFilterBitMap = true,表示在構建消費佇列時計算點陣圖。
- expectConsumerNumUseFilter = XX(Integer, default is 32), 表示估計訂閱相同主題的消費者數量。
- maxErrorRateOfBloomFilter = XX(1~100,預設為20),表示布隆過濾器的錯誤率。
- enableConsumeQueueExt = true,表示構造消費佇列擴充套件檔案。
6.3.3 類過濾
通過定義訊息過濾類的介面實現訊息過濾
//使用Java程式碼,在伺服器做訊息過濾
String filterCode = MixAll.file2String("opt\\classfilter\\MessageFilterImpl.java");
consumer.subscribe("TopicFilter1", "cn.gumx.rocketmq.filter.MessageFilterImpl",filterCode);
自定義訊息的過濾類
public class MessageFilterImpl implements MessageFilter {
public boolean match(MessageExt msg, FilterContext arg1) {
String property = msg.getUserProperty("age");
if (property != null) {
int age = Integer.parseInt(property);
if ((age % 3) == 0 && (age > 10)) {
return true;
}
}
return false;
}
}
使用類訊息過濾模式,需要額外需要啟動
filter
元件mqfiltersrv
服務,否則消費不了,每個broker
都需要啟動一個,相當於加了一層過濾層。啟動命令如下:./mqfiltersrv -n 10.10.12.203:9876;10.10.12.204:9876 &
filtersrv
出現了。減少了Broker
的負擔,又減少了Consumer
接收無用的訊息。當然缺點也是有的,多了一層filtersrv
網路開銷MessageFilterImpl
訊息過濾實現類中的程式碼最好不要帶有中文防止錯誤
注意:RocketMQ4.3.1
開始刪除與mqfilter
伺服器相關的指令碼,4.3.2
刪除客戶端關於mqfilter
客戶端程式碼,後面版本不支援該功能。