1. 程式人生 > >一套高可用、易伸縮、高併發的IM群聊架構方案設計實踐

一套高可用、易伸縮、高併發的IM群聊架構方案設計實踐

本文原題為“一套高可用群聊訊息系統實現”,由作者“於雨氏”授權整理和釋出,內容有些許改動,作者部落格地址:alexstocks.github.io。應作者要求,如需轉載,請聯絡作者獲得授權。

一、引言

要實現一整套能用於大使用者量、高併發場景下的IM群聊,技術難度遠超IM系統中的其它功能,原因在於:IM群聊訊息的實時寫擴散特性帶來了一系列技術難題。

舉個例子:如一個2000人群裡,一條普通訊息的發出問題,將瞬間寫擴散為2000條訊息的接收問題,如何保證這些訊息的及時、有序、高效地送達,涉及到的技術問題點實在太多,更別說個別場景下萬人大群裡的炸群訊息難題了更別說個別場景下萬人大群裡的炸群訊息難題了。

這也是為什麼一般中大型IM系統中,都會將群聊單獨拎出來考慮架構的設計,單獨有針對性地進行架構優化,從而降低整個系統的設計難度。

本文將分享的是一套生產環境下的IM群聊訊息系統的高可用、易伸縮、高併發架構設計實踐,屬於原創第一手資料,內容較專業,適合有一定IM架構經驗的後端程式設計師閱讀。

推薦:如有興趣,本文作者的另一篇《一套原創分散式即時通訊(IM)系統理論架構方案》,也適合正在進行IM系統架構設計研究的同學閱讀。

學習交流:

- 即時通訊開發交流3群:185926912[推薦]

- 移動端IM開發入門文章:《新手入門一篇就夠:從零開發移動端IM

(本文同步釋出於:

http://www.52im.net/thread-2015-1-1.html

二、群聊技術文章

IM群聊訊息究竟是存1份(即擴散讀)還是存多份(即擴散寫)?

IM群聊訊息的已讀回執功能該怎麼實現?

關於IM即時通訊群聊訊息的亂序問題討論

現代IM系統中聊天訊息的同步和儲存方案探討

移動端IM中大規模群訊息的推送如何保證效率、實時性?

微信後臺團隊:微信後臺非同步訊息佇列的優化升級實踐分享

IM群聊訊息如此複雜,如何保證不丟不重?

IM單聊和群聊中的線上狀態同步應該用“推”還是“拉”?

如何保證IM實時訊息的“時序性”與“一致性”?

快速裂變:見證微信強大後臺架構從0到1的演進歷程(一)

三、萬事開頭難:初始的極簡實現

所謂的群聊訊息系統,就是一種多對多群體聊天方式,譬如直播房間內的聊天室對應的伺服器端就是一個群聊訊息系統。

2017年9月初,我們初步實現了一套極簡的群聊訊息系統,其大致架構如下:

系統名詞解釋:

1)Client : 訊息釋出者【或者叫做服務端群聊訊息系統呼叫者】,publisher;

2)Proxy : 系統代理,對外統一介面,收集Client發來的訊息轉發給Broker;

3)Broker :系統訊息轉發Server,Broker 會根據 Gateway Message 組織一個 RoomGatewayList【key為RoomID,value為 Gateway IP:Port 地址列表】,然後把 Proxy 發來的訊息轉發到 Room 中所有成員登入的所有 Gateway;

4)Router :使用者登入訊息轉發者,把Gateway轉發來的使用者登入登出訊息轉發給所有的Broker;

5)Gateway :所有服務端的入口,接收合法客戶端的連線,並把客戶端的登入登出訊息通過Router轉發給所有的Broker;

6)Room Message : Room聊天訊息;

7)Gateway Message : Room內某成員 登入 或者 登出 某Gateway訊息,包含使用者UIN/RoomID/Gateway地址{IP:Port}等訊息。

當一個 Room 中多個 Client 連線一個 Gateway 的時候,Broker只會根據 RoomID 把房間內的訊息轉發一次給這個Gateway,由Gateway再把訊息複製多份分別傳送給連線這個 Gateway 的 Room 中的所有使用者的客戶端。

這套系統有如下特點:

1)系統只轉發房間內的聊天訊息,每個節點收到後立即轉發出去,不儲存任何房間內的聊天訊息,不考慮訊息丟失以及訊息重複的問題;

2)系統固定地由一個Proxy、三個Broker和一個Router構成;

3)Proxy接收後端傳送來的房間訊息,然後按照一定的負載均衡演算法把訊息發往某個Broker,Broker則把訊息傳送到所有與Room有關係的介面機Gateway;

4)Router接收Gateway轉發來的某個Room內某成員在這個Gateway的登出或者登入訊息,然後把訊息傳送到所有Broker;

5)Broker收到Router轉發來的Gateway訊息後,更新(新增或者刪除)與某Room相關的Gateway集合記錄;

6)整個系統的通訊鏈路採用UDP通訊方式。

從以上特點,整個訊息系統足夠簡單,沒有考慮擴縮容問題,當系統負載到達極限的時候,就重新再部署一套系統以應對後端client的訊息壓力。

這種處理方式本質是把系統的擴容能力甩鍋給了後端Client以及前端Gateway:每次擴容一個系統,所有Client需要在本地配置檔案中新增一個Proxy地址然後全部重啟,所有Gateway則需要再本地配置檔案新增一個Router地址然後全部重啟。

這種“幸福我一人,辛苦千萬家”的擴容應對方式,必然導致公司內部這套系統的使用者怨聲載道,下一階段的升級就是必然的了。

四、進一步重點設計:“可擴充套件性”

4.1、基本思路

大道之行也,天下為公,不同的系統有不同的構架,相同的系統總有類似的實現。類似於資料庫的分庫分表【關於分庫分表,目前看到的最好的文章是《一種支援自由規劃無須資料遷移和修改路由程式碼的Replicaing擴容方案》】,其擴充套件實現核心思想是分Partition分Replica,但各Replica之間還區分leader(leader-follower,只有leader可接受寫請求)和non-leader(所有replica均可接收寫請求)兩種機制。

從資料角度來看,這套系統接收兩種訊息:Room Message(房間聊天訊息)和Gateway Message(使用者登入訊息)。兩種訊息的交匯之地就是Broker,所以應對擴充套件的緊要地方就是Broker,Broker的每個Partition採用non-leader機制,各replica均可接收Gateway Message訊息寫請求和Room Message轉發請求。

首先,當Room Message量加大時可以對Proxy進行水平擴充套件,多部署Proxy即可因應Room Message的流量。

其次,當Gateway Message量加大時可以對Router進行水平擴充套件,多部署Router即可因應Gateway Message的流量。

最後,兩種訊息的交匯之地Broker如何擴充套件呢?可以把若干Broker Replica組成一個Partition,因為Gateway Message是在一個Partition內廣播的,所有Broker Replica都會有相同的RoomGatewayList 資料,因此當Gateway Message增加時擴容Partition即可。當Room Message量增加時,水平擴容Partition內的Broker Replica即可,因為Room Message只會傳送到Partition內某個Replica上。

