1. 程式人生 > 其它 >RocketMQ訊息的生產和儲存

RocketMQ訊息的生產和儲存

一 : 訊息的生產

1. 訊息的生產過程

Producer在傳送訊息時可以將訊息寫入到指定topic的某Broker中的某Queue中,其經歷瞭如下過程:

  • Producer傳送訊息之前,會先向NameServer發出獲取訊息Topic的路由資訊的請求

  • NameServer返回該Topic的路由表Broker列表

  • Producer根據程式碼中指定的Queue選擇策略,從Queue列表中選出一個佇列,用於後續儲存訊息

  • Produer對訊息做一些特殊處理,例如,訊息本身超過4M,則會對其進行壓縮

  • Producer向選擇出的Queue所在的Broker發出RPC請求,將訊息傳送到選擇出的Queue

nameServer維護的路由表,實際是一個MapkeyTopic名稱,value是一個QueueData例項集合

而一個QueueData則包含一個Broker例項的所有此topic的Queue資訊

即一個Broker對應一個QueueData。QueueData中包含brokerName

簡單來說,路由表的keyTopic名稱,value則為所有涉及該Topic的 BrokerName列表

那根據Topic可以獲取到BrokerName,怎麼由BrokerName獲取到對應的連線地址資訊呢

Broker列表:其實際也是一個MapkeybrokerNamevalue

BrokerData

一個BrokerData對應一套brokerName名稱相同的Master-Slave小叢集

BrokerData中包含brokerName及一個map。該map的key為brokerId,value為該

broker對應的地址。brokerId0表示該brokerMaster,非0表示Slave

2. Queue選擇演算法

對於無序訊息,其Queue選擇演算法,也稱為訊息投遞演算法,常見的有兩種:

  • 1.輪詢演算法

預設選擇演算法。該演算法保證了每個Queue中可以均勻的獲取到訊息。

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                //輪詢計算
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }

該演算法存在一個問題:由於某些原因,在某些Broker上的Queue可能投遞延遲較嚴重。從而導致

Producer的快取佇列中出現較大的訊息積壓,影響訊息的投遞效能。

  • 2.最小投遞延遲演算法

該演算法會統計每次訊息投遞的時間延遲,然後根據統計出的結果將訊息投遞到時間延遲最小的Queue。

如果延遲相同,則採用輪詢演算法投遞。該演算法可以有效提升訊息的投遞效能。

	// 根據sendLatencyFaultEnable 是否開啟決定是否使用
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    //基於index和佇列數量取餘,確定位置
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
                
                // 從延遲容錯broker列表中挑選一個容錯性最好的一個 broker
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                     // 取餘挑選其中一個佇列
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
          // 取餘挑選其中一個佇列
            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

該演算法也存在一個問題:訊息在Queue上的分配不均勻。投遞延遲小的Queue其可能會存在大量

的訊息。而對該Queue的消費者壓力會增大,降低訊息的消費能力,可能會導致MQ中訊息的堆

積。

當然我們也可以自己指定選擇演算法,通過實現MessageQueueSelector ,自帶的有三個實現:

  • SelectMessageQueueByRandom 隨機分配策略
  • SelectMessageQueueByHash 基於hash的分配策略
  • SelectMessageQueueByMachineRoom 伺服器的就近原則分配策略

其中hash演算法就可以用來解決,對於訊息的時序性有嚴格要求,需要保證全域性有序的情況,例如一個訂單的:建立、付款、推送、完成。

下面是手動實現選擇演算法的示例:

/**
* Producer,傳送順序訊息
*/
public class Producer {

   public static void main(String[] args) throws Exception {
       DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

       producer.setNamesrvAddr("127.0.0.1:9876");

       producer.start();

       String[] tags = new String[]{"TagA", "TagC", "TagD"};

       // 訂單列表
       List<OrderStep> orderList = new Producer().buildOrders();

       Date date = new Date();
       SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
       String dateStr = sdf.format(date);
       for (int i = 0; i < 10; i++) {
           // 加個時間字首
           String body = dateStr + " Hello RocketMQ " + orderList.get(i);
           Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

           SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
               @Override
               public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                   Long id = (Long) arg;  //根據訂單id選擇傳送queue
                   long index = id % mqs.size();
                   return mqs.get((int) index);
               }
           }, orderList.get(i).getOrderId());//訂單id

           System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
               sendResult.getSendStatus(),
               sendResult.getMessageQueue().getQueueId(),
               body));
       }

       producer.shutdown();
   }

   /**
    * 訂單的步驟
    */
   private static class OrderStep {
       private long orderId;
       private String desc;

       public long getOrderId() {
           return orderId;
       }

       public void setOrderId(long orderId) {
           this.orderId = orderId;
       }

       public String getDesc() {
           return desc;
       }

       public void setDesc(String desc) {
           this.desc = desc;
       }

       @Override
       public String toString() {
           return "OrderStep{" +
               "orderId=" + orderId +
               ", desc='" + desc + '\'' +
               '}';
       }
   }

   /**
    * 生成模擬訂單資料
    */
   private List<OrderStep> buildOrders() {
       List<OrderStep> orderList = new ArrayList<OrderStep>();

       OrderStep orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("建立");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("建立");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("建立");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("推送");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);

       return orderList;
   }
}

