1. 程式人生 > 實用技巧 >RocketMQ 原理 - 部署 - 入門 (圖解)

RocketMQ 原理 - 部署 - 入門 (圖解)


瘋狂創客圈 《SpringCloud Nginx 高併發核心程式設計》Java 工程師/ 架構師 必備【連結

RocketMQ簡介

RocketMQ是什麼

  • RocketMQ是一個佇列模型的訊息中介軟體,具有高效能、高可靠、高實時、分散式特點。
  • Producer、Consumer、佇列都可以分散式。
  • Producer 向一些佇列輪流傳送訊息,佇列集合稱為 Topic,Consumer 如果做廣播消費,則一個 consumer 例項消費這個 Topic 對應的所有佇列,如果做叢集消費,則多個 Consumer 例項平均消費這個 topic 對應的佇列集合。
  • 能夠保證嚴格的訊息順序
  • 提供豐富的訊息拉取模式
  • 高效的訂閱者水平擴充套件能力
  • 實時的訊息訂閱機制
  • 億級訊息堆積能力
  • 較少的依賴

選擇RocketMQ的理由

強調叢集無單點,可擴充套件,任意一點高可用,水平可擴充套件

​ 方便叢集配置,而且容易擴充套件(橫向和縱向),通過slave的方式每一點都可以實現高可用

支援上萬個佇列,順序訊息

​ 順序消費是實現在同一佇列的,如果高併發的情況就需要佇列的支援,rocketmq可以滿足上萬個佇列同時存在

任性定製你的訊息過濾

​ rocketmq提供了兩種型別的訊息過濾,也可以說三種可以通過topic進行訊息過濾、可以通過tag進行訊息過濾、還可以通過filter的方式任意定製過濾

訊息的可靠性(無Buffer,持久化,容錯,回溯消費)

​ 訊息無buffer就不用擔心buffer回滿的情況,rocketmq的所有訊息都是持久化的,生產者本身可以進行錯誤重試,釋出者也會按照時間階梯的方式進行訊息重發,訊息回溯說的是可以按照指定的時間進行訊息的重新消費,既可以向前也可以向後(前提條件是要注意訊息的擦除時間)

海量訊息堆積能力,訊息堆積後,寫入低延遲

​ 針對於provider需要配合部署方式,對於consumer,如果是叢集方式一旦master返現訊息堆積會向consumer下發一個重定向指令,此時consumer就可以從slave進行資料消費了

分散式事務

​ 我個人感覺rocketmq對這一塊說的不是很清晰,而且官方也說現在這塊存在缺陷(會令系統pagecache過多),所以線上建議還是少用為好

訊息失敗重試機制

​ 針對provider的重試,當訊息傳送到選定的broker時如果出現失敗會自動選擇其他的broker進行重發,預設重試三次,當然重試次數要在訊息傳送的超時時間範圍內。

​ 針對consumer的重試,如果訊息因為各種原因沒有消費成功,會自動加入到重試佇列,一般情況如果是因為網路等問題連續重試也是照樣失敗,所以rocketmq也是採用階梯重試的方式。

定時消費

​ 除了上面的配置,在傳送訊息是也可以針對message設定setDelayTimeLevel

活躍的開源社群

​ 現在rocketmq成為了apache的一款開源產品,活躍度也是不容懷疑的

成熟度(經過雙十一考驗)

​ 針對本身的成熟度,我們看看這麼多年的雙十一就可想而知了

RocketMQ 邏輯結構

Broker

  • Broker即是物理上的概念,也是邏輯上的概念。多個物理Broker通過IP:PORT區分,多個邏輯Broker通過BrokerName區分。
  • 多個邏輯Broker組成Cluster。
  • Broker與Topic是多對多的關係。
  • Broker自身包含一個使用10911埠的NettyServer、一個10909的NettyServer,以及一個NettyClient。
  • HA通過10912埠提供服務,用於Broker內部各個部分的資料傳輸。
  • Broker是最重要的部分,包括持久化訊息、Broker叢集一致性(HA)、儲存歷史訊息、儲存Consumer消費偏移量、索引建立等。
  • Producer傳送來的訊息最終會通過CommitLog序列化到硬碟,執行序列化邏輯的類為AppendMessageCallback介面的實現類。
  • Broker序列化訊息是順序寫,序列化檔案儲存在userHome/store/commitlog目錄下,檔名為總偏移量。
  • 預設為非同步刷盤、提交日誌單個檔案1個G、單個consumer佇列檔案為不到6M

消費者組(Consumer Group)

在正式開始說消費之前,我們首先要明白一個概念,就是消費組

消費者組(Consumer Group)是一類消費者的集合,這類消費者通常消費同一類訊息並且消費邏輯一致,所以將這些消費者分組在一起。消費者組與生產者組類似,都是將相同角色的消費者分組在一起並命名的。

分組是一個很精妙的概念設計,RocketMQ正是通過這種分組機制,實現了天然的訊息負載均衡。在消費訊息時,通過消費者組實現了將訊息分發到多個消費者伺服器例項

設定消費者的名字是在程式碼中實現的,如下:

 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("stock_consumer_group");

舉個例子

某個主題有9條訊息,其中一個消費者組有3個例項(3個程序或3臺機器),那麼每個例項將均攤3條訊息,這也意味著我們可以很方便地通過增加機器來實現水平擴充套件。

如果還不理解的話,我們看下面這張圖,由訂單系統來的訊息,被庫存和積分兩個組所分配,每個組就是一個消費組

消費者組(Consumer Group)可以用來表示一個消費訊息應用,一個 Consumer Group 下包含多個 Consumer 例項,可以是多臺機器,也可 以是多個程序,或者是一個程序的多個 Consumer 物件。一個 Consumer Group 下的多個 Consumer 以均攤 方式消費訊息,如果設定為廣播方式,那麼這個 Consumer Group 下的每個例項都消費全量資料。

生產者組 Producer Group

用來表示一個傳送訊息應用,一個 Producer Group 下包含多個 Producer 例項,可以是多臺機器,也可以 是一臺機器的多個程序,或者一個程序的多個 Producer 物件。一個 Producer Group 可以傳送多個 Topic 訊息,Producer Group 作用如下:

  • 標識一類 Producer
  • 可以通過運維工具查詢這個傳送訊息應用下有多個 Producer 例項
  • 傳送分散式事務訊息時,如果 Producer 中途意外宕機,Broker 會主動回撥 Producer Group 內的任意 一臺機器來確認事務狀態。