從個人經驗來看,Room ID的增長以及Room內成員的增加量在一段時間內可以認為是直線增加,而Room Message可能會以指數級增長,所以若設計得當則Partition擴容的概率很小,而Partition內Replica水平增長的概率幾乎是100%。

不管是Partition級別的水平擴容還是Partition Replica級別的水平擴容,不可能像系統極簡版本那樣每次擴容後都需要Client或者Gateway去更新配置檔案然後重啟,因應之道就是可用zookeeper充當角色的Registriy。通過這個zookeeper註冊中心,相關角色擴容的時候在Registry註冊後,與之相關的其他模組得到通知即可獲取其地址等資訊。採用zookeeper作為Registry的時候,所以程式實現的時候採用實時watch和定時輪詢的策略保證資料可靠性,因為一旦網路有任何的抖動,zk就會認為客戶端已經宕機把連結關閉。

分析完畢,與之相對的架構圖如下:

以下各分章節將描述各個模組詳細流程。

4.2、Client

Client詳細流程如下:

1)從配置檔案載入Registry地址;

2)從Registy上Proxy註冊路徑/pubsub/proxy下獲取所有的Proxy,依據各個Proxy ID大小順序遞增組成一個ProxyArray;

3)啟動一個執行緒實時關注Registry路徑/pubsub/proxy,以獲取Proxy的動態變化,及時更新ProxyArray;

4)啟動一個執行緒定時輪詢獲取Registry路徑/pubsub/proxy下各個Proxy例項,作為關注策略的補充,以期本地ProxyArray內各個Proxy成員與Registry上的各個Proxy保持一致;定時給各個Proxy傳送心跳,非同步獲取心跳回包;定時清除ProxyArray中心跳超時的Proxy成員;

5)傳送訊息的時候採用snowflake演算法給每個訊息分配一個MessageID,然後採用相關負載均衡演算法把訊息轉發給某個Proxy。

4.3、Proxy

Proxy詳細流程如下:

1)讀取配置檔案,獲取Registry地址;

2)把自身資訊註冊到Registry路徑/pubsub/proxy下,把Registry返回的ReplicaID作為自身ID;

3)從Registry路徑/pubsub/broker/partition(x)下獲取每個Broker Partition的各個replica;

4)從Registry路徑/pubsub/broker/partition_num獲取當前有效的Broker Partition Number;

5)啟動一個執行緒關注Registry上的Broker路徑/pubsub/broker,以實時獲取以下資訊:

    {Broker Partition Number}

     - 新的Broker Partition(此時發生了擴容);

     - Broker Partition內新的broker replica(Partition內發生了replica擴容);

     - Broker Parition內某replica掛掉的資訊;

6)定時向各個Broker Partition replica傳送心跳,非同步等待Broker返回的心跳響應包,以探測其活性,以保證不向超時的replica轉發Room Message;

7)啟動一個執行緒定時讀取Registry上的Broker路徑/pubsub/broker下各個子節點的值,以定時輪詢的策略觀察Broker Partition Number變動,以及各Partition的變動情況,作為實時策略的補充;同時定時檢查心跳包超時的Broker,從有效的BrokerList中刪除;

8)依據規則【BrokerPartitionID = RoomID % BrokerPartitionNum, BrokerReplicaID = RoomID % BrokerPartitionReplicaNum】向某個Partition的replica轉發Room Message,收到Client的Heatbeat包時要及時給予響應。

之所以把Room Message和Heartbeat Message放在一個執行緒處理,是為了防止程序假死這種情況。

當/pubsub/broker/partition_num的值發生改變的時候(譬如值改為4),意味著Router Partition進行了擴充套件,Proxy要及時獲取新Partition路徑(如/pubsub/broker/Partition2和/pubsub/broker/Partition3)下的例項,並關注這些路徑,獲取新Partition下的例項。

之所以Proxy在獲取Registry下所有當前的Broker例項資訊後再註冊自身資訊,是因為此時它才具有轉發訊息的資格。

Proxy轉發某個Room訊息時候,只發送給處於Running狀態的Broker。為Broker Partition內所有replica依據Registry給其分配的replicaID進行遞增排序,組成一個Broker Partition Replica Array,規則中BrokerPartitionReplicaNum為Array的size,而BrokerReplicaID為replica在Array中的下標。

4.4、Pipeline

收到的 Room Message 需要做三部工作:收取 Room Message、訊息協議轉換和向 Broker 傳送訊息。

初始系統這三步流程如果均放在一個執行緒內處理,proxy 的整體吞吐率只有 50 000 Msg/s。

最後的實現方式是按照訊息處理的三個步驟以 pipeline 方式做如下流程處理:

1)啟動 1 個訊息接收執行緒和 N【N == Broker Parition 數目】個多寫一讀形式的無鎖佇列【稱之為訊息協議轉換佇列】,訊息接收執行緒分別啟動一個 epoll 迴圈流程收取訊息,然後把訊息以相應的 hash 演算法【佇列ID = UIN % N】寫入對應的訊息協議轉換佇列;

2)啟動 N 個執行緒 和 N * 3 個一寫一讀的無鎖佇列【稱之為訊息傳送佇列】,每個訊息協議專家執行緒從訊息協議轉換佇列接收到訊息並進行協議轉換後,根據相應的 hash 演算法【佇列ID = UIN % 3N】寫入訊息傳送佇列;

3)啟動 3N 個訊息傳送執行緒,分別建立與之對應的 Broker 的連線,每個執行緒單獨從對應的某個訊息傳送佇列接收訊息然後傳送出去。

經過以上流水線改造後,Proxy 的整體吞吐率可達 200 000 Msg/s。

關於 pipeline 自身的解釋,本文不做詳述,可以參考下圖:

4.5、大房間訊息處理

每個 Room 的人數不均,最簡便的解決方法就是給不同人數量級的 Room 各搭建一套訊息系統,不用修改任何程式碼。

然所謂需求推動架構改進,在系統迭代升級過程中遇到了這樣一個需求:業務方有一個全國 Room,用於給所有線上使用者進行訊息推送。針對這個需求,不可能為了一個這樣的 Room 單獨搭建一套系統,況且這個 Room 的訊息量很少。

如果把這個 Room 的訊息直接傳送給現有系統,它有可能影響其他 Room 的訊息傳送:訊息系統是一個寫放大的系統,全國 Room 內有系統所有的線上使用者,每次傳送都會卡頓其他 Room 的訊息傳送。

最終的解決方案是:使用類似於分割槽的方法,把這樣的大 Room 對映為 64 個虛擬 Room【稱之為 VRoom】。在 Room 號段分配業務線的配合下,給訊息系統專門保留了一個號段,用於這種大 Room 的切分,在 Proxy 層依據一個 hash 方法 【 VRoomID = UserID % 64】 把每個 User 分配到相應的 VRoom,其他模組程式碼不用修改即完成了大 Room 訊息的路由。

