個推基於 Apache Pulsar 的優先順序佇列方案
作者:個推平臺研發工程師 祥子
一、業務背景
在個推的推送場景中,訊息佇列在整個系統中佔有非常重要的位置。 當 APP 有推送需求的時候, 會向個推傳送一條推送命令,接到推送需求後,我們會把APP要求推送訊息的使用者放入下發佇列中,進行訊息下發;當同時有多個APP進行訊息下發時,難免會出現資源競爭的情況, 因此就產生了優先順序佇列的需求,在下發資源固定的情況下, 高優先順序的使用者需要有更多的下發資源。
二、基於 Kafka 的優先順序佇列方案
針對以上場景,個推基於 Kafka 設計了第一版的優先順序佇列方案。Kafka 是 LinkedIn 開發的一個高效能、分散式訊息系統;Kafka 在個推有非常廣泛的應用,如日誌收集、線上和離線訊息分發等。
架構 在該方案中,個推將優先順序統一設定為高、中、低三個級別。具體操作方案如下:
-
對某個優先順序根據 task (單次推送任務)維度,存入不同的 Topic,一個 task 只寫入一個 Topic,一個 Topic 可存多個 task;
-
消費模組根據優先順序配額(如 6:3:1),獲取不同優先順序的訊息數,同一優先順序輪詢獲取訊息;這樣既保證了高優先順序使用者可以更快地傳送訊息,又避免了低優先順序使用者出現沒有下發的情況。
Kafka 方案遇到的問題
隨著個推業務的不斷髮展,接入的 APP 數量逐漸增多,第一版的優先順序方案也逐漸暴露出一些問題:
- 當相同優先順序的 APP 在同一時刻推送任務越來越多時,後面進入的 task 訊息會因為前面 task 訊息還存在佇列情況而出現延遲。如下圖所示, 當 task1 訊息量過大時,在task1 消費結束前,taskN 將一直處於等待狀態。
- Kafka 在 Topic 數量由 64 增長到 256 時,吞吐量下降嚴重,Kafka 的每個 Topic、每個分割槽都會對應一個物理檔案。當 Topic 數量增加時,訊息分散的落盤策略會導致磁碟 IO 競爭激烈,因此我們不能僅通過增加 Topic 數量來緩解第一點中的問題。
基於上述問題,個推進行了新一輪的技術選型, 我們需要可以建立大量的 Topic, 同時吞吐效能不能比 Kafka 遜色。經過一段時間的調研,Apache Pulsar 引起了我們的關注。
三、為什麼是 Pulsar
Apache Pulsar 是一個企業級的分散式訊息系統,最初由 Yahoo 開發,在 2016 年開源,並於2018年9月畢業成為 Apache 基金會的頂級專案。Pulsar 已經在 Yahoo 的生產環境使用了三年多,主要服務於Mail、Finance、Sports、 Flickr、 the Gemini Ads platform、 Sherpa (Yahoo 的 KV 儲存)。
架構
Topic 數量 Pulsar 可以支援百萬級別 Topic 數量的擴充套件,同時還能一直保持良好的效能。Topic 的伸縮性取決於它的內部組織和儲存方式。Pulsar 的資料儲存在 bookie (BookKeeper 伺服器)上,處於寫狀態的不同 Topic 的訊息,在記憶體中排序,最終聚合儲存到大檔案中,在 Bookie 中需要更少的檔案控制代碼。另一方面 Bookie 的 IO 更少依賴於檔案系統的 Pagecache,Pulsar 也因此能夠支援大量的主題。
消費模型 Pulsar 支援三種消費模型:Exclusive、Shared 和Failover。 Exclusive (獨享):一個 Topic 只能被一個消費者消費。Pulsar 預設使用這種模式。
Shared(共享):共享模式,多個消費者可以連線到同一個 Topic,訊息依次分發給消費者。當一個消費者宕機或者主動斷開連線時,那麼分發給這個消費者的未確認(ack)的訊息會得到重新排程,分發給其他消費者。
Failover (災備):一個訂閱同時只有一個消費者,可以有多個備份消費者。一旦主消費者故障,則備份消費者接管。不會出現同時有兩個活躍的消費者。
Exclusive和Failover訂閱,僅允許一個消費者來使用和消費每個訂閱的Topic。這兩種模式都按 Topic 分割槽順序使用訊息。它們最適用於需要嚴格訊息順序的流(Stream)用例。
Shared 允許每個主題分割槽有多個消費者。同一個訂閱中的每個消費者僅接收Topic分割槽的一部分訊息。Shared最適用於不需要保證訊息順序佇列(Queue)的使用模式,並且可以按照需要任意擴充套件消費者的數量。
儲存 Pulsar 引入了 Apache BookKeeper 作為儲存層,BookKeeper 是一個專門為實時系統優化過的分散式儲存系統,具有可擴充套件、高可用、低延遲等特性。具體介紹,請參考 BookKeeper官網。
Segment BookKeeper以 Segment (在 BookKeeper 內部被稱作 ledger) 作為儲存的基本單元。從 Segment 到訊息粒度,都會均勻分散到 BookKeeper 的叢集中。這種機制保證了資料和服務均勻分散在 BookKeeper 叢集中。
Pulsar 和 Kafka 都是基於 partition 的邏輯概念來做 Topic 儲存的。最根本的不同是,Kafka 的物理儲存是以 partition 為單位的,每個 partition 必須作為一個整體(一個目錄)儲存在某個 broker 上。 而 Pulsar 的 partition 是以 segment 作為物理儲存的單位,每個 partition 會再被打散並均勻分散到多個 bookie 節點中。
這樣的直接影響是,Kafka 的 partition 的大小,受制於單臺 broker 的儲存;而 Pulsar 的 partition 則可以利用整個叢集的儲存容量。
擴容 當 partition 的容量達到上限後,需要擴容的時候,如果現有的單臺機器不能滿足,Kafka 可能需要新增新的儲存節點,並將 partition 的資料在節點之間搬移達到 rebalance 的狀態。
而 Pulsar 只需新增新的 Bookie 儲存節點即可。新加入的節點由於剩餘空間大,會被優先使用,接收更多的新資料;整個擴容過程不涉及任何已有資料的拷貝和搬移。
Broker 故障 Pulsar 在單個節點失敗時也會體現同樣的優勢。如果 Pulsar 的某個服務節點 broker 失效,由於 broker 是無狀態的,其他的 broker 可以很快接管 Topic,不會涉及 Topic 資料的拷貝;如果儲存節點 Bookie 失效,在集群后臺中,其他的 Bookie 會從多個 Bookie 節點中併發讀取資料,並對失效節點的資料自動進行恢復,對前端服務不會造成影響。
Bookie 故障 Apache BookKeeper 中的副本修復是 Segment (甚至是 Entry)級別的多對多快速修復。這種方式只會複製必須的資料,這比重新複製整個主題分割槽要精細。如下圖所示,當錯誤發生時, Apache BookKeeper 可以從 bookie 3 和 bookie 4 中讀取 Segment 4 中的訊息,並在 bookie 1 處修復 Segment 4。所有的副本修復都在後臺進行,對 Broker 和應用透明。
當某個 Bookie 節點出錯時,BookKeeper會自動新增可用的新 Bookie 來替換失敗的 Bookie,出錯的 Bookie 中的資料在後臺恢復,所有 Broker 的寫入不會被打斷,而且不會犧牲主題分割槽的可用性。
四、基於 Pulsar 的優先順序佇列方案
在設計思路上,Pulsar 方案和 Kafka 方案並沒有多大區別。但在新方案中,個推技術團隊藉助 Pulsar 的特性,解決了 Kafka 方案中存在的問題。
- 根據 task 動態生成 Topic,保證了後進入的 task 不會因為其他 task 訊息堆積而造成等待情況。
- 中高優先順序 task 都獨享一個 Topic,低優先順序 task 共享 n 個 Topic。
- 相同優先順序內,各個 task 輪詢讀取訊息,配額滿後流轉至下一個優先順序。
- 相同優先順序內, 各個 task 可動態調整 quota, 在相同機會內,可讀取更多訊息。
- 利用 Shared 模式, 可以動態新增刪除 consumer,且不會觸發 Rebalance 情況。
- 利用 BookKeeper 特性,可以更靈活的新增儲存資源。
五、Pulsar 其他實踐
- 不同 subscription 之間相對獨立,如果想要重複消費某個 Topic 的訊息,需要使用不同的 subscriptionName 訂閱;但是一直增加新的 subscriptionName,backlog 會不斷累積。
- 如果 Topic 無人訂閱,發給它的訊息預設會被刪除。因此如果 producer 先發送,consumer 後接收,一定要確保 producer 傳送之前,Topic 有 subscription 存在(哪怕 subscribe 之後 close 掉),否則這段時間傳送的訊息會導致無人處理。
- 如果既沒有人傳送訊息,又沒有人訂閱訊息,一段時間後 Topic 會自動刪除。
- Pulsar 的 TTL 等設定,是針對整個 namespace 起效的,無法針對單個 Topic。
- Pulsar 的鍵都建立在 zookeeper 的根目錄上,在初始化時建議增加總節點名。
- 目前 Pulsar 的 java api 設計,訊息預設需要顯式確認,這一點跟 Kafka 不一樣。
- Pulsar dashboard 上的 storage size 和 prometheus 上的 storage size (包含副本大小)概念不一樣。
- 把
dbStorage_rocksDB_blockCacheSize
設定的足夠大;當訊息體量大,出現backlog 大量堆積時, 使用預設大小(256M)會出現讀耗時過大情況,導致消費變慢。 - 使用多 partition,提高吞吐。
- 在系統出現異常時,主動抓取 stats 和 stats-internal,裡面有很多有用資料。
- 如果業務中會出現單 Topic 體量過大的情況,建議把
backlogQuotaDefaultLimitGB
設定的足夠大(預設10G), 避免因為預設使用producer_request_hold
模式出現 block producer 的情況;當然可以根據實際業務選擇合適的backlogQuotaDefaultRetentionPolicy
。 - 根據實際業務場景主動選擇 backlog quota。
- prometheus 內如果發現讀耗時為空情況,可能是因為直接讀取了快取資料;Pulsar 在讀取訊息時會先讀取 write cache, 然後讀取 read cache;如果都沒有命中, 則會在 RocksDB 中讀取條目位子後,再從日誌檔案中讀取該條目。
- 寫入訊息時, Pulsar 會同步寫入 journal 和 write cache;write cache 再非同步寫入日誌檔案和 RocksDB; 所以有資源的話,建議 journal 盤使用SSD。
六、總結
現在, 個推針對優先順序中介軟體的改造方案已經在部分現網業務中試執行,對於 Pulsar 的穩定性,我們還在持續關注中。 作為一個2016 年才開源的專案,Pulsar 擁有非常多吸引人的特性,也彌補了其他競品的短板,例如跨地域複製、多租戶、擴充套件性、讀寫隔離等。儘管在業內使用尚不廣泛, 但從現有的特性來說, Pulsar 表現出了取代 Kafka 的趨勢。在使用 Pulsar 過程中,我們也遇到了一些問題, 在此特別感謝翟佳和郭斯傑(兩位均為 Stream Native 的核心工程師、開源專案 Apache Pulsar 的 PMC 成員)給我們提供的支援和幫助。
參考文獻:
[1] 比拼 Kafka, 大資料分析新秀Pulsar 到底好在哪(https://www.infoq.cn/article/1UaxFKWUhUKTY1t_5gPq)
[2] 開源實時資料處理系統Pulsar:一套搞定Kafka+Flink+DB(https://juejin.im/post/5af41436