RocketMQ 核心元件圖

RocketMQ是開源的訊息中介軟體,它主要由NameServer,Producer,Broker,Consumer四部分構成。

NameServer

NameServer主要負責Topic和路由資訊的管理,類似zookeeper。

Broker

訊息中轉角色,負責儲存訊息,轉發訊息。

Consumer

訊息消費者,負責訊息消費,一般是後臺系統負責非同步消費。

Producer

訊息生產者,負責產生訊息,一般由業務系統負責產生訊息。

RokcetMQ 物理部署圖

物理概念

NameServer

NameServer是一個幾乎無狀態節點,可叢集部署,節點之間無任何資訊同步。

Broker

Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave的對應關係通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與Name Server叢集中的所有節點建立長連線,定時註冊Topic資訊到所有Name Server。

Producer

Producer與Name Server叢集中的其中一個節點(隨機選擇)建立長連線,定期從Name Server取Topic路由資訊,並向提供Topic服務的Master建立長連線,且定時向Master傳送心跳。Producer完全無狀態,可叢集部署。

Producer與Name Server關係

1)連線 單個Producer和一臺NameServer保持長連線,如果該NameServer掛掉,生產者會自動連線下一個NameServer,直到有可用連線為止,並能自動重連。
2)輪詢時間 預設情況下,生產者每隔30秒從NameServer獲取所有Topic的最新佇列情況,這意味著某個Broker如果宕機,生產者最多要30秒才能感知,在此期間,
發往該broker的訊息傳送失敗。
3)心跳 與nameserver沒有心跳

Consumer

Consumer與Name Server叢集中的其中一個節點(隨機選擇)建立長連線,定期從Name Server取Topic路由資訊,並向提供Topic服務的Master、Slave建立長連線,且定時向Master、Slave傳送心跳。Consumer既可以從Master訂閱訊息,也可以從Slave訂閱訊息,訂閱規則由Broker配置決定。

1、Name Server是一個幾乎無狀態節點,可叢集部署,節點之間無任何資訊同步。
2、每個Broker與Name Server叢集中的所有節點建立長連線,定時註冊Topic資訊到所有Name Server。
3、Producer與Name Server叢集中的其中一個節點(隨機選擇)建立長連線,定期從Name Server取Topic路由資訊。
4、Consumer與Name Server叢集中的其中一個節點(隨機選擇)建立長連線,定期從Name Server取Topic路由資訊。

Broker 詳解

1、Broker與Name Server關係

1)連線 單個Broker和所有Name Server保持長連線。

2)心跳

心跳間隔:每隔30秒向所有NameServer傳送心跳,心跳包含了自身的Topic配置資訊。

心跳超時:NameServer每隔10秒,掃描所有還存活的Broker連線,若某個連線2分鐘內沒有傳送心跳資料,則斷開連線。

3)斷開:當Broker掛掉;NameServer會根據心跳超時主動關閉連線,一旦連線斷開,會更新Topic與佇列的對應關係,但不會通知生產者和消費者。

2、 負載均衡

一個Topic分佈在多個Broker上,一個Broker可以配置多個Topic,它們是多對多的關係。
如果某個Topic訊息量很大,應該給它多配置幾個Queue,並且儘量多分佈在不同Broker上,減輕某個Broker的壓力。

3 、可用性

由於訊息分佈在各個Broker上,一旦某個Broker宕機,則該Broker上的訊息讀寫都會受到影響。

所以RocketMQ提供了Master/Slave的結構,Salve定時從Master同步資料,如果Master宕機,則Slave提供消費服務,但是不能寫入訊息,此過程對應用透明,由RocketMQ內部解決。
有兩個關鍵點:
思考1一旦某個broker master宕機,生產者和消費者多久才能發現?

受限於Rocketmq的網路連線機制,預設情況下最多需要30秒,因為消費者每隔30秒從nameserver獲取所有topic的最新佇列情況,這意味著某個broker如果宕機,客戶端最多要30秒才能感知。

思考2 master恢復恢復後,訊息能否恢復。
消費者得到Master宕機通知後,轉向Slave消費,但是Slave不能保證Master的訊息100%都同步過來了,因此會有少量的訊息丟失。但是訊息最終不會丟的,一旦Master恢復,未同步過去的訊息會被消費掉。

RocketMQ的領域模型

Message

代表一條訊息,使用MessageId唯一識別,使用者在傳送時可以設定messageKey,便於之後查詢和跟蹤。一個 Message 必須指定 Topic,相當於寄信的地址。Message 還有一個可選的 Tag 設定,以便消費端可以基於 Tag 進行過濾訊息。也可以新增額外的鍵值對,例如你需要一個業務 key 來查詢 Broker 上的訊息,方便在開發過程中診斷問題。

Topic

  • Topic表示訊息的第一級型別,比如一個電商系統的訊息可以分為:交易訊息、物流訊息等。一條訊息必須有一個Topic。
  • 最細粒度的訂閱單位,一個Group可以訂閱多個Topic的訊息。

Tag

Tag表示訊息的第二級型別,比如交易訊息又可以分為:交易建立訊息,交易完成訊息等。RocketMQ提供2級訊息分類,方便靈活控制。

標籤可以被認為是對 Topic 進一步細化。一般在相同業務模組中通過引入標籤來標記不同用途的訊息。

Queue

Topic和Queue是1對多的關係一個Topic下可以包含多個Queue,主要用於負載均衡。傳送訊息時,使用者只指定Topic,Producer會根據Topic的路由資訊選擇具體發到哪個Queue上。Consumer訂閱訊息時,會根據負載均衡策略決定訂閱哪些Queue的訊息。

訊息的物理管理單位。一個Topic下可以有多個Queue,Queue的引入使得訊息的儲存可以分散式叢集化,具有了水平擴充套件能力。

在 RocketMQ 中,所有訊息佇列都是持久化,長度無限的資料結構,所謂長度無限是指佇列中的每個儲存單元都是定長,訪問其中的儲存單元使用 Offset 來訪問,offset 為 java long 型別,64 位,理論上在 100年內不會溢位,所以認為是長度無限。