4.6、Broker

Broker詳細流程如下:

1)Broker載入配置,獲取自身所在Partition的ID(假設為3);

2)向Registry路徑/pubsub/broker/partition3註冊,設定其狀態為Init,註冊中心返回的ID作為自身的ID(replicaID);

3)接收Router轉發來的Gateway Message,放入GatewayMessageQueue;

4)從Database載入資料,把自身所在的Broker Partition所應該負責的 RoomGatewayList 資料載入進來;

5)非同步處理GatewayMessageQueue內的Gateway Message,只處理滿足規則【PartitionID == RoomID % PartitionNum】的訊息,把資料存入本地路由資訊快取;

6)修改Registry路徑/pubsub/broker/partition3下自身節點的狀態為Running;

7)啟動執行緒實時關注Registry路徑/pubsub/broker/partition_num的值;

8)啟動執行緒定時查詢Registry路徑/pubsub/broker/partition_num的值;

9)當Registry路徑/pubsub/broker/partition_num的值發生改變的時候,依據規則【PartitionID == RoomID % PartitionNum】清洗本地路由資訊快取中每條資料;

10)接收Proxy發來的Room Message,依據RoomID從路由資訊快取中查詢Room有成員登陸的所有Gateway,把訊息轉發給這些Gateway。

注意Broker之所以先註冊然後再載入Database中的資料,是為了在載入資料的時候同時接收Router轉發來的Gateway Message,但是在資料載入完前這些受到的資料先被快取起來,待所有 RoomGatewayList 資料載入完後就把這些資料重放一遍;

Broker之所以區分狀態,是為了在載入完畢 RoomGatewayList 資料前不對Proxy提供轉發訊息的服務,同時也方便Broker Partition應對的訊息量增大時進行水平擴充套件。

當Broker發生Partition擴充套件的時候,新的Partition個數必須是2的冪,只有新Partition內所有Broker Replica都載入例項完畢,再更改/pubsub/broker/partition_num的值。

老的Broker也要watch路徑/pubsub/broker/partition_num的值,當這個值增加的時候,它也需要清洗本地的路由資訊快取。

Broker的擴容過程猶如細胞分裂,形成中的兩個細胞有著完全相同的資料,分裂完成後【Registry路徑/pubsub/broker/partition_num的值翻倍】則需要清洗垃圾資訊。這種方法稱為翻倍法。

4.7、Router

Router詳細流程如下:

1)Router載入配置,Registry地址;

2)把自身資訊註冊到Registry路徑/pubsub/router下,把Registry返回的ReplicaID作為自身ID;

3)從Registry路徑/pubsub/broker/partition(x)下獲取每個Broker Partition的各個replica;

4)從Registry路徑/pubsub/broker/partition_num獲取當前有效的Broker Partition Number;

5)啟動一個執行緒關注Registry上的Broker路徑/pubsub/broker,以實時獲取以下資訊:

    {Broker Partition Number}

    - 新的Broker Partition(此時發生了擴容);

    - Broker Partition內新的broker replica(Partition內發生了replica擴容);

- Broker Parition內某replica掛掉的資訊;

6)定時向各個Broker Partition replica傳送心跳,非同步等待Broker返回的心跳響應包,以探測其活性,以保證不向超時的replica轉發Gateway Message;

7)啟動一個執行緒定時讀取Registry上的Broker路徑/pubsub/broker下各個子節點的值,以定時輪詢的策略觀察Broker Partition Number變動,以及各Partition的變動情況,作為實時策略的補充;同時定時檢查心跳包超時的Broker,從有效的BrokerList中刪除;

8)從Database全量載入路由 RoomGatewayList 資料放入本地快取;

9)收取Gateway發來的心跳訊息,及時返回ack包;

10)收取Gateway轉發來的Gateway Message,按照一定規則【BrokerPartitionID % BrokerPartitionNum = RoomID % BrokerPartitionNum】轉發給某個Broker Partition下所有Broker Replica,保證Partition下所有replica擁有同樣的路由 RoomGatewayList 資料,再把Message內資料存入本地快取,當檢測到資料不重複的時候把資料非同步寫入Database。

4.8、Gateway

Gateway詳細流程如下:

1)讀取配置檔案,載入Registry地址;

2)從Registry路徑/pubsub/router/下獲取所有router replica,依據各Replica的ID遞增排序組成replica陣列RouterArray;

3)啟動一個執行緒實時關注Registry路徑/pubsub/router,以獲取Router的動態變化,及時更新RouterArray;

4)啟動一個執行緒定時輪詢獲取Registry路徑/pubsub/router下各個Router例項,作為關注策略的補充,以期本地RouterArray及時更新;定時給各個Router傳送心跳,非同步獲取心跳回包;定時清除RouterArray中心跳超時的Router成員;

5)當有Room內某成員客戶端連線上來或者Room內所有成員都不連線當前Gateway節點時,依據規則【RouterArrayIndex = RoomID % RouterNum】向某個Router傳送Gateway Message;

6)收到Broker轉發來的Room Message時,根據MessageID進行去重,如果不重複則把訊息傳送到連線到當前Gateway的Room內所有客戶端,同時把MessageID快取起來以用於去重判斷。

Gateway本地有一個基於共享記憶體的LRU Cache,儲存最近一段時間傳送的訊息的MessageID。

五、接下來迫切要解決的:系統穩定性

系統具有了可擴充套件性僅僅是系統可用的初步,整個系統要保證最低粒度的SLA(0.99),就必須在兩個維度對系統的可靠性就行感知:訊息延遲和系統內部元件的高可用。

5.1、訊息延遲

準確的訊息延遲的統計,通用的做法可以基於日誌系統對系統所有訊息或者以一定概率抽樣後進行統計,但限於人力目前沒有這樣做。

目前使用了一個方法:通過一種構造一組偽使用者ID,定時地把訊息傳送給proxy,每條訊息經過一層就把在這層的進入時間和發出時間以及元件自身的一些資訊填入訊息,這組偽使用者的訊息最終會被髮送到一個偽Gateway端,偽Gateway對這些訊息的資訊進行歸併統計後,即可計算出當前系統的平均訊息延遲時間。

通過所有訊息的平均延遲可以評估系統的整體效能。同時,因為系統訊息路由的雜湊方式已知,當固定時間內偽Gateway沒有收到訊息時,就把訊息當做傳送失敗,當某條鏈路失敗一定次數後就可以產生告警了。

5.2、高可用

上面的方法同時能夠檢測某個鏈路是否出問題,但是鏈路具體出問題的點無法判斷,且實時性無法保證。

為了保證各個元件的高可用,系統引入了另一種評估方法:每個層次都給後端元件傳送心跳包,通過心跳包的延遲和成功率判斷其下一級元件的當前的可用狀態。

譬如proxy定時給每個Partition內每個broker傳送心跳,可以依據心跳的成功率來快速判斷broker是否處於“假死”狀態(最近業務就遇到過broker程序還活著,但是對任何收到的訊息都不處理的情況)。

