1. 程式人生 > 其它 >RocketMQ學習:Broker

RocketMQ學習:Broker

Broker

Broker充當著訊息中轉角色,負責儲存訊息、轉發訊息。 Broker在 Rocketmq系統中負責接收並存儲從生產者傳送來的訊息,同時為消費者的拉取請求作準備。Broker同時也儲存著訊息相關的元資料,包括消費者組消費進度偏移 offset、主題、佇列等。

  • Remoting Module:整個 Broker的實體,負責處理來自 clients端的請求。而這個Broker實體則由以下模組構成:

  • Client Manager:客戶端管理器。負責接收、解析客戶端( Producer/Consumer)請求,管理客戶端。例如,維護 Consumer的Topic訂閱資訊

  • Store service:儲存服務。提供方便簡單的AP介面,處理訊息儲存到物理硬碟和訊息査詢功能。

  • HA Service:高可用服務,提供 Master broker和 Slave broker之間的資料同步功能。

  • Index service:索引服務。根據特定的 Message key,對投遞到 Broker的訊息進行索引服務,同時也提供根據 Message Key對訊息進行快速查詢的功能。

為了增強 Broker效能與吞吐量, Broker一般都是以叢集形式岀現的。各叢集節點中可能存放著相同Topic的不同 Queue。不過,這裡有個問題,如果某 Broker節點宕機,如何保證資料不丟失呢?其解決方案是,將每個Broker叢集節點進行橫向擴充套件,即將 Broker節點再建為一個HA叢集,解決單點問題。

Broker節點叢集是一個主從叢集,即叢集中具有 Master與 Slave兩種角色。Master負責處理讀寫操作請求,而 Slave僅負責讀操作請求。一個 Maste可以包含多個 Slave,但一個 Slave只能隸屬於個 Master。Master與 Slave的對應關係是通過指定相同的 BrokerName、不同的 BrokerId來確定的。BrokerId為0表示Master,非0表示Slave,每個Broker與NameServer叢集中的所有節點建立長連線,定時註冊Topic資訊到所有 Nameserver。

工作流程

  1. 啟動 NameServer, NameServer啟動後開始監聽埠,等待 Broker、 Producer、 Consumer連線。

  2. 啟動 Broker時, Broker會與所有的 NameServer建立並保持長連線,然後每30秒向NameServer定時傳送心跳包

  3. 傳送訊息前,可以先建立 Topic,建立 Topic時需要指定該 Topic要儲存在哪些 Broker上,當然,在建立Topic時也會將 Topic與 Broker的關係寫入到 NameServer中。不過,這步是可選的,也可以在傳送訊息時自動建立 Topic

  4. Producer傳送訊息,啟動時先跟NameServer叢集中的其中一臺建立長連線,並從NameServer中獲取路由資訊,即當前傳送的 Topic訊息的Queue與 Broker的地址(IP+port)的對映關係。然後根據演算法策略從隊選擇一個 Queue,與佇列所在的 Broker建立長連線從而向 Broker發訊息。當然,在獲取到路由資訊後,Producer會首先將路由資訊快取到本地,再每30秒從 Name server更新一次路由資訊。

  5. Consumer跟 Producer類似,跟其中一臺 Nameserver建立長連線,獲取其所訂閱 Topic的路由資訊,然後根據演算法策略從路由資訊中獲取到其所要消費的queue,然後直接跟 Broker建立長連線,開始消費其中的訊息。 Consumer在獲取到路由資訊後,同樣也會每30秒從 Name Server更新一次路由資訊。不過不同於Producer的是,Consumer還會向 Broker傳送心跳,以確保 Broker的存活狀態。

手動Topic的建立有兩種模型

1.叢集模式:該模式下建立的Topic在該叢集中,所有Broker中的Queue數量是相同的
2.Broker模式:該模式下建立的Topic在該叢集中,每個Broker中的Queue數量可以不同

自動建立Topic時,預設採用的是Broker模式,會為每個Broker預設建立4個Queue

讀寫佇列

從物理上來講,讀寫佇列是同一個佇列。所以,不存在讀寫佇列資料同步問題。讀寫佇列是邏輯上進行區分的概念。一般情況下,讀寫佇列數量是一樣的。

例如,建立Topic時設定的寫佇列數量為8,讀佇列數量為4,此時系統會建立8個 Queue,分別是0 1 2 3 4 5 6 7。Producer會將訊息寫入到這8個佇列,但 Consumer只會消費0 1 2 3這4個佇列中的訊息,4 5 6 7中的訊息是不會被消費到的。

再如,建立 Topick時設定的寫佇列數量為4,讀佇列數量為8,此時系統會建立8個 Queue,分別是0 1 2 3 4 5 6 7。Producer會將訊息寫入到0 1 2 3這4個佇列,但 Consumer只會消費0 1 2 3 4 5 6 7這8個佇列中的訊息,但是4567中是沒有訊息的。此時假設Consumer Group中包含兩個Consumer,Consumer1消費0 1 2 3,而Consumer2消費4 5 6 7。但實際情況,Consumer2是沒有訊息可以消費。

也就是當讀寫佇列數量不一致時,總是有問題的。其這樣設計的目的是為了方便Topic的Queue的縮容。

例如,原來建立的Topic中包含16個Queue,如何能夠使其Queue縮容為8個,還不會丟失訊息?可以動態修改寫佇列數量為8,讀佇列數量不變。此時新的訊息只能寫入到前8個佇列,而消費都消費的卻是16個佇列中的資料。當發現後8個 Queue中的訊息消費完畢後,就可以再將讀佇列數量動態設定為8.

perm

perm用於設定對當前建立Topic的操作許可權:2表示只寫,4表示只讀,6表示讀寫