1. 程式人生 > 其它 >深入理解RocketMQ訊息查詢機制

深入理解RocketMQ訊息查詢機制

在實際開發中,經常需要檢視MQ中訊息的內容,RocketMQ提供了多種訊息查詢方式,給開發和運維帶來了極大的便利,一些其他訊息中介軟體,如Kafka,並不具備訊息查詢能力。

本文對RocketMQ提供到的查詢機制和背後原理進行深入的介紹。文章主要包括3個部分:

  • 訊息查詢介紹:介紹訊息查詢中使用到的Message Key 、Unique Key、Message Id 的區別

  • 訊息查詢工具:分別介紹命令列工具、管理平臺、客戶端API這三種工具的詳細用法,以及如何讓消費者重新消費特定的訊息。

  • 實現原理:介紹Message Key & Unique Key與Message Id的實現機制上區別,Unique Key在精確一次消費(Exactly Once)語義下的作用,以及為什麼Message Id查詢效率更高。

訊息查詢介紹

RocketMQ提供了3種訊息查詢方式:

  • 按照Message Key 查詢:訊息的key是業務開發同學在傳送訊息之前自行指定的,通常會把具有業務含義,區分度高的欄位作為訊息的key,如使用者id,訂單id等。

  • 按照Unique Key查詢:除了業務開發同學明確的指定訊息中的key,RocketMQ生產者客戶端在傳送傳送訊息之前,會自動生成一個UNIQ_KEY,設定到訊息的屬性中,從邏輯上唯一代表一條訊息。

  • 按照Message Id 查詢:Message Id 是訊息傳送後,在Broker端生成的,其包含了Broker的地址,和在CommitLog中的偏移資訊,並會將Message Id作為傳送結果的一部分進行返回。Message Id中屬於精確匹配,從物理上唯一代表一條訊息,查詢效率更高。

RocketMQ有意弱化Unique Key與Message Id的區別,有時都稱之為Message Id。在通過RocketMQ的命令列工具或管理平臺進行查詢時,二者可以通用。在根據Unique Key進行查詢時,本身是有可能查詢到多條訊息的,但是查詢工具會進行過濾,只會返回一條訊息。種種情況導致很多RocketMQ的使用者,並未能很好對二者進行區分。

業務開發同學在使用RocketMQ時,應該養成良好的習慣,在傳送/消費訊息時,將這些資訊記錄下來,通常是記錄到日誌檔案中,以便在出現問題時進行排查。

以生產者在傳送訊息為例,通常由以下3步組成:

//1 構建訊息物件Message
Message msg = new Message();
msg.setTopic("TopicA");
msg.setKeys("Key1");
msg.setBody("message body".getBytes());
try{
    //2 傳送訊息
    SendResult result = producer.send(msg);
    
    //3 打印發送結果
    System.out.println(result);
}catch (Exception e){
    e.printStackTrace();
}

第1步:構建訊息

構建訊息物件Message,在這裡我們通過setKeys方法設定訊息的key,如果有多個key可以使用空格" "進行分割

第2步:傳送訊息

傳送訊息,會返回一個SendResult物件表示訊息傳送結果。

第3步:打印發送結果

結果中包含Unique Key和Message Id,如下所示:

SendResult [
sendStatus=SEND_OK, 
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,
messageQueue=MessageQueue [topic=TopicA, brokerName=broker-a, queueId=0], 
queueOffset=0]

其中:

  • sendStatus:表示訊息傳送結果的狀態

  • msgId:注意這裡的命名雖然是msgId,但實際上其是Unique Key

  • offsetMsgId:Broker返回的Message ID 。在後文中,未進行特殊說明的情況下,Message ID總是表示offsetMsgId。

  • messageQueue:訊息傳送到了哪個的佇列,如上圖顯示傳送到broker-a的第0個的佇列

  • queueOffset:訊息在佇列中的偏移量,每次傳送到一個佇列時,offset+1

事實上,使用者主動設定的Key以及客戶端自動生成的Unique Key,最終都會設定到Message物件的properties屬性中,如下圖所示:

其中:

  • KEYS:表示使用者通過setKeys方法設定的訊息key,

  • UNIQ_KEY:表示訊息傳送之前由RocketMQ客戶端自動生成的Unique Key。細心的讀者發現了其值與上述列印SendResult結果中的msgId欄位的值是一樣的,這驗證了前面所說的msgId表示的實際上就是Unique Key的說法。

在瞭解如何主動設定Key,以及如何獲取RocketMQ自動生成的Unique Key和Message Id後,就可以利用一些工具來進行查詢。