同時依靠心跳包的延遲還可以判斷broker的處理能力,基於此延遲值可在同一Partition內多broker端進行負載均衡。

六、進一步優化:訊息可靠性

公司內部內部原有一個走tcp通道的群聊訊息系統,但是經過元旦一次大事故(幾乎全線崩潰)後,相關業務的一些重要訊息改走這套基於UDP的群聊訊息系統了。這些訊息如服務端下達給客戶端的遊戲動作指令,是不允許丟失的,但其特點是相對於聊天訊息來說量非常小(單人1秒最多一個),所以需要在目前UDP鏈路傳遞訊息的基礎之上再構建一個可靠訊息鏈路。

國內某IM大廠的訊息系統也是以UDP鏈路為基礎的(見《為什麼QQ用的是UDP協議而不是TCP協議?》),他們的做法是訊息重試加ack構建了可靠訊息穩定傳輸鏈路。但是這種做法會降低系統的吞吐率,所以需要獨闢蹊徑。

UDP通訊的本質就是偽裝的IP通訊,TCP自身的穩定性無非是重傳、去重和ack,所以不考慮訊息順序性的情況下可以通過重傳與去重來保證訊息的可靠性。

基於目前系統的可靠訊息傳輸流程如下:

1)Client給每個命令訊息依據snowflake演算法配置一個ID,複製三份,立即傳送給不同的Proxy;

2)Proxy收到命令訊息以後隨機發送給一個Broker;

3)Broker收到後傳輸給Gateway;

4)Gateway接收到命令訊息後根據訊息ID進行重複判斷,如果重複則丟棄,否則就傳送給APP,並快取之。

正常的訊息在群聊訊息系統中傳輸時,Proxy會根據訊息的Room ID傳遞給固定的Broker,以保證訊息的有序性。

七、Router需要進一步強化

7.1、簡述

當線上需要部署多套群聊訊息系統的時候,Gateway需要把同樣的Room Message複製多份轉發給多套群聊訊息系統,會增大Gateway壓力,可以把Router單獨獨立部署,然後把Room Message向所有的群聊訊息系統轉發。

Router系統原有流程是:Gateway按照Room ID把訊息轉發給某個Router,然後Router把訊息轉發給下游Broker例項。新部署一套群聊訊息系統的時候,新系統Broker的schema需要通過一套約定機制通知Router,使得Router自身邏輯過於複雜。

重構後的Router架構參照上圖,也採用分Partition分Replica設計,Partition內部各Replica之間採用non-leader機制;各Router Replica不會主動把Gateway Message內容push給各Broker,而是各Broker主動通過心跳包形式向Router Partition內某個Replica註冊,而後此Replica才會把訊息轉發到這個Broker上。

類似於Broker,Router Partition也以2倍擴容方式進行Partition水平擴充套件,並通過一定機制保證擴容或者Partition內部各個例項停止執行或者新啟動時,盡力保證資料的一致性。

Router Replica收到Gateway Message後,replica先把Gateway Message轉發給Partition內各個peer replica,然後再轉發給各個訂閱者。Router轉發訊息的同時非同步把訊息資料寫入Database。

獨立Router架構下,下面小節將分別詳述Gateway、Router和Broker三個相關模組的詳細流程。

7.2、Gateway

Gateway詳細流程如下:

1)從Registry路徑/pubsub/router/partition(x)下獲取每個Partition的各個replica;

2)從Registry路徑/pubsub/router/partition_num獲取當前有效的Router Partition Number;

3)啟動一個執行緒關注Registry上的Router路徑/pubsub/router,以實時獲取以下資訊:{Router Partition Number} -> 新的Router Partition(此時發生了擴容);  Partition內新的replica(Partition內發生了replica擴容);  Parition內某replica掛掉的資訊;

4)定時向各個Partition replica傳送心跳,非同步等待Router返回的心跳響應包,以探測其活性,以保證不向超時的replica轉發Gateway Message;

5)啟動一個執行緒定時讀取Registry上的Router路徑/pubsub/router下各個子節點的值,以定時輪詢的策略觀察Router Partition Number變動,以及各Partition的變動情況,作為實時策略的補充;同時定時檢查心跳包超時的Router,從有效的BrokerList中刪除;

6 依據規則向某個Partition的replica轉發Gateway Message。

第六步的規則決定了Gateway Message的目的Partition和replica,規則內容有:

如果某Router Partition ID滿足condition(RoomID % RouterPartitionNumber == RouterPartitionID % RouterPartitionNumber),則把訊息轉發到此Partition;

這裡之所以不採用直接hash方式(RouterPartitionID = RoomID % RouterPartitionNumber)獲取Router Partition,是考慮到當Router進行2倍擴容的時候當所有新的Partition的所有Replica都啟動完畢且資料一致時才會修改Registry路徑/pubsub/router/partitionnum的值,按照規則的計算公式才能保證新Partition的各個Replica在啟動過程中就可以得到Gateway Message,也即此時每個Gateway Message會被髮送到兩個Router Partition。 當Router擴容完畢,修改Registry路徑/pubsub/router/partitionnum的值後,此時新叢集進入穩定期,每個Gateway Message只會被髮送固定的一個Partition,condition(RoomID % RouterPartitionNumber == RouterPartitionID % RouterPartitionNumber)等效於condition(RouterPartitionID = RoomID % RouterPartitionNumber)。

如果Router Partition內某replia滿足condition(replicaPartitionID = RoomID % RouterPartitionReplicaNumber),則把訊息轉發到此replica。

replica向Registry註冊的時候得到的ID稱之為replicaID,Router Parition內所有replica按照replicaID遞增排序組成replica陣列RouterPartitionReplicaArray,replicaPartitionID即為replica在陣列中的下標。

Gateway Message資料一致性:

Gateway向Router傳送的Router Message內容有兩種:某user在當前Gateway上進入某Room 和 某user在當前Gateway上退出某Room,資料項分別是UIN(使用者ID)、Room ID、Gateway Addr和User Action(Login or Logout。

由於所有訊息都是走UDP鏈路進行轉發,則這些訊息的順序就有可能亂序。Gateway可以統一給其發出的所有訊息分配一個全域性遞增的ID【下文稱為GatewayMsgID,Gateway Message ID】以保證訊息的唯一性和全域性有序性。

Gateway向Registry註冊臨時有序節點時,Registry會給Gateway分配一個ID,Gateway可以用這個ID作為自身的Instance ID【假設這個ID上限是65535】。

GatewayMsgID字長是64bit,其格式如下:

//63 -------------------------- 48 47 -------------- 38 37 ------------ 0

//|  16bit Gateway Instance ID    |   10bit Reserve    |    38bit自增碼  |

7.3、Router

Router系統部署之前,先設定Registry路徑/pubsub/router/partition_num的值為1。

Router詳細流程如下:

1)Router載入配置,獲取自身所在Partition的ID(假設為3);