Queue和消費者的 ConsumeQueue物理概念一 一對應, ConsumeQueue是一個長度無限的陣列,Offset 就是下標。

Offset

RocketMQ在儲存訊息時會為每個Topic下的每個Queue生成一個訊息的索引檔案,每個Queue都對應一個Offset記錄當前Queue中訊息條數

Producer

訊息生產者,位於使用者的程序內,Producer通過NameServer獲取所有Broker的路由資訊,根據負載均衡策略選擇將訊息發到哪個Broker,然後呼叫Broker介面提交訊息。

Producer Group

生產者組,簡單來說就是多個傳送同一類訊息的生產者稱之為一個生產者組。

Consumer

訊息消費者,位於使用者程序內。Consumer通過NameServer獲取所有broker的路由資訊後,向Broker傳送Pull請求來獲取訊息資料。Consumer可以以兩種模式啟動,廣播(Broadcast)和叢集(Cluster)廣播模式下,一條訊息會發送給所有Consumer,叢集模式下訊息只會傳送給一個Consumer

Consumer Group

消費者組,和生產者類似,消費同一類訊息的多個 Consumer 例項組成一個消費者組。

NameServer

NameServer可以看作是RocketMQ的註冊中心,它管理兩部分資料:叢集的Topic-Queue的路由配置;Broker的實時配置資訊。其它模組通過Nameserv提供的介面獲取最新的Topic配置和路由資訊。

  • Producer/Consumer :通過查詢介面獲取Topic對應的Broker的地址資訊
  • Broker : 註冊配置資訊到NameServer, 實時更新Topic資訊到NameServer

RocketMQ 訊息儲存設計原理圖

RocketMQ儲存邏輯物件層

  • 該層主要包含了RocketMQ資料檔案儲存直接相關的三個模型類IndexFile、ConsumerQueue和CommitLog。
  • IndexFile為索引資料檔案提供訪問服務,ConsumerQueue為邏輯訊息佇列提供訪問服務,CommitLog則為訊息儲存的日誌資料檔案提供訪問服務。
  • 這三個模型類也是構成了RocketMQ儲存層的整體結構。

CommitLog

訊息儲存檔案,所有訊息主題的訊息都儲存在 CommitLog 檔案中。
Commitlog 檔案儲存的邏輯檢視如圖所示

ConsumeQueue

訊息消費佇列,訊息到達 CommitLog 檔案後,將非同步轉發到 消費佇列,供訊息消費者消費。ConsumeQueue儲存格式如下:

  • 單個 ConsumeQueue 檔案中預設包含 30 萬個條目,單個檔案的長度為 30w × 20 位元組, 單個 ConsumeQueue 檔案可以看出是一個 ConsumeQueue 條目的陣列,其下標為 ConsumeQueue 的邏輯偏移量,訊息消費進度儲存的偏移量 即邏輯偏移量。
  • ConsumeQueue 即為 Commitlog 檔案的索引檔案, 其構建機制是當訊息到達 Commitlog 檔案後, 由專門的執行緒 產生訊息轉發任務,從而構建訊息消費佇列檔案與下文提到的索引檔案。

為什麼需要 ConsumeQueue ?

RocketMQ的訊息都是按照先來後到,順序的儲存在CommitLog中的,而消費者通常只關心某個Topic下的訊息。順序的查詢CommitLog肯定是不現實的,我們可以構建一個索引檔案,裡面存放著某個Topic下面所有訊息在CommitLog中的位置,這樣消費者獲取訊息的時候,只需要先查詢這個索引檔案,然後再去CommitLog中獲取訊息就 OK了。這個索引檔案,就是我們的ComsumerQueue。

IndexFile

訊息索引檔案,主要儲存訊息 Key 與 Offset 的對應關係。

lndexFile 總共包含 lndexHeader、 Hash 槽、 Hash 條目。

訊息消費佇列是RocketMQ專門為訊息訂閱構建的索引檔案,提高根據主題與訊息隊 列檢索訊息的速度 ,另外 RocketMQ 引入了 Hash 索引機制為訊息建立索引, HashMap 的設 計包含兩個基本點 : Hash 槽與 Hash 衝突的連結串列結構。 RocketMQ 索引檔案佈局如圖所示

儲存分析的例項

1、流程圖

我們由簡單到複雜的來理解,它的一些核心概念

這個圖很好理解,訊息先發到Topic,然後消費者去Topic拿訊息。只是Topic在這裡只是個概念,那它到底是怎麼儲存訊息資料的呢,這裡就要引入Broker概念。

2、Topic的儲存

Topic是一個邏輯上的概念,實際上Message是在每個Broker上以Queue的形式記錄。

從上面的圖片可以總結下幾條結論。

1、消費者傳送的Message會在Broker中的Queue佇列中記錄。
2、一個Topic的資料可能會存在多個Broker中。
3、一個Broker存在多個Queue。
4、單個的Queue也可能儲存多個Topic的訊息。

也就是說每個Topic在Broker上會劃分成幾個邏輯佇列,每個邏輯佇列儲存一部分訊息資料,但是儲存的訊息資料實際上不是真正的訊息資料,而是指向commit log的訊息索引。

Queue不是真正儲存Message的地方,真正儲存Message的地方是在CommitLog

如圖(盜圖)

左邊的是CommitLog。這個是真正儲存訊息的地方。RocketMQ所有生產者的訊息都是往這一個地方存的。

右邊是ConsumeQueue。這是一個邏輯佇列。和上文中Topic下的Queue是一一對應的。消費者是直接和ConsumeQueue打交道。ConsumeQueue記錄了消費位點,這個消費位點關聯了commitlog的位置。所以即使ConsumeQueue出問題,只要commitlog還在,訊息就沒丟,可以恢復出來。還可以通過修改消費位點來重放或跳過一些訊息。

事務狀態服務

儲存每條訊息的事務狀態。

定時訊息服務

每一個延遲級別對應一個訊息消費佇列,儲存延遲佇列的訊息拉取進度。

RMQ檔案儲存模型層

封裝的檔案記憶體對映層

  • RocketMQ主要採用JDK NIO中的MappedByteBuffer和FileChannel兩種方式完成資料檔案的讀寫。
  • 其中,採用MappedByteBuffer這種記憶體對映磁碟檔案的方式完成對大檔案的讀寫,在RocketMQ中將該類封裝成MappedFile類。
  • 這裡,每一種類的單個檔案均由MappedFile類提供讀寫操作服務(其中,MappedFile類提供了順序寫/隨機讀、記憶體資料刷盤、記憶體清理等和檔案相關的服務)。