訊息查詢工具

RocketMQ提供了3種方式來根據Message Key、Unique Key、Message Id來查詢訊息,包括:

  • 命令列工具:主要是運維同學使用

  • 管理平臺:運維和開發同學都可以使用

  • 客戶端API:主要是開發同學使用

命令列工具

RocketMQ自帶的mqadmin命令列工具提供了一些子命令,用於查詢訊息,如下:

$ sh bin/mqadmin 
The most commonly used mqadmin commands are: 
...
   queryMsgById         按照Message Id查詢訊息
   queryMsgByKey        按照Key查詢訊息 
   queryMsgByUniqueKey  按照UNIQ_KEY查詢訊息
...

管理平臺

RocketMQ提供的命令列工具,雖然功能強大,一般是運維同學使用較多。通過RocketMQ提供的管理平臺進來行訊息查詢,則對業務開發同學更加友好。在管理平臺的訊息一欄,有3個TAB,分別用於:根據Topic時間範圍查詢、Message Key查詢、Message Id查詢

客戶端API

除了通過命令列工具和管理平臺,還可以通過客戶端API的方式來進行查詢,這其實是最本質的方式,命令列工具和管理平臺的查詢功能都是基於此實現。

在org.apache.rocketmq.client.MQAdmin介面中,定義了以下幾個方法用於訊息查詢:
//msgId引數:僅接收SendResult中的offsetMsgId,返回單條訊息
MessageExt viewMessage(final String msgId)
//msgId引數:傳入SendResult中的offsetMsgId、msgId都可以,返回單條訊息
MessageExt viewMessage(String topic,String msgId)
//在指定topic下,根據key進行查詢,並指定最大返回條數,以及開始和結束時間
QueryResult queryMessage(final String topic, final String key, 
                         final int maxNum, final long begin,final long end)

實現原理

Unqiue Key & Message Key都需要利用RocketMQ的雜湊索引機制來完成訊息查詢,由於建立索引有一定的開銷,因此Broker端提供了相關配置項來控制是否開啟索引。
Message Id是在Broker端生成的,其包含了Broker地址和commit Log offset資訊,可以精確匹配一條訊息,查詢訊息更好。下面分別介紹 Unqiue Key & Message Id的生成和作用。

Unique Key生成

Unique Key是生產者傳送訊息之前,由RocketMQ 客戶端自動生成的,具體來說,RocketMQ傳送訊息之前,最終都要通過以下方法:
DefaultMQProducerImpl#sendKernelImpl

private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout)  {//省略異常宣告
     //...略
     
        try {
            //如果不是批量訊息,則生成Unique Key
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }
            
     //...略

如上所示,如果不是批量訊息,會通過MessageClientIDSetter的setUniqID方法為訊息設定Unique key,該方法實現如下所示:
MessageClientIDSetter#setUniqID

public static void setUniqID(final Message msg) {
    // Unique Key不為空的情況下,才進行設定
    if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,) == null) {
        msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,, createUniqID());
    }
}

如果訊息的Unique Key屬性為null,就通過createUniqID()方法為訊息建立一個新的Unique Key,並設定到訊息屬性中。之所以要判斷Unique Key是否為null與其作用有關。

Unique Key作用

瞭解Unique Key的作用對於我們理解訊息重複的原因有很大的幫助。RocketMQ並不保證訊息投遞過程中的Exactly Once語義,即訊息只會被精確消費一次,需要消費者自己做冪等。而通常導致訊息重複消費的原因,主要包括:

  • 生產者傳送時訊息重複:RocketMQ對於無序訊息傳送失敗,預設會重試2次

  • 消費者Rebalance時訊息重複

導致生產者傳送重複訊息的原因可能是:一條訊息已被成功傳送到服務端並完成持久化,由於網路超時此時出現了網路閃斷或者客戶端宕機,導致服務端對客戶端應答失敗,此時生產者將再次嘗試傳送訊息。

在重試傳送時,sendKernelImpl會被重複呼叫,意味著setUniqID方法會被重複呼叫,不過由於setUniqID方法實現中進行判空處理,因此重複設定Unique Key。在這種情況下,消費者後續會收到兩條內容相同並且 Unique Key 也相同的訊息(offsetMsgId不同,因為對Broker來說儲存了多次)。

那麼消費者如何判斷,消費重複是因為重複傳送還是Rebalance導致的重複消費呢?