2)向Registry路徑/pubsub/router/partition3註冊,設定其狀態為Init,註冊中心返回的ID作為自身的ID(replicaID);

3)註冊完畢會收到Gateway發來的Gateway Message以及Broker發來的心跳訊息(HeartBeat Message),先快取到訊息佇列MessageQueue;

4)從Registry路徑/pubsub/router/partition3下獲取自身所在的Partition內的各個replica;

5)從Registry路徑/pubsub/router/partition_num獲取當前有效的Router Partition Number;

6)啟動一個執行緒關注Registry路徑/pubsub/router,以實時獲取以下資訊:{Router Partition Number}  -> Partition內新的replica(Partition內發生了replica擴容);  Parition內某replica掛掉的資訊;

7)從Database載入資料;

8)啟動一個執行緒非同步處理MessageQueue內的Gateway Message,把Gateway Message轉發給同Partition內其他peer replica,然後依據規則【RoomID % BrokerPartitionNumber == BrokerReplicaPartitionID % BrokerPartitionNumber】轉發給BrokerList內每個Broker;處理Broker發來的心跳包,把Broker的資訊存入本地BrokerList,然後給Broker傳送回包;

9)修改Registry路徑/pubsub/router/partition3下節點的狀態為Running;

10)啟動一個執行緒定時讀取Registry路徑/pubsub/router下各個子路徑的值,以定時輪詢的策略觀察Router各Partition的變動情況,作為實時策略的補充;檢查超時的Broker,把其從BrokerList中剔除;

11)當RouterPartitionNum倍增時,Router依據規則【RoomID % BrokerPartitionNumber == BrokerReplicaPartitionID % BrokerPartitionNumber】清洗自身路由資訊快取中資料;

12)Router本地儲存每個Gateway的最大GatewayMsgID,收到小於GatewayMsgID的Gateway Message可以丟棄不處理,否則就更新GatewayMsgID並根據上面邏輯進行處理。

之所以把Gateway Message和Heartbeat Message放在一個執行緒處理,是為了防止程序假死這種情況。

Broker也採用了分Partition分Replica機制,所以向Broker轉發Gateway Message時候路由規則,與Gateway向Router轉發訊息的路由規則相同。

另外啟動一個工具,當水平擴充套件後新啟動的Partition內所有Replica的狀態都是Running的時候,修改Registry路徑/pubsub/router/partition_num的值為所有Partition的數目。

7.4、Broker

Broker詳細流程如下:

1)Broker載入配置,獲取自身所在Partition的ID(假設為3);

2)向Registry路徑/pubsub/broker/partition3註冊,設定其狀態為Init,註冊中心返回的ID作為自身的ID(replicaID);

3)從Registry路徑/pubsub/router/partition_num獲取當前有效的Router Partition Number;

4)從Registry路徑/pubsub/router/partition(x)下獲取每個Router Partition的各個replica;

5)啟動一個執行緒關注Registry路徑/pubsub/router,以實時獲取以下資訊:{Router Partition Number} -> 新的Router Partition(此時發生了擴容);  Partition內新的replica(Partition內發生了replica擴容);  Parition內某replica掛掉的資訊;

6)依據規則【RouterPartitionID % BrokerPartitionNum == BrokerPartitionID % BrokerPartitionNum,RouterReplicaID = BrokerReplicaID % BrokerPartitionNum】選定目標Router Partition下某個Router replica,向其傳送心跳訊息,包含BrokerPartitionNum、BrokerPartitionID、BrokerHostAddr和精確到秒級的Timestamp,並非同步等待所有Router replica的回覆,所有Router轉發來的Gateway Message放入GatewayMessageQueue;

7)依據規則【BrokerPartitionID == RoomID % BrokerParitionNum】從Database載入資料;

8)依據規則【BrokerPartitionID % BrokerParitionNum == RoomID % BrokerParitionNum】非同步處理GatewayMessageQueue內的Gateway Message,只留下合乎規則的訊息的資料;

9)修改Registry路徑/pubsub/broker/partition3下自身節點的狀態為Running;

10)啟動一個執行緒定時讀取Registry路徑/pubsub/router下各個子路徑的值,以定時輪詢的策略觀察Router各Partition的變動情況,作為實時策略的補充;定時檢查超時的Router,某Router超時後更換其所在的Partition內其他Router替換之,定時傳送心跳包;

11)當Registry路徑/pubsub/broker/partition_num的值BrokerPartitionNum發生改變的時候,依據規則【PartitionID == RoomID % PartitionNum】清洗本地路由資訊快取中每條資料;

12)接收Proxy發來的Room Message,依據RoomID從路由資訊快取中查詢Room有成員登陸的所有Gateway,把訊息轉發給這些Gateway;

13)Broker本地儲存每個Gateway的最大GatewayMsgID,收到小於GatewayMsgID的Gateway Message可以丟棄不處理,否則更新GatewayMsgID並根據上面邏輯進行處理。

BrokerPartitionNumber可以小於或者等於或者大於RouterPartitionNumber,兩個數應該均是2的冪,兩個叢集可以分別進行擴充套件,互不影響。譬如BrokerPartitionNumber=4而RouterPartitionNumber=2,則Broker Partition 3只需要向Router Partition 1的某個follower傳送心跳訊息即可;若BrokerPartitionNumber=4而RouterPartitionNumber=8,則Broker Partition 3需要向Router Partition 3的某個follower傳送心跳訊息的同時,還需要向Router Partition 7的某個follower傳送心跳,以獲取全量的Gateway Message。

Broker需要關注/pubsub/router/partitionnum和/pubsub/broker/partitionnum的值的變化,當router或者broker進行parition水平擴充套件的時候,Broker需要及時重新構建與Router之間的對應關係,及時變動傳送心跳的Router Replica物件【RouterPartitionID = BrokerReplicaID % RouterPartitionNum,RouterPartitionID為Router Replica在PartitionRouterReplicaArray陣列的下標】。

當Router Partition內replica死掉或者傳送心跳包的replica物件死掉(無論是註冊中心通知還是心跳包超時),broker要及時變動傳送心跳的Router replica物件。

另外,Gateway使用UDP通訊方式向Router傳送Gateway Message,如若這個Message丟失則此Gateway上該Room內所有成員一段時間內(當有新的成員在當前Gateway上加入Room 時會產生新的Gateway Message)都無法再接收訊息,為了保證訊息的可靠性,可以使用這樣一個約束解決問題:在此Gateway上登入的某Room內的人數少於3時,Gateway會把Gateway Message複製兩份非連續(如以10ms為時間間隔)重複傳送給某個Partition leader。因Gateway Message訊息處理的冪等性,重複Gateway Message並不會導致Room Message傳送錯誤,只在極少概率的情況下會導致Gateway收到訊息的時候Room內已經沒有成員在此Gateway登入,此時Gateway會把訊息丟棄不作處理。