二: 訊息的儲存

RocketMQ中的訊息儲存在本地檔案系統中,這些相關檔案預設在當前安裝主目錄下的store目錄中。

  • abort:該檔案在Broker啟動後會自動建立,正常關閉Broker,該檔案會自動消失。若在沒有啟動

    Broker的情況下,發現這個檔案是存在的,則說明之前Broker的關閉是非正常關閉。

  • checkpoint:其中儲存著commitlog、consumequeue、index檔案的最後刷盤時間戳

  • commitlog:存放著commitlog檔案(訊息就是記錄在commitlog檔案中的)

  • config:存放著Broker執行期間的一些配置資料

  • consumequeue:存放著consumequeue檔案(佇列資訊就存放在這個目錄中)

  • index:其中存放著訊息索引檔案indexFile

  • lock:執行期間使用到的全域性資源鎖

1. commitlog檔案

介紹:

在很多資料中commitlog目錄中的檔案簡單就稱為commitlog檔案。但在原始碼中,該檔案被命名為mappedFile

commitlog目錄中存放著很多的mappedFile檔案,當前Broker中的所有訊息都是落盤到這些mappedFile檔案中的。mappedFile單個檔案大小為1G(小於等於1G),檔名由20位十進位制數構成,表示當前檔案的第一條訊息的起始位移偏移量。(一個位元組為一個偏移量)

例如:

第一個檔名一定是200構成的。因為第一個檔案的第一條訊息的偏移量commitlog offset0 當第一個檔案放滿時,則會自動生成第二個檔案繼續存放訊息。假設第一個檔案大小是 1073741820位元組(1G = 1073741824位元組),下條訊息4個位元組無法儲存,

則生成第二個檔案儲存,第二個檔名就是00000000001073741820。

以此類推,第n個檔名應該是前前n-1個檔案大小之和。

一個Broker中所有mappedFile檔案的commitlog offset在邏輯上是連續的,但是在物理上可能不連續

需要注意的是,一個Broker中僅包含一個commitlog目錄,所有的mappedFile檔案都是存放在該目錄中

的。即無論當前Broker中存放著多少Topic的訊息,這些訊息都是被順序寫入到了mappedFile檔案中

的。也就是說,這些訊息在Broker中存放時並沒有被按照Topic進行分類存放。

訊息單元:

檔名為n(第一條訊息的偏移量就為n)的mappedFile 檔案內部訊息存放結構示意圖

如上圖所示,每個mappedFile 檔案都是由一個一個訊息單元構成, 每個訊息單元都有其不同的結構

  • MsgLen: 每個訊息單元中包含訊息總長度
  • physicalOffset: 訊息的物理位置
  • Body: 訊息體內容
  • BodyLength: 訊息體長度
  • Topic: 訊息主題Topic
  • TopicLength: Topic長度
  • BornHost: 訊息生產者
  • BornTimestamp: 訊息傳送時間戳
  • QueueId: 訊息所在的佇列
  • QueueOffset: 訊息在Queue中儲存的偏移量

等近20餘項訊息相關屬性。

需要注意到,訊息單元中是包含Queue相關屬性的。所以,我們在後續的學習中,就需要十分留意commitlog與queue間的關係是什麼?

2. consumequeue檔案

上面說到, 每個broker 的所有訊息全都按照順序寫在同一個地方,並且用偏移量標記每個資訊的位置,

那麼 consumequeue 下的檔案,就是用於記錄每個Topic下的queue對應每條訊息的偏移量地址資訊

命名規則:

每個Topic在~/store/consumequeue中建立資料夾,目錄名為Topic名稱。在該每個Topic目錄下,會再為每個該Topic的Queue建立一個目錄,目錄名為queueId。如下圖

每個目錄中存放著若干consumequeue檔案,consumequeue檔案是commitlog的索引檔案,可以根據consumequeue定位到具體的訊息。

檔案格式:

consumequeue檔名也由20位數字構成,表示當前檔案的第一個索引條目的起始位移偏移量。與mappedFile檔名不同的是,其後續檔名是固定的。

因為每個consumequeue檔案大小是固定不變的。其每個記錄單元大小是固定的20位元組,如下格式:

每個consumequeue檔案可以包含30w個索引條目,每個索引條目包含了三個訊息重要屬性:

  • 訊息在mappedFile檔案中的偏移量CommitLog Offset

  • 訊息長度、

  • 訊息Tag的hashcode值。

這三個屬性佔20個位元組,所以每個檔案的大小是固定的30w * 20位元組。每個索引條目的起始偏移量,就是該訊息在Queue中的 Queue Offset

3. 對檔案的讀寫過程