磁碟儲存層

主要指的是部署RocketMQ伺服器所用的磁碟。這裡,需要考慮不同磁碟型別(如SSD或者普通的HDD)特性以及磁碟的效能引數(如IOPS、吞吐量和訪問時延等指標)對順序寫/隨機讀操作帶來的影響。

RocketMQ中訊息刷盤

在RocketMQ中訊息刷盤主要可以分為同步刷盤和非同步刷盤兩種。

同步刷盤

  • 在返回寫成功狀態時,訊息已經被寫入磁碟。
  • 具體流程是,訊息寫入記憶體的PAGECACHE後,立刻通知刷盤執行緒刷盤,然後等待刷盤完成,刷盤執行緒執行完成後喚醒等待的執行緒,返回訊息寫成功的狀態。
  • 一般只用於金融場景。

非同步刷盤

在返回寫成功狀態時,訊息可能只是被寫入了記憶體的PAGECACHE,寫操作的返回快,吞吐量大;當記憶體裡的訊息量積累到一定程度時,統一觸發寫磁碟操作,快速寫入。

訊息在系統中流轉圖

1.Producer 傳送訊息,訊息從 socket 進入 java 堆。

2.Producer 傳送訊息,訊息從 java 堆轉入 PAGACACHE,實體記憶體。

3.Producer 傳送訊息,由非同步執行緒刷盤,訊息從 PAGECACHE 刷入磁碟。

4.Consumer 拉訊息(正常消費),訊息直接從 PAGECACHE(資料在實體記憶體)轉入 socket,到達 consumer, 不經過 java 堆。這種消費場景最多,線上 96G 實體記憶體,按照 1K 訊息算,可以在實體記憶體快取 1 億條消 息。

5.Consumer 拉訊息(異常消費),訊息直接從 PAGECACHE(資料在虛擬記憶體)轉入 socket。

6.Consumer 拉訊息(異常消費),由於 Socket 訪問了虛擬記憶體,產生缺頁中斷,此時會產生磁碟 IO,從磁 盤 Load 訊息到 PAGECACHE,然後直接從 socket 發出去。

7.同 5 一致。

8.同 6 一致。

RocketMQ的消費模式

RocketMQ的消費模式有2種: 叢集消費(CLUSTERING) 、 廣播消費(BROADCASTING)。原始碼如下:

 1 public enum MessageModel {
 2     BROADCASTING("BROADCASTING"),
 3     CLUSTERING("CLUSTERING");
 4 
 5     private String modeCN;
 6 
 7     private MessageModel(String modeCN) {
 8         this.modeCN = modeCN;
 9     }
10 
11     public String getModeCN() {
12         return this.modeCN;
13     }
14 }

叢集消費(CLUSTERING)

叢集消費(CLUSTERING)是指: 一個ConsumerGroup中的Consumer例項平均分攤消費訊息。例如某個Topic有9條訊息,其中一個ConsumerGroup有3個例項(可能是3個程序,或者3臺機器),那麼每個例項只消費其中部分,消費完的訊息不能被其他例項消費。

例如:某個Topic有9條訊息,有3個消費者,廣播模式就是每個消費者都收到9條訊息,叢集模式就是消費者平均分攤9條訊息

其實,對於RocketMQ而言,通過ConsumeGroup的機制,實現了天然的訊息負載均衡!通俗點來說,RocketMQ中的訊息通過ConsumeGroup實現了將訊息分發到C1/C2/C3/……的機制,這意味著我們將非常方便的通過加機器來實現水平擴充套件!

至於訊息分發到C1/C2/C3,其實也是可以設定策略的:

預設的分配演算法是AllocateMessageQueueAveragely

還有另外一種平均的演算法是AllocateMessageQueueAveragelyByCircle,也是平均分攤每一條queue,只是以環狀輪流分queue的形式,如下圖:

廣播消費(BROADCASTING)

**廣播消費 **,類似於ActiveMQ中的釋出訂閱模式,訊息會發給Consume Group中的每一個消費者進行消費。 由於廣播模式下要求一條訊息需要投遞到一個消費組下面每一個消費者例項,所以也就沒有訊息被分攤消費的說法。

程式碼核心

//設定廣播消費模式就是在消費者這裡設定一下,其餘的程式碼不變
consumer.setMessageModel(MessageModel.BROADCASTING);

廣播消費(BROADCASTING)下,一條訊息被多個consumer消費,即使這些consumer屬於同一個ConsumerGroup,訊息也會被ConsumerGroup中的每個Consumer都消費一次,廣播消費中ConsumerGroup概念可以認為在訊息劃分方面無意義。

RocketMQ-廣播消費模式設定:

RocketMQ原始碼: 路由中心

早期的rocketmq版本的路由功能是使用zookeeper實現的,後來rocketmq為了追求效能,自己實現了一個性能更高效且實現簡單的路由中心 NameServer,

可以通過部署多個 NameServer 路由節點實現高可用,但它們 NameServer 之間並不能互相通訊,這也就會導致在某一個時刻各個路由節點間的資料並不完全相同,但資料某個時刻不一致並不會導致訊息傳送不了,這也是rocketmq 追求簡單高效的一個做法。

Nameserver的原始碼

Nameserver的原始碼很簡單,整個NameServer總共就由這麼幾個類組成:

其中NamesrvStartup為啟動類,NamesrvController為核心控制器,RouteInfoManager為路由資訊表。

路由資訊表與心跳(路由註冊)

路由註冊即是Broker向Nameserver註冊的過程,它們是通過Broker的心跳功能實現的,

我們知道RouteInfoManager為路由資訊表,先來看看Nameserver到底儲存了哪些資訊:

public class org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
  • topicQueueTable:Topic訊息佇列路由資訊,

    包括topic所在的broker名稱,讀佇列數量,寫佇列數量,同步標記等資訊,rocketmq根據topicQueueTable的資訊進行負載均衡訊息傳送。

  • brokerAddrTable:Broker節點資訊

    包括brokername,所在叢集名稱,還有主備節點資訊。

  • clusterAddrTable:Broker叢集資訊

    儲存了叢集中所有的Brokername。

  • brokerLiveTable:Broker狀態資訊

    Nameserver每次收到Broker的心跳包就會更新該資訊。