傳遞實時訊息群聊訊息系統的Broker向特定Gateway轉發Room Message的時候,會帶上Room內在此Gateway上登入的使用者列表,Gateway根據這個使用者列表下發訊息時如果檢測到此使用者已經下線,在放棄向此使用者轉發訊息的同時,還應該把此使用者已經下線的訊息傳送給Router,當Router把這個訊息轉發給Broker後,Broker把此使用者從使用者列表中剔除。通過這種負反饋機制保證使用者狀態更新的及時性。

八、離線訊息的處理

8.1、簡述

前期的系統只考慮了使用者線上情況下實時訊息的傳遞,當用戶離線時其訊息便無法獲取。

若系統考慮使用者離線訊息傳遞,需要考慮如下因素:

1)訊息固化:保證使用者上線時收到其離線期間的訊息;

2)訊息有序:離線訊息和線上訊息都在一個訊息系統傳遞,給每個訊息分配一個ID以區分訊息先後順序,訊息順序越靠後則ID愈大。

離線訊息的儲存和傳輸,需要考慮使用者的狀態以及每條訊息的傳送狀態,整個訊息核心鏈路流程會有大的重構。

新訊息架構如下圖:

系統名詞解釋:

1)Pi : 訊息ID儲存模組,儲存每個人未傳送的訊息ID有序遞增集合;

2)Xiu : 訊息儲存KV模組,儲存每個人的訊息,給每個訊息分配ID,以ID為key,以訊息內為value;

3)Gateway Message(HB) : 使用者登入登出訊息,包括APP保活定時心跳(Hearbeat)訊息。

系統內部代號貔貅(貔貅者,雄貔雌貅),源自上面兩個新模組。

這個版本架構流程的核心思想為“訊息ID與訊息內容分離,訊息與使用者狀態分離”。訊息傳送流程涉及到模組 Client/Proxy/Pi/Xiu,訊息推送流程則涉及到模組 Pi/Xiu/Broker/Router/Gateway。

下面小節先細述Pi和Xiu的介面,然後再詳述傳送和推送流程。

8.2、Xiu

Xiu模組功能名稱是Message Storage,使用者快取和固化訊息,並給訊息分配ID。Xiu 叢集採用分 Partition 分 Replica 機制,Partition 初始數目須是2的倍數,叢集擴容時採用翻倍法。

8.2.1 儲存訊息

儲存訊息請求的引數列表為{SnowflakeID,UIN, Message},其流程如下:

1)接收客戶端發來的訊息,獲取訊息接收人ID(UIN)和客戶端給訊息分配的 SnowflakeID;

2)檢查 UIN % Xiu_Partition_Num == Xiu_Partition_ID % Xiu_Partition_Num 新增是否成立【即接收人的訊息是否應當由當前Xiu負責】,不成立則返回錯誤並退出;

3)檢查 SnowflakeID 對應的訊息是否已經被儲存過,若已經儲存過則返回其對應的訊息ID然後退出;

4)給訊息分配一個 MsgID:

每個Xiu有自己唯一的 Xiu_Partition_ID,以及一個初始值為 0 的 Partition_Msg_ID。MsgID = 1B[ Xiu_Partition_ID ] + 1B[ Message Type ] + 6B[ ++ Partition_Msg_ID ]。每次分配的時候 Partition_Msg_ID 都自增加一。

5)以 MsgID 為 key 把訊息存入基於共享記憶體的 Hashtable,並存入訊息的 CRC32 hash值和插入時間,把 MsgID 存入一個 LRU list 中:

LRU List 自身並不存入共享記憶體中,當程序重啟時,可以根據Hashtable中的資料重構出這個List。把訊息存入 Hashtable 中時,如果 Hashtable full,則依據 LRU List 對Hashtable 中的訊息進行淘汰。

6)把MsgID返回給客戶端;

7)把MsgID非同步通知給訊息固化執行緒,訊息固化執行緒根據MsgID從Hashtable中讀取訊息並根據CRC32 hash值判斷訊息內容是否完整,完整則把訊息存入本地RocksDB中。

8.2.2讀取訊息

讀取訊息請求的引數列表為{UIN, MsgIDList},其流程為:

1)獲取請求的 MsgIDList,判斷每個MsgID MsgID{Xiu_Partition_ID} == Xiu_Partition_ID 條件是否成立,不成立則返回錯誤並退出;

2)從 Hashtable 中獲取每個 MsgID 對應的訊息;

3)如果 Hashtable 中不存在,則從 RocksDB 中讀取 MsgID 對應的訊息;

4)讀取完畢則把所有獲取的訊息返回給客戶端。

8.2.3主從資料同步

目前從簡,暫定Xiu的副本只有一個。

Xiu節點啟動的時候根據自身配置檔案中分配的 Xiu_Partition_ID 到Registry路徑 /pubsub/xiu/partition_id 下進行註冊一個臨時有序節點,註冊成功則Registry會返回Xiu的節點 ID。

Xiu節點獲取 /pubsub/xiu/partition_id 下的所有節點的ID和地址資訊,依據 節點ID最小者為leader 的原則,即可判定自己的角色。只有leader可接受讀寫資料請求。

資料同步流程如下:

1)follower定時向leader傳送心跳資訊,心跳資訊包含本地最新訊息的ID;

2)leader啟動一個數據同步執行緒處理follower的心跳資訊,leader的資料同步執行緒從LRU list中查詢 follower_latest_msg_id 之後的N條訊息的ID,若獲取到則讀取訊息並同步給follower,獲取不到則回覆其與leader之間訊息差距太大;

3)follower從leader獲取到最新一批訊息,則儲存之;

4)follower若獲取leader的訊息差距太大響應,則請求leader的agent把RocksDB的固化資料全量同步過來,整理完畢後再次啟動與leader之間的資料同步流程。

follower會關注Registry路徑 /pubsub/xiu/partition_id 下所有所有節點的變化情況,如果leader掛掉則及時轉換身份並接受客戶端請求。如果follower 與 leader 之間的心跳超時,則follower刪掉 leader 的 Registry 路徑節點,及時進行身份轉換處理客戶端請求。

當leader重啟或者follower轉換為leader的時候,需要把 Partition_Msg_ID 進行一個大數值增值(譬如增加1000)以防止可能的訊息ID亂序情況。

8.2.4叢集擴容

Xiu 叢集擴容採用翻倍法,擴容時新 Partition 的節點啟動後工作流程如下:

1)向Registry的路徑 /pubsub/xiu/partition_id 下自己的 node 的 state 為 running,同時註冊自己的對外服務地址資訊;

2)另外啟動一個工具,當水平擴充套件後所有新啟動的 Partition 內所有 Replica 的狀態都是 Running 的時候,修改 Registry 路徑 /pubsub/xiu/partition_num 的值為擴容後 Partition 的數目。按照開頭的例子,即由2升級為4。

之所以 Xiu 不用像 Broker 和 Router 那樣啟動的時候向老的 Partition 同步資料,是因為每個 Xiu 分配的 MsgID 中已經帶有 Xiu 的 PartitionID 資訊,即使叢集擴容這個 ID 也不變,根據這個ID也可以定位到其所在的Partition,而不是藉助 hash 方法。