上面介紹了commitLog 和 consumeQueue檔案,下面先看看它們之間的關係

如上圖所示, commitlog依次存放了producer傳送的五條訊息, 而對應的每條訊息所屬的Queue都記錄在其對應的consumequeue檔案中,並使用commitlog offset 指向 commitlog,

下面介紹一下每個環節的檔案讀寫流程:

訊息寫入

一條訊息進入到Broker後經歷了以下幾個過程才最終被持久化。

  1. Broker根據queueId,獲取到該訊息對應索引條目要在consumequeue目錄中的寫入偏移量,即QueueOffset

  2. 將queueId、queueOffset等資料,與訊息一起封裝為訊息單元

  3. 將訊息單元寫入到commitlog

  4. 同時,形成訊息索引條目,將訊息索引條目分發到相應的consumequeue

訊息拉取

當Consumer來拉取訊息時會經歷以下幾個步驟:

  1. Consumer獲取到其要消費訊息所在Queue的消費偏移量offset,計算出其要消費訊息的 訊息offset

    消費offset即消費進度,consumer對某個Queue的消費offset,即消費到了該Queue的第幾條訊息 ,即已經消費的個數

    訊息offset = 消費offset + 1

  2. Consumer向Broker傳送拉取請求,其中會包含其要拉取訊息的Queue、訊息offset及訊息 Tag。

  3. Broker計算在該consumequeue中的queueOffset。

    queueOffset = 訊息offset * 20位元組 (每個訊息單元20位元組)

  4. 從該queueOffset處開始向後查詢第一個指定Tag的索引條目。

  5. 解析該索引條目的前8個位元組,即可定位到該訊息在commitlog中的commitlog offset

  6. 從對應commitlog檔案中根據commitlog offset讀取訊息單元,併發送給Consumer

效能提升

RocketMQ中,無論是訊息本身還是訊息索引,都是儲存在磁碟上的。其不會影響訊息的消費嗎?

當然不會。其實RocketMQ的效能在目前的MQ產品中效能是非常高的。因為系統通過一系列相關機制大大 提升了效能。

  1. 首先,RocketMQ主要通過MappedByteBuffer對檔案進行讀寫操作。其中,利用了NIO中的FileChannel模型將磁碟上的物理檔案直接對映到使用者態的記憶體地址中(這種Mmap的方式減少了傳統IO將磁碟檔案資料在作業系統核心地址空間的緩衝區和使用者應用程式地址空間的緩衝區之間來回進行拷貝的效能開銷),將對檔案的操作轉化為直接對記憶體地址進行操作,從而極大地提高了檔案的讀寫效率(正因為需要使用記憶體對映機制,故RocketMQ的檔案儲存都使用定長結構來儲存,方便一次將整個檔案對映至記憶體)。

  2. 其次,consumequeue中的資料是順序存放的,使得OS 的PageCache的預讀取機制,導致對 consumequeue檔案的讀取幾乎接近於記憶體讀取,即使在有訊息堆積情況下也不會影響效能

PageCache機制,頁快取機制,是OS對檔案的快取機制,用於加速對檔案的讀寫操作。一般來 說,程式對檔案進行順序讀寫的速度幾乎接近於記憶體讀寫速度,主要原因是由於OS使用 PageCache機制對讀寫訪問操作進行效能優化,將一部分的記憶體用作PageCache。

寫操作:OS會先將資料寫入到PageCache中,隨後會以非同步方式由pdæush(page dirty æush) 核心執行緒將Cache中的資料刷盤到物理磁碟

讀操作:若使用者要讀取資料,其首先會從PageCache中讀取,若沒有命中,則OS在從物理磁 盤上載入該資料到PageCache的同時,也會順序對其相鄰資料塊中的資料進行預讀取。

所以RocketMQ中可能會影響效能的是對commitlog檔案的讀取。因為對commitlog檔案來說,讀取訊息時 會產生大量的隨機訪問,而隨機訪問會嚴重影響效能。不過,如果選擇合適的系統IO排程演算法,比如 設定排程演算法為Deadline(採用SSD固態硬碟的話),隨機讀的效能也會有所提升。

4. 與Kafka的簡單對比

RocketMQ的很多思想來源於Kafka,其中commitlog與consumequeue就是。

RocketMQ中的commitlog目錄與consumequeue的結合就類似於Kafka中的partition分割槽目錄。 mappedFile檔案就類似於Kafka中的segment段。

Kafka中的Topic的訊息被分割為一個或多個partition。partition是一個物理概念,對應到系統上 就是topic目錄下的一個或多個目錄。每個partition中包含的檔案稱為segment,是具體存放訊息 的檔案。

Kafka中訊息存放的目錄結構是:topic目錄下有partition目錄,partition目錄下有segment檔案

Kafka中沒有二級分類標籤Tag這個概念

Kafka中無需索引檔案。因為生產者是將訊息直接寫在了partition中的,消費者也是直接從 partition中讀取資料的