這裡也先講一下rocketmq是基於訂閱釋出機制,我之前也寫過一篇文章《rocketmq的消費模式》,我們可知一個Topic擁有多個訊息佇列,如果不指定佇列的數量,一個Broker會為每個Topic建立4個讀佇列和4個寫佇列,多個Broker組成叢集,Broker會通過傳送心跳包將自己的資訊註冊到路由中心,路由中心brokerLiveTable儲存Broker的狀態,它會根據Broker的心跳包更新Broker狀態資訊。

步驟一:Broker傳送心跳包

org.apache.rocketmq.broker.BrokerController#start:

public void start() throws Exception {
    
    // 初次啟動,這裡會強制執行傳送心跳包
    this.registerBrokerAll(true, false, true);
    
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
}

Broker在核心控制器啟動時,會強制傳送一次心跳包,接著建立一個定時任務,定時向路由中心傳送心跳包。

org.apache.rocketmq.broker.BrokerController#registerBrokerAll:

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    // 建立一個topic包裝類
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    // 這裡比較有趣,如果該broker沒有讀寫許可權,那麼會新建一個臨時的topicConfigTable,再set進包裝類
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
            TopicConfig tmp =
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                                this.brokerConfig.getBrokerPermission());
            topicConfigTable.put(topicConfig.getTopicName(), tmp);
        }
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);
    }

     // 判斷是否該Broker是否需要傳送心跳包
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
                                      this.getBrokerAddr(),
                                      this.brokerConfig.getBrokerName(),
                                      this.brokerConfig.getBrokerId(),
                                      this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        // 執行傳送心跳包
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}

該方法是Broker執行傳送心跳包的核心控制方法,它主要做了topic的包裝類封裝操作,判斷Broker此時是否需要執行傳送心跳包,但我查了下org.apache.rocketmq.common.BrokerConfig#forceRegister欄位的值永遠等於true,也就是該判斷永遠為true,即每次都需要傳送心跳包。

我們定位到needRegister遠端呼叫到路由中心的方法:

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#isBrokerTopicConfigChanged:

public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
    DataVersion prev = queryBrokerTopicConfig(brokerAddr);
    return null == prev || !prev.equals(dataVersion);
}

public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
    BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
    if (prev != null) {
        return prev.getDataVersion();
    }
    return null;
}

發現,Broker是否需要傳送心跳包由該Broker在路由中心org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo#dataVersion決定,如果dataVersion為空或者當前dataVersion不等於brokerLiveTable儲存的brokerLiveTable,Broker就需要傳送心跳包。

步驟二:Nameserver處理心跳包

Nameserver的netty服務監聽收到心跳包之後,會呼叫到路由中心以下方法進行處理:

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker:

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            this.lock.writeLock().lockInterruptibly();

            // 獲取叢集下所有的Broker,並將當前Broker加入clusterAddrTable,由於brokerNames是Set結構,並不會重複
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            // 獲取Broker資訊,如果是首次註冊,那麼新建一個BrokerData並加入brokerAddrTable
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            // 這裡判斷Broker是否是已經註冊過
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            // 如果是Broker是Master節點嗎,並且Topic資訊更新或者是首次註冊,那麼建立更新topic佇列資訊
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // 更新BrokerLiveInfo狀態資訊
            BrokerLiveInfo prevBrokerLiveInfo = 
                this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}

該方法是處理Broker心跳包的最核心方法,它主要做了對RouteInfoManager路由資訊的一些更新操作,包括對clusterAddrTable、brokerAddrTable、topicQueueTable、brokerLiveTable等路由資訊。

NamesrvStartup啟動類

知道了這幾個類的功能之後,我們就直接定位到NamesrvStartup啟動類的啟動方法

Namesrv顧名思義就是名稱服務,是沒有狀態可橫向擴充套件的服務。廢話不多說了,直接貼程式碼。。

public static NamesrvController main0(String[] args) {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
            NettySystemConfig.socketSndbufSize = 4096;// socket傳送緩衝區大小
        }

        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
            NettySystemConfig.socketRcvbufSize = 4096;// Socket接收緩衝區大小
        }

        try {
            //PackageConflictDetect.detectFastjson();

            Options options = ServerUtil.buildCommandlineOptions(new Options());//構建Options,有h代表help,n代表namesrvAddr
            //Options加上c代表configFile,p代表printConfigItem
            //解析得到commandLine
            commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
                return null;
            }

            final NamesrvConfig namesrvConfig = new NamesrvConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            nettyServerConfig.setListenPort(9876);//監聽埠是9876
            if (commandLine.hasOption('c')) {//有配置檔案
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    properties = new Properties();
                    properties.load(in);//載入配置檔案到prop
                    MixAll.properties2Object(properties, namesrvConfig);//根據prop檔案解析,給namesrvConfig填充對應的值
                    MixAll.properties2Object(properties, nettyServerConfig);//根據prop檔案解析,給nettyServerConfig填充對應的值

                    namesrvConfig.setConfigStorePath(file);//設定config儲存路徑

                    System.out.printf("load config properties file OK, " + file + "%n");
                    in.close();
                }
            }
            if (commandLine.hasOption('p')) {//列印namesrvConfig和nettyServerConfig的非靜態,非this開頭的欄位
                MixAll.printObjectProperties(null, namesrvConfig);
                MixAll.printObjectProperties(null, nettyServerConfig);
                System.exit(0);
            }
            //再根據commandLine得到一個prop,再給namesrvConfig填充對應的值
            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
            if (null == namesrvConfig.getRocketmqHome()) {//預設NamesrvConfig.rocketmqHome為空,且配置,引數中RocketMQHome為空的話拋異常
                System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV + " variable in your environment to match the location of the RocketMQ installation%n");
                System.exit(-2);
            }
            //配置Logback
            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
            final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
            //列印namesrvConfig和nettyServerConfig的非靜態,非this開頭的欄位
            MixAll.printObjectProperties(log, namesrvConfig);
            MixAll.printObjectProperties(log, nettyServerConfig);
            //根據namesrvConfig,nettyServerConfig構造NamesrvController
            final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

            // remember all configs to prevent discard
            controller.getConfiguration().registerConfig(properties);

            boolean initResult = controller.initialize();//初始化NamesrvController
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }

            Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    controller.shutdown();//shutdown的時候 NamesrvController也shutdown
                    return null;
                }
            }));

            controller.start();//啟動服務

            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf(tip + "%n");

            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