8.3、Pi

Pi 模組功能名稱是 Message ID Storage,儲存每個使用者的 MsgID List。Xiu 叢集也採用分 Partition 分 Replica 機制,Partition 初始數目須是2的倍數,叢集擴容時採用翻倍法。

8.3.1儲存訊息ID

MsgID 儲存的請求引數列表為{UIN,MsgID},Pi 工作流程如下:

1)判斷條件 UIN % Pi_Partition_Num == Pi_Partition_ID % Pi_Partition_Num 是否成立,若不成立則返回error退出;

2)把 MsgID 插入UIN的 MsgIDList 中,保持 MsgIDList 中所有 MsgID 不重複有序遞增,把請求內容寫入本地log,給請求者返回成功響應。

Pi有專門的日誌記錄執行緒,給每個日誌操作分配一個 LogID,每個 Log 檔案記錄一定量的寫操作,當檔案 size 超過配置的上限後刪除之。

8.3.2讀取訊息ID列表

讀取請求引數列表為{UIN, StartMsgID, MsgIDNum, ExpireFlag},其意義為獲取使用者 UIN 自起始ID為 StartMsgID 起(不包括 StartMsgID )的數目為 MsgIDNum 的訊息ID列表,ExpireFlag意思是 所有小於等於 StartMsgID 的訊息ID是否刪除。 

流程如下:

1)判斷條件 UIN % Pi_Partition_Num == Pi_Partition_ID % Pi_Partition_Num 是否成立,若不成立則返回error退出;

2)獲取 (StartID, StartMsgID + MsgIDNum] 範圍內的所有 MsgID,把結果返回給客戶端;

3)如果 ExpireFlag 有效,則刪除MsgIDList內所有在 [0, StartMsgID] 範圍內的MsgID,把請求內容寫入本地log。

8.3.3主從資料同步

同 Xiu 模組,暫定 Pi 的同 Parition 副本只有一個。

Pi 節點啟動的時候根據自身配置檔案中分配的 Pi_Partition_ID 到Registry路徑 /pubsub/pi/partition_id 下進行註冊一個臨時有序節點,註冊成功則 Registry 會返回 Pi 的節點 ID。

Pi 節點獲取 /pubsub/pi/partition_id 下的所有節點的ID和地址資訊,依據 節點ID最小者為leader 的原則,即可判定自己的角色。只有 leader 可接受讀寫資料請求。

資料同步流程如下:

1)follower 定時向 leader 傳送心跳資訊,心跳資訊包含本地最新 LogID;

2)leader 啟動一個數據同步執行緒處理 follower 的心跳資訊,根據 follower 彙報的 logID 把此 LogID;

3)follower 從 leader 獲取到最新一批 Log,先儲存然後重放。

follower 會關注Registry路徑 /pubsub/pi/partition_id 下所有節點的變化情況,如果 leader 掛掉則及時轉換身份並接受客戶端請求。如果follower 與 leader 之間的心跳超時,則follower刪掉 leader 的 Registry 路徑節點,及時進行身份轉換處理客戶端請求。

8.3.4叢集擴容

Pi 叢集擴容採用翻倍法。則節點啟動後工作流程如下:

1)向 Registry 註冊,獲取 Registry 路徑 /pubsub/xiu/partition_num 的值 PartitionNumber;

2)如果發現自己 PartitionID 滿足條件 PartitionID >= PartitionNumber 時,則意味著當前 Partition 是擴容後的新叢集,更新 Registry 中自己狀態為start;

3)讀取 Registry 路徑 /pubsub/xiu 下所有 Parition 的 leader,根據條件 自身PartitionID % PartitionNumber == PartitionID % PartitionNumber 尋找對應的老 Partition 的 leader,稱之為 parent_leader;

4)快取收到 Proxy 轉發來的使用者請求;

5)向 parent_leader 獲取log;

6)向 parent_leader 同步記憶體資料;

7)重放 parent_leader 的log;

8)更新 Registry 中自己的狀態為 Running;

9)重放使用者請求;

10)當 Registry 路徑 /pubsub/xiu/partition_num 的值 PartitionNumber 滿足條件 PartitionID >= PartitionNumber 時,意味著擴容完成,處理使用者請求時要給使用者返回響應。

Proxy 會把讀寫請求參照條件 UIN % Pi\_Partition\_Num == Pi\_Partition\_ID % Pi\_Partition\_Num 向相關 partition 的 leader 轉發使用者請求。假設原來 PartitionNumber 值為2,擴容後值為4,則原來轉發給 partition0 的寫請求現在需同時轉發給 partition0 和 partition2,原來轉發給 partition1 的寫請求現在需同時轉發給 partition1 和 partition3。

另外啟動一個工具,當水平擴充套件後所有新啟動的 Partition 內所有 Replica 的狀態都是 Running 的時候,修改Registry路徑/pubsub/xiu/partition_num的值為擴容後 Partition 的數目。

8.4、資料傳送流程

訊息自 PiXiu 的外部客戶端(Client,服務端所有使用 PiXiu 提供的服務者統稱為客戶端)按照一定負載均衡規則傳送到 Proxy,然後存入 Xiu 中,把 MsgID 存入 Pi 中。

其詳細流程如下:

1)Client 依據 snowflake 演算法給訊息分配 SnowflakeID,依據 ProxyID = UIN % ProxyNum 規則把訊息發往某個 Proxy;

2)Proxy 收到訊息後轉發到 Xiu;

3)Proxy 收到 Xiu 返回的響應後,把響應轉發給 Client;

4)如果 Proxy 收到 Xiu 返回的響應帶有 MsgID,則發起 Pi 寫流程,把 MsgID 同步到 Pi 中;

5)如果 Proxy 收到 Xiu 返回的響應帶有 MsgID,則給 Broker 傳送一個 Notify,告知其某 UIN 的最新 MsgID。

8.5、資料轉發流程

轉發訊息的主體是Broker,原來的線上訊息轉發流程是它收到 Proxy 轉發來的 Message,然後根據使用者是否線上然後轉發給 Gateway。

PiXiu架構下 Broker 會收到以下型別訊息:

1)使用者登入訊息;

2)使用者心跳訊息;

3)使用者登出訊息;

4)Notify 訊息;

5)Ack 訊息。

Broker流程受這五種訊息驅動,下面分別詳述其收到這五種訊息時的處理流程。

使用者登入訊息流程如下:

1)檢查使用者的當前狀態,若為 OffLine 則把其狀態值為線上 OnLine;

2)檢查使用者的待發送訊息佇列是否為空,不為空則退出;

3)向 Pi 模組傳送獲取 N 條訊息 ID 的請求 {UIN: uin, StartMsgID: 0, MsgIDNum: N, ExpireFlag: false},設定使用者狀態為 GettingMsgIDList 並等待迴應;

4)根據 Pi 返回的訊息 ID 佇列,向 Xiu 發起獲取訊息請求 {UIN: uin, MsgIDList: msg ID List},設定使用者狀態為 GettingMsgList 並等待迴應;