消費者實現MessageListener介面監聽到的訊息型別是MessageExt,可以將其強制轉換為MessageClientExt,之後呼叫getMsgId方法獲取Unique Key,呼叫getOffsetMsgId獲得Message Id。如果多訊息的Unique Key相同,但是offsetMsgId不同,則有可能是因為重複傳送導致。

批量傳送模式下的Unique Key

DefaultMQProducer提供了批量傳送訊息的介面:

public SendResult send(Collection<Message> msgs)

在內部,這批訊息首先會被構建成一個MessageBatch物件。在前面sendKernelImpl方法中我們也看到了,對於MessageBatch物件,並不會設定Unique Key。這是因為在將批量訊息轉換成MessageBatch時,已經設定過了。

可能有一部分同學會誤以為一個批量訊息中每條訊息Unique Key是相同的,其實不然,每條訊息Unique Key都不同
可以參考DefaultMQProducer#batch方法原始碼:

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
    MessageBatch msgBatch;
    try {
        //1 將訊息集合轉換為MessageBatch
        msgBatch = MessageBatch.generateFromList(msgs);
        
        //2 迭代每個訊息,逐一設定Unique Key
        for (Message message : msgBatch) {
            Validators.checkMessage(message, this);
            MessageClientIDSetter.setUniqID(message);
        }
        //3 設定批量訊息的訊息體
        msgBatch.setBody(msgBatch.encode());
    } catch (Exception e) {
        throw new MQClientException("Failed to initiate the MessageBatch", e);
    }
    return msgBatch;
}

Message Id生成

SendResult中的offsetMsgId,即常規意義上我們所說的Message Id是在Broker端生成的,用於唯一標識一條訊息,在根據Message Id查詢的情況下,最多隻能查詢到一條訊息。Message Id總共 16 位元組,包含訊息儲存主機地址,訊息 Commit Log offset。如下圖所示:

RocketMQ內部通過一個MessageId物件進行表示:

public class MessageId {
    private SocketAddress address; //broker地址
    private long offset; //commit log offset

並提供了一個MessageDecoder物件來建立或者解碼MessageId。

public static String createMessageId(final ByteBuffer input,
                                     final ByteBuffer addr, final long offset)
public static MessageId decodeMessageId(final String msgId)

Broker端在順序儲存訊息時,首先會通過createMessageId方法建立msgId。原始碼如下所示:

CommitLog.DefaultAppendMessageCallback#doAppend

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, 
                      final int maxBlank,final MessageExtBrokerInner msgInner) {
 
    //1 PHY OFFSET:即Commit Log Offset 或者稱之為msgOffsetId
    long wroteOffset = fileFromOffset + byteBuffer.position();
 
    //2 hostHolder用於維護broker地址資訊
    this.resetByteBuffer(hostHolder, 8);
 
    //3 建立msgOffsetId
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, 
                        msgInner.getStoreHostBytes(hostHolder), wroteOffset);

而客戶端在根據msgId向Broker查詢訊息時,首先會將通過MessageDecoder的decodeMessageId方法,之後直接向這個broker進行查詢指定位置的訊息。

參見:MQAdminImpl#viewMessage

public MessageExt viewMessage(String msgId) {//省略異常宣告
 
    //1 根據msgId解碼成MessageId物件
    MessageId messageId = null;
    try {
        messageId = MessageDecoder.decodeMessageId(msgId);
    } catch (Exception e) {
        throw new MQClientException(ResponseCode.NO_MESSAGE, 
                      "query message by id finished, but no message.");
    }
    //2 根據MessageId中的Broker地址和commit log offset資訊進行查詢
    return this.mQClientFactory.getMQClientAPIImpl().viewMessage(
            RemotingUtil.socketAddress2String(messageId.getAddress()),
            messageId.getOffset(), 
            timeoutMillis);
}

由於根據Message Id進行查詢,實際上是直接從特定Broker的CommitLog中的指定位置進行查詢的,屬於精確匹配,並不像使用者設定的key,或者Unique Key那麼樣,需要使用到雜湊索引機制,因此效率很高。

總結

  • RocketMQ提供了3種訊息查詢方式:Message Key & Unique Key & Message Id

  • RocketMQ提供了3種訊息查詢工具:命令列、管理平臺、客戶端API,且支援將查詢到讓特定/所有消費者組重新消費

  • RocketMQ有意對使用者遮蔽Unique Key & Message Id區別,很多地方二者可以通用

  • Message Key & Unique Key 需要使用到雜湊索引機制,有額外的索引維護成本

  • Message Id由Broker和commit log offset組成,屬於精確匹配,查詢效率更好