RocketMQ的部署

部署方式

單Master模式

​ 只有一個 Master節點

​ 優點:配置簡單,方便部署

​ 缺點:這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用,不建議線上環境使用

多Master模式

​ 一個叢集無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master

​ 優點:配置簡單,單個Master 宕機或重啟維護對應用無影響,在磁碟配置為RAID10 時,即使機器宕機不可恢復情況下,由與 RAID10磁碟非常可靠,訊息也不會丟(非同步刷盤丟失少量訊息,同步刷盤一條不丟)。效能最高。多 Master 多 Slave 模式,非同步複製

​ 缺點:單臺機器宕機期間,這臺機器上未被消費的訊息在機器恢復之前不可訂閱,訊息實時性會受到受到影響

多Master多Slave模式(非同步複製)---本文稍後以這種方式部署叢集為例

​ 每個 Master 配置一個 Slave,有多對Master-Slave, HA,採用非同步複製方式,主備有短暫訊息延遲,毫秒級。

​ 優點:即使磁碟損壞,訊息丟失的非常少,且訊息實時性不會受影響,因為Master 宕機後,消費者仍然可以從 Slave消費,此過程對應用透明。不需要人工干預。效能同多 Master 模式幾乎一樣。

​ 缺點: Master 宕機,磁碟損壞情況,會丟失少量訊息。

多Master多Slave模式(同步雙寫)---文中會說明補充此叢集配置,線上使用的話,推薦使用此模式叢集

​ 每個 Master 配置一個 Slave,有多對Master-Slave, HA採用同步雙寫方式,主備都寫成功,嚮應用返回成功。

​ 優點:資料與服務都無單點, Master宕機情況下,訊息無延遲,服務可用性與資料可用性都非常高

​ 缺點:效能比非同步複製模式略低,大約低 10%左右,傳送單個訊息的 RT會略高。目前主宕機後,備機不能自動切換為主機,後續會支援自動切換功能

RocketMQ分散式部署實踐案例詳述

此處就RocketMQ的多Master多Slave的模式在Linux伺服器部署案例進行詳細的說明,如系統部署結構圖所示。

1本次部署環境

Linux伺服器192.168.162.235、192.168.162.236兩臺(下文均簡稱235、236),詳細部署環境示意表如下:

2 編輯Hosts

分別修改235 和236 的hosts 檔案

sudo vim /etc/hosts

      IP                NAME

192.168.162.235      nameserver1

192.168.162.235      master1

192.168.162.235      master1-slave1

192.168.162.236      nameserver2

192.168.162.236      master2

192.168.162.236      master2-slave2

注:修改hosts 檔案需獲得sudo 許可權,本機使用者是rocketMQ非root使用者, 故申請了堡壘機許可權(即獲得root許可權)。

3 下載官方原始碼

下載官方RocketMQ壓縮包,下載地址:http://rocketmq.apache.org/release_notes/release-notes-4.2.0/,並選擇Download the 4.2.0 release 選項的 rocketmq-all-4.2.0-bin-release.zip下載。(其他如source為需要自己編譯的版本)

4 上傳到Linux並解壓

分別上傳rocketmq-all-4.2.0-bin-release.zip到235和236伺服器的/home/rocketMQ/ZHF/rocketMQ-2m2s/目錄下:

cd /home/rocketMQ/ZHF/rocketMQ-2m2s/tar –zxvf rocketmq-all-4.2.0-bin-release.zip

  1. 建立持久化儲存目錄

Master目錄設定:

mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store

mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store/commitlog

mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store/consumequeue

mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store/index

Slave目錄設定:

mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store-s

mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/commitlog

mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/consumequeue

mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/index

6 RocketMQ配置檔案

235伺服器設定:

sudo vim /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-a.properties

sudo vim /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-b-s.properties

236伺服器設定:

sudo vim /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-b.properties

sudo vim /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-a-s.properties

broker-a.properties檔案配置


# Licensed to the Apache Software Foundation (ASF) under one or more

# contributor license agreements.  See the NOTICE file distributed with

# this work for additional information regarding copyright ownership.

# The ASF licenses this file to You under the Apache License, Version 2.0

# (the "License"); you may not use this file except in compliance with

# the License.  You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

 

#brokerClusterName=DefaultCluster

#brokerName=broker-a

#brokerId=0

#deleteWhen=04

#fileReservedTime=48

#brokerRole=ASYNC_MASTER

#flushDiskType=ASYNC_FLUSH

 

#所屬叢集名字

brokerClusterName=rocketmq-cluster

#broker名字,注意此處不同的配置檔案填寫的不一樣

brokerName=broker-a

#0 表示 Master,>0 表示 Slave

brokerId=0

#nameServer地址,分號分割

namesrvAddr=nameserver1:9876;nameserver2:9876

#在傳送訊息時,自動建立伺服器不存在的topic,預設建立的佇列數

defaultTopicQueueNums=4

#是否允許 Broker 自動建立Topic,建議線下開啟,線上關閉

autoCreateTopicEnable=true

#是否允許 Broker 自動建立訂閱組,建議線下開啟,線上關閉

autoCreateSubscriptionGroup=true

#Broker 對外服務的監聽埠

listenPort=10911

haListenPort=10912

#刪除檔案時間點,預設凌晨 4點

deleteWhen=04

#檔案保留時間,預設 48 小時

fileReservedTime=18

#commitLog每個檔案的大小預設1G

mapedFileSizeCommitLog=1073741824

#ConsumeQueue每個檔案預設存30W條,根據業務情況調整

mapedFileSizeConsumeQueue=300000

#destroyMapedFileIntervalForcibly=120000

#redeleteHangedFileInterval=120000

#檢測物理檔案磁碟空間

diskMaxUsedSpaceRatio=88

#儲存路徑

storePathRootDir=/home/rocketMQ/ZHF/rocketMQ-2m2s/store

#commitLog 儲存路徑

storePathCommitLog=/home/rocketMQ/ZHF/rocketMQ-2m2s/store/commitlog

#消費佇列儲存路徑儲存路徑