5)Xiu 返回訊息列表後,設定狀態為 SendingMsg,並向 Gateway 轉發訊息。

可以把使用者心跳訊息當做使用者登入訊息處理。

Gateway的使用者登出訊息產生有三種情況:

1)使用者主動退出;

2)使用者心跳超時;

3)給使用者轉發訊息時發生網路錯誤。

使用者登出訊息處理流程如下:

1)檢查使用者狀態,如果為 OffLine,則退出;

2)使用者狀態不為 OffLine 且檢查使用者已經發送出去的訊息列表的最後一條訊息的 ID(LastMsgID),向 Pi 傳送獲取 MsgID 請求{UIN: uin, StartMsgID: LastMsgID, MsgIDNum: 0, ExpireFlag: True},待 Pi 返回響應後退出。

處理 Proxy 發來的 Notify 訊息處理流程如下:

1)如果使用者狀態為 OffLine,則退出;

2)更新使用者的最新訊息 ID(LatestMsgID),如果使用者傳送訊息佇列不為空則退出;

3)向 Pi 模組傳送獲取 N 條訊息 ID 的請求 {UIN: uin, StartMsgID: 0, MsgIDNum: N, ExpireFlag: false},設定使用者狀態為 GettingMsgIDList 並等待迴應;

4)根據 Pi 返回的訊息 ID 佇列,向 Xiu 發起獲取訊息請求 {UIN: uin, MsgIDList: msg ID List},設定使用者狀態為 GettingMsgList 並等待迴應;

5)Xiu 返回訊息列表後,設定狀態為 SendingMsg,並向 Gateway 轉發訊息。

所謂 Ack 訊息,就是 Broker 經 Gateway 把訊息轉發給 App 後,App 給Broker的訊息回覆,告知Broker其最近成功收到訊息的 MsgID。 

Ack 訊息處理流程如下:

1)如果使用者狀態為 OffLine,則退出;

2)更新 LatestAckMsgID 的值;

3)如果使用者傳送訊息佇列不為空,則傳送下一個訊息後退出;

4)如果 LatestAckMsgID >= LatestMsgID,則退出;

5)向 Pi 模組傳送獲取 N 條訊息 ID 的請求 {UIN: uin, StartMsgID: 0, MsgIDNum: N, ExpireFlag: false},設定使用者狀態為 GettingMsgIDList 並等待迴應;

6)根據 Pi 返回的訊息 ID 佇列,向 Xiu 發起獲取訊息請求 {UIN: uin, MsgIDList: msg ID List},設定使用者狀態為 GettingMsgList 並等待迴應;

7)Xiu 返回訊息列表後,設定狀態為 SendingMsg,並向 Gateway 轉發訊息。

總體上,PiXiu 轉發訊息流程採用拉取(pull)轉發模型,以上面五種訊息為驅動進行狀態轉換,並作出相應的動作行為。

九、本文總結

這套群聊訊息系統尚有以下task list需完善:

1)訊息以UDP鏈路傳遞,不可靠【2018/01/29解決之】;

2)目前的負載均衡演算法採用了極簡的RoundRobin演算法,可以根據成功率和延遲新增基於權重的負載均衡演算法實現;

3)只考慮傳遞,沒有考慮訊息的去重,可以根據訊息ID實現這個功能【2018/01/29解決之】;

4)各個模組之間沒有考慮心跳方案,整個系統的穩定性依賴於Registry【2018/01/17解決之】;

5)離線訊息處理【2018/03/03解決之】;

6)區分訊息優先順序。

此記。

參考文件:一種支援自由規劃無須資料遷移和修改路由程式碼的Replicaing擴容方案

十、本文成文歷程

於雨氏,2017/12/31,初作此文於豐臺金箱堂。

於雨氏,2018/01/16,於海淀新增“系統穩定性”一節。

於雨氏,2018/01/29,於海淀新增“訊息可靠性”一節。

於雨氏,2018/02/11,於海淀新增“Router”一節,並重新格式化全文。

於雨氏,2018/03/05,於海淀新增“PiXiu”一節。

於雨氏,2018/03/14,於海淀新增負反饋機制、根據Gateway Message ID保證Gateway Message資料一致性 和 Gateway使用者退出訊息產生機制 等三個細節。

於雨氏,2018/08/05,於海淀新增 “pipeline” 一節。

於雨氏,2018/08/28,於海淀新增 “大房間訊息處理” 一節。

附錄:更多IM架構設計文章

淺談IM系統的架構設計

簡述移動端IM開發的那些坑:架構設計、通訊協議和客戶端

一套海量線上使用者的移動端IM架構設計實踐分享(含詳細圖文)

一套原創分散式即時通訊(IM)系統理論架構方案

從零到卓越:京東客服即時通訊系統的技術架構演進歷程

蘑菇街即時通訊/IM伺服器開發之架構選擇

騰訊QQ1.4億線上使用者的技術挑戰和架構演進之路PPT

微信後臺基於時間序的海量資料冷熱分級架構設計實踐

微信技術總監談架構:微信之道——大道至簡(演講全文)

如何解讀《微信技術總監談架構:微信之道——大道至簡》

快速裂變:見證微信強大後臺架構從0到1的演進歷程(一)

17年的實踐:騰訊海量產品的技術方法論

移動端IM中大規模群訊息的推送如何保證效率、實時性?

現代IM系統中聊天訊息的同步和儲存方案探討

IM開發基礎知識補課(二):如何設計大量圖片檔案的服務端儲存架構?

IM開發基礎知識補課(三):快速理解服務端資料庫讀寫分離原理及實踐建議

IM開發基礎知識補課(四):正確理解HTTP短連線中的Cookie、Session和Token

WhatsApp技術實踐分享:32人工程團隊創造的技術神話

微信朋友圈千億訪問量背後的技術挑戰和實踐總結

王者榮耀2億使用者量的背後:產品定位、技術架構、網路方案等

IM系統的MQ訊息中介軟體選型:Kafka還是RabbitMQ?

騰訊資深架構師乾貨總結:一文讀懂大型分散式系統設計的方方面面

以微博類應用場景為例,總結海量社交系統的架構設計步驟

快速理解高效能HTTP服務端的負載均衡技術原理

子彈簡訊光鮮的背後:網易雲信首席架構師分享億級IM平臺的技術實踐

知乎技術分享:從單機到2000萬QPS併發的Redis高效能快取實踐之路

IM開發基礎知識補課(五):通俗易懂,正確理解並用好MQ訊息佇列

微信技術分享:微信的海量IM聊天訊息序列號生成實踐(演算法原理篇)

微信技術分享:微信的海量IM聊天訊息序列號生成實踐(容災方案篇)

新手入門:零基礎理解大型分散式架構的演進歷史、技術原理、最佳實踐

一套高可用、易伸縮、高併發的IM群聊架構方案設計實踐

>> 更多同類文章 ……

(本文同步釋出於:http://www.52im.net/thread-2015-1-1.html