深入理解RocketMQ訊息查詢機制
在實際開發中,經常需要檢視MQ中訊息的內容,RocketMQ提供了多種訊息查詢方式,給開發和運維帶來了極大的便利,一些其他訊息中介軟體,如Kafka,並不具備訊息查詢能力。
本文對RocketMQ提供到的查詢機制和背後原理進行深入的介紹。文章主要包括3個部分:
-
訊息查詢介紹
:介紹訊息查詢中使用到的Message Key 、Unique Key、Message Id 的區別 -
訊息查詢工具
:分別介紹命令列工具、管理平臺、客戶端API這三種工具的詳細用法,以及如何讓消費者重新消費特定的訊息。 -
實現原理
:介紹Message Key & Unique Key與Message Id的實現機制上區別,Unique Key在精確一次消費(Exactly Once)語義下的作用,以及為什麼Message Id查詢效率更高。
訊息查詢介紹
RocketMQ提供了3種訊息查詢方式:
-
按照Message Key 查詢
:訊息的key是業務開發同學在傳送訊息之前自行指定的,通常會把具有業務含義,區分度高的欄位作為訊息的key,如使用者id,訂單id等。 -
按照Unique Key查詢
:除了業務開發同學明確的指定訊息中的key,RocketMQ生產者客戶端在傳送傳送訊息之前,會自動生成一個UNIQ_KEY,設定到訊息的屬性中,從邏輯上唯一代表一條訊息。 -
按照Message Id 查詢
:Message Id 是訊息傳送後,在Broker端生成的,其包含了Broker的地址,和在CommitLog中的偏移資訊,並會將Message Id作為傳送結果的一部分進行返回。Message Id中屬於精確匹配,從物理上唯一代表一條訊息,查詢效率更高。
RocketMQ有意弱化Unique Key與Message Id的區別,有時都稱之為Message Id。在通過RocketMQ的命令列工具或管理平臺進行查詢時,二者可以通用。在根據Unique Key進行查詢時,本身是有可能查詢到多條訊息的,但是查詢工具會進行過濾,只會返回一條訊息。種種情況導致很多RocketMQ的使用者,並未能很好對二者進行區分。
業務開發同學在使用RocketMQ時,應該養成良好的習慣,在傳送/消費訊息時,將這些資訊記錄下來,通常是記錄到日誌檔案中,以便在出現問題時進行排查。
以生產者在傳送訊息為例,通常由以下3步組成:
//1 構建訊息物件Message Message msg = new Message(); msg.setTopic("TopicA"); msg.setKeys("Key1"); msg.setBody("message body".getBytes()); try{ //2 傳送訊息 SendResult result = producer.send(msg); //3 打印發送結果 System.out.println(result); }catch (Exception e){ e.printStackTrace(); }
第1步:構建訊息
構建訊息物件Message,在這裡我們通過setKeys方法設定訊息的key,如果有多個key可以使用空格" "進行分割
第2步:傳送訊息
傳送訊息,會返回一個SendResult物件表示訊息傳送結果。
第3步:打印發送結果
結果中包含Unique Key和Message Id,如下所示:
SendResult [
sendStatus=SEND_OK,
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,
messageQueue=MessageQueue [topic=TopicA, brokerName=broker-a, queueId=0],
queueOffset=0]
其中:
-
sendStatus
:表示訊息傳送結果的狀態 -
msgId
:注意這裡的命名雖然是msgId,但實際上其是Unique Key -
offsetMsgId
:Broker返回的Message ID 。在後文中,未進行特殊說明的情況下,Message ID總是表示offsetMsgId。 -
messageQueue
:訊息傳送到了哪個的佇列,如上圖顯示傳送到broker-a的第0個的佇列 -
queueOffset
:訊息在佇列中的偏移量,每次傳送到一個佇列時,offset+1
事實上,使用者主動設定的Key以及客戶端自動生成的Unique Key,最終都會設定到Message物件的properties屬性中,如下圖所示:
其中:
-
KEYS
:表示使用者通過setKeys方法設定的訊息key, -
UNIQ_KEY
:表示訊息傳送之前由RocketMQ客戶端自動生成的Unique Key
。細心的讀者發現了其值與上述列印SendResult結果中的msgId欄位的值是一樣的,這驗證了前面所說的msgId表示的實際上就是Unique Key的說法。
在瞭解如何主動設定Key,以及如何獲取RocketMQ自動生成的Unique Key和Message Id後,就可以利用一些工具來進行查詢。
訊息查詢工具
RocketMQ提供了3種方式來根據Message Key、Unique Key、Message Id來查詢訊息,包括:
-
命令列工具
:主要是運維同學使用 -
管理平臺
:運維和開發同學都可以使用 -
客戶端API
:主要是開發同學使用
命令列工具
RocketMQ自帶的mqadmin命令列工具提供了一些子命令,用於查詢訊息,如下:
$ sh bin/mqadmin
The most commonly used mqadmin commands are:
...
queryMsgById 按照Message Id查詢訊息
queryMsgByKey 按照Key查詢訊息
queryMsgByUniqueKey 按照UNIQ_KEY查詢訊息
...
管理平臺
RocketMQ提供的命令列工具,雖然功能強大,一般是運維同學使用較多。通過RocketMQ提供的管理平臺進來行訊息查詢,則對業務開發同學更加友好。在管理平臺的訊息一欄,有3個TAB,分別用於:根據Topic時間範圍查詢、Message Key查詢、Message Id查詢
客戶端API
除了通過命令列工具和管理平臺,還可以通過客戶端API的方式來進行查詢,這其實是最本質的方式,命令列工具和管理平臺的查詢功能都是基於此實現。
在org.apache.rocketmq.client.MQAdmin介面中,定義了以下幾個方法用於訊息查詢:
//msgId引數:僅接收SendResult中的offsetMsgId,返回單條訊息
MessageExt viewMessage(final String msgId)
//msgId引數:傳入SendResult中的offsetMsgId、msgId都可以,返回單條訊息
MessageExt viewMessage(String topic,String msgId)
//在指定topic下,根據key進行查詢,並指定最大返回條數,以及開始和結束時間
QueryResult queryMessage(final String topic, final String key,
final int maxNum, final long begin,final long end)
實現原理
Unqiue Key & Message Key都需要利用RocketMQ的雜湊索引機制來完成訊息查詢,由於建立索引有一定的開銷,因此Broker端提供了相關配置項來控制是否開啟索引。
Message Id是在Broker端生成的,其包含了Broker地址和commit Log offset資訊,可以精確匹配一條訊息,查詢訊息更好
。下面分別介紹 Unqiue Key & Message Id的生成和作用。
Unique Key生成
Unique Key是生產者傳送訊息之前,由RocketMQ 客戶端自動生成的,具體來說,RocketMQ傳送訊息之前,最終都要通過以下方法:
DefaultMQProducerImpl#sendKernelImpl
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) {//省略異常宣告
//...略
try {
//如果不是批量訊息,則生成Unique Key
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
//...略
如上所示,如果不是批量訊息,會通過MessageClientIDSetter的setUniqID方法為訊息設定Unique key,該方法實現如下所示:
MessageClientIDSetter#setUniqID
public static void setUniqID(final Message msg) {
// Unique Key不為空的情況下,才進行設定
if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,) == null) {
msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,, createUniqID());
}
}
如果訊息的Unique Key屬性為null,就通過createUniqID()方法為訊息建立一個新的Unique Key,並設定到訊息屬性中。之所以要判斷Unique Key是否為null與其作用有關。
Unique Key作用
瞭解Unique Key的作用對於我們理解訊息重複的原因有很大的幫助。RocketMQ並不保證訊息投遞過程中的Exactly Once語義,即訊息只會被精確消費一次,需要消費者自己做冪等。而通常導致訊息重複消費的原因,主要包括:
-
生產者傳送時訊息重複:RocketMQ對於無序訊息傳送失敗,預設會重試2次
-
消費者Rebalance時訊息重複
導致生產者傳送重複訊息的原因可能是:一條訊息已被成功傳送到服務端並完成持久化,由於網路超時此時出現了網路閃斷或者客戶端宕機,導致服務端對客戶端應答失敗,此時生產者將再次嘗試傳送訊息。
在重試傳送時,sendKernelImpl會被重複呼叫,意味著setUniqID方法會被重複呼叫,不過由於setUniqID方法實現中進行判空處理,因此重複設定Unique Key。在這種情況下,消費者後續會收到兩條內容相同並且 Unique Key 也相同的訊息(offsetMsgId不同,因為對Broker來說儲存了多次)。
那麼消費者如何判斷,消費重複是因為重複傳送還是Rebalance導致的重複消費呢?
消費者實現MessageListener介面監聽到的訊息型別是MessageExt,可以將其強制轉換為MessageClientExt,之後呼叫getMsgId方法獲取Unique Key,呼叫getOffsetMsgId獲得Message Id。如果多訊息的Unique Key相同,但是offsetMsgId不同,則有可能是因為重複傳送導致。
批量傳送模式下的Unique Key
DefaultMQProducer提供了批量傳送訊息的介面:
public SendResult send(Collection<Message> msgs)
在內部,這批訊息首先會被構建成一個MessageBatch物件。在前面sendKernelImpl方法中我們也看到了,對於MessageBatch物件,並不會設定Unique Key。這是因為在將批量訊息轉換成MessageBatch時,已經設定過了。
可能有一部分同學會誤以為一個批量訊息中每條訊息Unique Key是相同的,其實不然,每條訊息Unique Key都不同
。
可以參考DefaultMQProducer#batch
方法原始碼:
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
MessageBatch msgBatch;
try {
//1 將訊息集合轉換為MessageBatch
msgBatch = MessageBatch.generateFromList(msgs);
//2 迭代每個訊息,逐一設定Unique Key
for (Message message : msgBatch) {
Validators.checkMessage(message, this);
MessageClientIDSetter.setUniqID(message);
}
//3 設定批量訊息的訊息體
msgBatch.setBody(msgBatch.encode());
} catch (Exception e) {
throw new MQClientException("Failed to initiate the MessageBatch", e);
}
return msgBatch;
}
Message Id生成
SendResult中的offsetMsgId,即常規意義上我們所說的Message Id是在Broker端生成的,用於唯一標識一條訊息,在根據Message Id查詢的情況下,最多隻能查詢到一條訊息。Message Id總共 16 位元組,包含訊息儲存主機地址,訊息 Commit Log offset。如下圖所示:
RocketMQ內部通過一個MessageId物件進行表示:
public class MessageId {
private SocketAddress address; //broker地址
private long offset; //commit log offset
並提供了一個MessageDecoder物件來建立或者解碼MessageId。
public static String createMessageId(final ByteBuffer input,
final ByteBuffer addr, final long offset)
public static MessageId decodeMessageId(final String msgId)
Broker端在順序儲存訊息時,首先會通過createMessageId方法建立msgId。原始碼如下所示:
CommitLog.DefaultAppendMessageCallback#doAppend
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
final int maxBlank,final MessageExtBrokerInner msgInner) {
//1 PHY OFFSET:即Commit Log Offset 或者稱之為msgOffsetId
long wroteOffset = fileFromOffset + byteBuffer.position();
//2 hostHolder用於維護broker地址資訊
this.resetByteBuffer(hostHolder, 8);
//3 建立msgOffsetId
String msgId = MessageDecoder.createMessageId(this.msgIdMemory,
msgInner.getStoreHostBytes(hostHolder), wroteOffset);
而客戶端在根據msgId向Broker查詢訊息時,首先會將通過MessageDecoder的decodeMessageId方法,之後直接向這個broker進行查詢指定位置的訊息。
參見:MQAdminImpl#viewMessage
public MessageExt viewMessage(String msgId) {//省略異常宣告
//1 根據msgId解碼成MessageId物件
MessageId messageId = null;
try {
messageId = MessageDecoder.decodeMessageId(msgId);
} catch (Exception e) {
throw new MQClientException(ResponseCode.NO_MESSAGE,
"query message by id finished, but no message.");
}
//2 根據MessageId中的Broker地址和commit log offset資訊進行查詢
return this.mQClientFactory.getMQClientAPIImpl().viewMessage(
RemotingUtil.socketAddress2String(messageId.getAddress()),
messageId.getOffset(),
timeoutMillis);
}
由於根據Message Id進行查詢,實際上是直接從特定Broker的CommitLog中的指定位置進行查詢的,屬於精確匹配,並不像使用者設定的key,或者Unique Key那麼樣,需要使用到雜湊索引機制,因此效率很高。
總結
-
RocketMQ提供了3種訊息查詢方式:Message Key & Unique Key & Message Id
-
RocketMQ提供了3種訊息查詢工具:命令列、管理平臺、客戶端API,且支援將查詢到讓特定/所有消費者組重新消費
-
RocketMQ有意對使用者遮蔽Unique Key & Message Id區別,很多地方二者可以通用
-
Message Key & Unique Key 需要使用到雜湊索引機制,有額外的索引維護成本
-
Message Id由Broker和commit log offset組成,屬於精確匹配,查詢效率更好