storePathConsumeQueue=/home/rocketMQ/ZHF/rocketMQ-2m2s/store/consumequeue

#訊息索引儲存路徑

storePathIndex=/home/rocketMQ/ZHF/rocketMQ-2m2s/store/index

#checkpoint 檔案儲存路徑

storeCheckpoint=/home/rocketMQ/ZHF/rocketMQ-2m2s/store/checkpoint

#abort 檔案儲存路徑

abortFile=/home/rocketMQ/ZHF/rocketMQ-2m2s/store/abort

#限制的訊息大小

maxMessageSize=65536

#flushCommitLogLeastPages=4

#flushConsumeQueueLeastPages=2

#flushCommitLogThoroughInterval=10000

#flushConsumeQueueThoroughInterval=60000

#Broker 的角色

#- ASYNC_MASTER 非同步複製Master

#- SYNC_MASTER 同步雙寫Master

#- SLAVE

brokerRole=SYNC_MASTER

#刷盤方式

#- ASYNC_FLUSH 非同步刷盤

#- SYNC_FLUSH 同步刷盤

flushDiskType=ASYNC_FLUSH

#checkTransactionMessageEnable=false

#發訊息執行緒池數量

#sendMessageThreadPoolNums=128

#拉訊息執行緒池數量

#pullMessageThreadPoolNums=128

#強制指定本機IP,需要根據每臺機器進行修改。官方介紹可為空,系統預設自動識別,但多網絡卡時IP地址可能讀取錯誤

brokerIP1=192.168.162.235

broker-a-s.properties檔案配置

# Licensed to the Apache Software Foundation (ASF) under one or more

# contributor license agreements.  See the NOTICE file distributed with

# this work for additional information regarding copyright ownership.

# The ASF licenses this file to You under the Apache License, Version 2.0

# (the "License"); you may not use this file except in compliance with

# the License.  You may obtain a copy of the License at

#

#     http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

#brokerClusterName=DefaultCluster

#brokerName=broker-b

#brokerId=1

#deleteWhen=04

#fileReservedTime=48

#brokerRole=SLAVE

#flushDiskType=ASYNC_FLUSH

 

#所屬叢集名字

brokerClusterName=rocketmq-cluster

#broker名字,注意此處不同的配置檔案填寫的不一樣

brokerName=broker-a

#0 表示 Master,>0 表示 Slave

brokerId=1

#nameServer地址,分號分割

namesrvAddr=nameserver1:9876;nameserver2:9876

#在傳送訊息時,自動建立伺服器不存在的topic,預設建立的佇列數

defaultTopicQueueNums=4

#是否允許 Broker 自動建立Topic,建議線下開啟,線上關閉

autoCreateTopicEnable=true

#是否允許 Broker 自動建立訂閱組,建議線下開啟,線上關閉

autoCreateSubscriptionGroup=true

#Broker 對外服務的監聽埠

listenPort=10923

haListenPort=10924

#刪除檔案時間點,預設凌晨 4點

deleteWhen=04

#檔案保留時間,預設 48 小時

fileReservedTime=18

#commitLog每個檔案的大小預設1G

mapedFileSizeCommitLog=1073741824

#ConsumeQueue每個檔案預設存30W條,根據業務情況調整

mapedFileSizeConsumeQueue=300000

#destroyMapedFileIntervalForcibly=120000

#redeleteHangedFileInterval=120000

#檢測物理檔案磁碟空間

diskMaxUsedSpaceRatio=88

#儲存路徑

storePathRootDir=/home/rocketMQ/ZHF/rocketMQ-2m2s/store-s

#commitLog 儲存路徑

storePathCommitLog=/home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/commitlog

#消費佇列儲存路徑儲存路徑

storePathConsumeQueue=/home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/consumequeue

#訊息索引儲存路徑

storePathIndex=/home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/index

#checkpoint 檔案儲存路徑

storeCheckpoint=/home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/checkpoint

#abort 檔案儲存路徑

abortFile=/home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/abort

#限制的訊息大小

maxMessageSize=65536

#flushCommitLogLeastPages=4

#flushConsumeQueueLeastPages=2

#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000

#flushConsumeQueueThoroughInterval=60000

#Broker 的角色

#- ASYNC_MASTER 非同步複製Master

#- SYNC_MASTER 同步雙寫Master

#- SLAVE

brokerRole=SLAVE

#刷盤方式

#- ASYNC_FLUSH 非同步刷盤

#- SYNC_FLUSH 同步刷盤

flushDiskType=ASYNC_FLUSH

#checkTransactionMessageEnable=false

#發訊息執行緒池數量

#sendMessageThreadPoolNums=128

#拉訊息執行緒池數量

#pullMessageThreadPoolNums=128

#強制指定本機IP,需要根據每臺機器進行修改。官方介紹可為空,系統預設自動識別,但多網絡卡時IP地址可能讀取錯誤

brokerIP1=192.168.162.235

broker-b.properties檔案配置

參考broker-a.properties

broker-b-s.properties檔案配置

參考broker-a-s.properties

7 啟動引數設定

RocketMQ啟動檔案位於/home/rocketMQ/ZHF/rocketMQ-2m2s/bin/目錄下,Linux中nameserver啟動檔案為:mqnamesrv,broker啟動檔案為:mqbroker,mqnamesrv和mqbroker啟動檔案分別呼叫了runserver.sh和runbroker.sh檔案,這兩個檔案分別設定了nameserver和broker的啟動記憶體,目前記憶體啟動引數分別為nameserver啟動記憶體4G,最大記憶體4G,新生代2G,broker啟動記憶體8G,最大記憶體8G,新生代4G。

8 埠及防火牆設定

RokcetMQ啟動預設使用3個埠9875,10911,10912,三個埠分別代表nameserver伺服器埠,broker埠,broker HA埠。需注意的是在多Master多Slave模式下10911和10912是Master的使用埠,但Slave埠的設定與Master的埠不同,具體埠約束為:Slave - Master > 2,否則可能導致同一臺伺服器無法同時啟動Master和Slave。

如果伺服器啟動了防火牆,為了埠不被遮蔽,需將Master和Slave對應埠加入到iptables表以開放對應埠號,新增完成後重啟防火牆。命令列開放埠操作如下:

分別開啟235和236終端,在root使用者下執行命令:

開放埠:

/sbin/iptables -A INPUT -m state --state NEW -m tcp -p tcp --dport 9876 -j ACCEPT

/sbin/iptables -A INPUT -m state --state NEW -m tcp -p tcp --dport 10911 -j ACCEPT

/sbin/iptables -A INPUT -m state --state NEW -m tcp -p tcp --dport 10912 -j ACCEPT

/sbin/iptables -A INPUT -m state --state NEW -m tcp -p tcp --dport 10923 -j ACCEPT

/sbin/iptables -A INPUT -m state --state NEW -m tcp -p tcp --dport 10924 -j ACCEPT

儲存:

/etc/rc.d/init.d/iptables save

重啟:

/etc/init.d/iptables restart

檢視埠開放情況:

/sbin/iptables -L -n

9 啟動Nameserver

分別啟動235、236的Nameserver



cd /home/rocketMQ/ZHF/rocketMQ-2m2s/bin/nohup sh mqnamesrv &


10.啟動Broker

235上Master啟動:

nohup sh mqbroker -c /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-a.properties

236上Master啟動:

nohup sh mqbroker -c /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-b.properties

235上對應236Master的Slave啟動:

nohup sh mqbroker -c /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-b-s.properti

236上對應235Master的Slave啟動:

nohup sh mqbroker -c /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-a-s.properti

至此,Nameserver、Broker啟動完成,可以用jobs命令檢視當前執行程序,如下是服務端相關shutdown,即在bin目錄下:

sh mqshutdown namesrvsh mqshutdown broker

六、RocketMQ監控平臺部署

Apache版的RocketMQ管理介面部署工具可以從github上下載原始碼,地址:https://github.com/apache/rocketmq-externals,部署流程如下:

  1. 修改配置檔案,關聯rocketMQ叢集到管理介面

首先解壓並進入解壓後rockemq-externals-master目錄rocketmq-externals-master/rocketmq-externals-master/rocketmq-console/src/main/resources,修改目錄下application.properties配置檔案內容如下圖:

  1. 編譯rocketmq-console
mvn clean package -Dmaven.test.skip=true

編譯需用maven命令進行編譯,如下圖,顯示BIUD SUCCESS,則編譯成功,成功後會在rocketmq-externals-master/rocketmq-console/target目錄下產生一個rocketmq-console-ng-1.0.0.jar檔案。

  1. 將編譯好的rocketmq-console-ng-0.0.jar包上傳linux伺服器

這裡上傳伺服器地址為192.168.162.235,路徑為:/home/rocketMQ/ZHF/

  1. 執行jar包
java -jar target/rocketmq-console-ng-1.0.0.jar

執行顯示下圖則啟動成功:

  1. 訪問管理介面

瀏覽器輸入http://192.168.162.235:8080/回車顯示監控介面如下:

原文:https://blog.csdn.net/tubunanhai/article/details/81738416

RocketMQ的開發快速入門

1、引入 rocketmq-client

<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-client</artifactId>
 <version>4.1.0-incubating</version>
</dependency>

2、編寫Producer

DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
 //指定NameServer地址
 producer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改為自己的

 /**
 * Producer物件在使用之前必須要呼叫start初始化,初始化一次即可
 * 注意:切記不可以在每次傳送訊息時,都呼叫start方法
 */
 producer.start();

 for (int i = 0; i < 997892; i++) {
 try {
 //構建訊息
 Message msg = new Message("TopicTest" /* Topic */,
 "TagA" /* Tag */,
 ("測試RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
 );

 //傳送同步訊息
 SendResult sendResult = producer.send(msg);

 System.out.printf("%s%n", sendResult);
 } catch (Exception e) {
 e.printStackTrace();
 Thread.sleep(1000);
 }
 }


producer.shutdown();

3、編寫Consumer

/**
 * Consumer Group,非常重要的概念,後續會慢慢補充
 */
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo");
//指定NameServer地址,多個地址以 ; 隔開
consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改為自己的

/**
 * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費
 * 如果非第一次啟動,那麼按照上次消費的位置繼續消費
 */
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
 ConsumeConcurrentlyContext context) {
 try {
 for(MessageExt msg:msgs){
 String msgbody = new String(msg.getBody(), "utf-8");
 System.out.println(" MessageBody: "+ msgbody);//輸出訊息內容
 }
 } catch (Exception e) {
 e.printStackTrace();
 return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍後再試
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
 }
});


consumer.start();

System.out.printf("Consumer Started.%n");

4、說明

各位根據自己的環境,修改NamesrvAddr的值,我的叢集請參考:RocketMQ叢集部署配置。稍後通過RocketMQ管控臺就可以看到之前搭建的多Master多Slave模式,非同步複製叢集模式。

5、通過RocketMQ管控臺

rocketmq-console-ng獲取方式為:rocketmq-console-ng,之後通過mavne進行編譯獲取jar,命令如下:

mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar

得到rocketmq-console-ng-1.0.0.jar之後,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties檔案,根據自己的NamesrvAddr進行修改rocketmq.config.namesrvAddr的值。

直接啟動:

java -jar rocketmq-console-ng-1.0.0.jar

管控臺是基於springboot的,的確springboot非常方便和非常火了,所以有必要去學習下springboot了(其實還是spring系列,所以spring也必要深入學習下),稍後通過管控臺進行觀察執行。

6、執行觀察

一個好的習慣是先執行Consumer,之後在執行Producer,之後通過rocketmq-console-ng管控臺觀察

執行完成之後,的確broker-a的資料加上broker-b的資料量就等於我們傳送的資料量,而且slave的數量也master的數量也是一致的,效果如下:

檢視傳送這些資料,2臺機器的磁碟情況如下:

到目前位置,關於RocketMQ快速入門就結束了,未完待續……

參考:

https://www.pianshen.com/article/1215649056/

https://blog.csdn.net/weiwenhou/article/details/100869824

https://www.cnblogs.com/qdhxhz/p/11094624.html

https://blog.csdn.net/linyaogai/article/details/77876078

https://www.pianshen.com/article/1215649056/

https://blog.csdn.net/weiwenhou/article/details/100869824

https://www.cnblogs.com/qdhxhz/p/11094624.html

https://blog.csdn.net/linyaogai/article/details/77876078

回到◀瘋狂創客圈

瘋狂創客圈 - Java高併發研習社群,為大家開啟大廠之門