Pulsar雲原生分散式訊息和流平臺
Pulsar雲原生分散式訊息和流平臺
Apache Pulsar是一個雲原生的分散式訊息和流媒體平臺,最初創建於雅虎!現在是Apache軟體基金會的頂級專案
官網首頁列舉一些關鍵特性和目前使用公司包括國內深度合作騰訊,目前最新版本為2.8.0,背後的開源流資料公司 StreamNative,2019年創立一家公司,作為雲原生時代專注技術細分領域的佼佼者
什麼是Pulsar
Pulsar即可以支援queue模式的訊息中介軟體比如RabbitMQ和RocketMQ,也可以支援stream流模式的Kafka,幾乎涵蓋訊息應用的領域,加上豐富企業特性如多租戶隔離、百萬級Topics、跨地域複製、鑑權認證,是雲原生時代其他訊息中介軟體的演化或者說是替代品也不為過
部署模式
支援多種部署模式,比如本地開發測試環境下單機執行環境,生產使用叢集部署或多叢集部署,還有基於容器化的Docker和K8s部署等
概覽
Pulsar 是一個用於伺服器到伺服器的訊息系統,具有多租戶、高效能等優勢。 Pulsar 最初由 Yahoo 開發,目前由 Apache 軟體基金會管理。
Pulsar 的關鍵特性如下:
Pulsar 的單個例項原生支援多個叢集,可跨機房在叢集間無縫地完成訊息複製。
極低的釋出延遲和端到端延遲。
可無縫擴充套件到超過一百萬個 topic。
支援多種 topic 訂閱模式
(獨佔訂閱、共享訂閱、故障轉移訂閱)。通過 Apache BookKeeper 提供的持久化訊息儲存機制保證訊息傳遞 。
- 由輕量級的 serverless 計算框架 Pulsar Functions 實現流原生的資料處理。
基於 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得資料更易移入、移出 Apache Pulsar。
分層式儲存可在資料陳舊時,將資料從熱儲存解除安裝到冷/長期儲存(如S3、GCS)中。
Pulsar需要解決問題
- 企業需求和資料規模
- 多租戶-百萬Topics-低延遲-持久化-跨地域複製
- 解除儲存計算耦合
- 運維痛點:替換機器、服務擴容、資料rebalance
- 減少檔案系統依賴
- 效能難保障:持久化、一致性、多Topic
- IO不隔離:消費者度Backlog的時候會影響其他生產者和消費者,Kakfa採用順序寫的機制提升效能,當Topic和分割槽數大量增大後便會退化為隨機寫而極大減低起IO效能
架構
Pulsar底層最為關鍵技術是採用儲存和計算分離以及分層+分片的架構,節點是對等的可以獨立擴充套件並支援靈活擴容和快速容錯機制,這也是為什麼說Pulsar是雲原生架構的主要原因;
Pulsar企業級儲存層採用的是Apache BookKeeper持久化儲存。 BookKeeper是一個分散式的預寫日誌(WAL)系統,滿足低延遲、高吞吐、持久化、強一致性、高可用、I/O隔離,元資料服務基於rocksdb legder儲存,而基於New Sql新一代分散式關係資料庫TiDb的k-v儲存節點底層也是採用 效能非常強大單機儲存引擎rocksdb,關於 BookKeeper我們本篇不做延展介紹,後續有時間再單獨闡述。
Broker作為計算層依靠Zookeeper作為生產者和消費者的橋樑,天然屬於無狀態的服務,擴容通過服務發現自動動態感知;另一方面底層是分散式儲存,因此擴容直接新增儲存節點即可,原來在Kafka擴容節點後,如果沒有屬於該節點的分割槽資料則擴容節點是無法起作用的,需要做分割槽管理或rebalance,而在Pulsar中新增加節點則會實時增加資料進來,這個得益於Pulsar的架構設計,採用分層+分片的邏輯儲存概念,每一塊儲存是可以儲存不同Topic不同分割槽的資料,然後依賴於索引系統原理實現檢索;儲存節點出現故障後由於是對等架構,分散式儲存有多副本機制,所以可繼續提供正常服務且也不需要立即進行故障轉移,可以在合適時機再做副本遷移,所以對於應用來說是無感知的
Pulsar穩定的IO質量底層機制
Ledger是一個只追加的資料結構,並且只有一個寫入器,這個寫入器負責多個BookKeeper儲存節點(就是Bookies)的寫入。 Ledger的條目會被複制到多個bookies。 Ledgers本身有著非常簡單的語義:
- Pulsar Broker可以建立ledeger,新增內容到ledger和關閉ledger。
- 當一個ledger被關閉後,除非明確的要寫資料或者是因為寫入器掛掉導致ledger關閉,這個ledger只會以只讀模式開啟。
- 最後,當ledger中的條目不再有用的時候,整個legder可以被刪除(ledger分佈是跨Bookies的)。
Ledger讀一致性
BookKeeper的主要優勢在於他能在有系統故障時保證讀的一致性。 由於Ledger只能被一個程序寫入(之前提的寫入器程序),這樣這個程序在寫入時不會有衝突,從而寫入會非常高效。 在一次故障之後,ledger會啟動一個恢復程序來確定ledger的最終狀態並確認最後提交到日誌的是哪一個條目。 在這之後,能保證所有的ledger讀程序讀取到相同的內容。
Managed ledgers
Given that Bookkeeper ledgers provide a single log abstraction, a library was developed on top of the ledger called the managed ledger that represents the storage layer for a single topic. managed ledger即訊息流的抽象,有一個寫入器程序不斷在流結尾新增訊息,並且有多個cursors 消費這個流,每個cursor有自己的消費位置。
Internally, a single managed ledger uses multiple BookKeeper ledgers to store the data. There are two reasons to have multiple ledgers:
- 在故障之後,原有的某個ledger不能再寫了,需要建立一個新的。
- A ledger can be deleted when all cursors have consumed the messages it contains. This allows for periodic rollover of ledgers.
日誌儲存
In BookKeeper, journal files contain BookKeeper transaction logs. 在更新到 ledger之前,bookie需要確保描述這個更新的事務被寫到持久(非易失)儲存上面。 在bookie啟動和舊的日誌檔案大小達到上限(由
journalMaxSizeMB
引數配置)的時候,新的日誌檔案會被建立。
Palsar Schema
啟用 schema 後,Pulsar 會解析資料,即接收位元組作為輸入併發送位元組作為輸出。 雖然資料不僅是位元組,但的確需要解析這些資料,解析時還可能發生解析異常,解析異常主要出現在以下幾種情況中:
- 欄位不存在
- 欄位型別已更改(例如,將
string
更改為int
)
簡單來說,當我們使用 schema 去建立 producer 生產者則不再需要將訊息序列化為位元組,因為Pulsar schema 會在後臺幫我們執行序列化操作。
Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
.topic(topic)
.create();
User user = new User("Tom", 28);
producer.send(user);
Parsar Functions
Pulsar Functions 是輕量級計算流程,具有以下特點:
- 從一個或多個 Pulsar topic 中消費訊息;
- 將使用者提供的處理邏輯應用於每條訊息;
- 將執行結果釋出到另一個 topic。
- Pulsar Functions可以看做是一種程式設計模型,背後的核心目標是使您能夠輕鬆建立各種級別的複雜的的處理邏輯,而無需部署單獨的類似系統(例如 Apache Storm, Apache Heron, Apache Flink, 等等) Pulsar Functions are computing infrastructure of Pulsar messaging system. The core goal is tied to a series of other goals:
- 提高開發者的生產力(用開發者熟悉的語言和Pulsar Function 的函式SDK)
- 簡單的故障排查
- 操作簡單(不需要外部處理系統)
Function 抽象,計算物件是訊息,Function 將收到訊息進行計算執行業務邏輯並寫進 Output topic,Function 為開發者提供了很多便利,簡單的計算都可以通過 Function 完成。可以將 Function 結合起來,由此提出一個新概念 Function Mesh,其主要基於 K8s 開發。
package org.example.functions;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import java.util.Arrays;
public class WordCountFunction implements Function<String, Void> {
// This function is invoked every time a message is published to the input topic
@Override
public Void process(String input, Context context) throws Exception {
Arrays.asList(input.split(" ")).forEach(word -> {
String counterKey = word.toLowerCase();
context.incrCounter(counterKey, 1);
});
return null;
}
}
將上面的程式碼編譯成可部署的 JAR 檔案,可以使用如下命令列將 JAR 包部署到 Pulsar 叢集中。
$ bin/pulsar-admin functions create \
--jar target/my-jar-with-dependencies.jar \
--classname org.example.functions.WordCountFunction \
--tenant public \
--namespace default \
--name word-count \
--inputs persistent://public/default/sentences \
--output persistent://public/default/count
Pulsar IO
Pulsar IO聯結器使您能夠輕鬆建立、部署和管理與外部系統互動的聯結器,如Apache Cassandra、Aerospike等。
可以通過 Connector Admin CLI並結合 sources 和 sinks 子命令來管理 Pulsar 聯結器(例如,建立、更新、啟動、停止、重啟、過載、刪除以及其他操作)。
聯結器(sources 和 sinks)和 Functions 是例項的組成部分,都在 Functions workers 上執行。 通過 Connector Admin CLI 或 Functions Admin CLI 管理 source、sink 或者 function 時,在 worker 上就啟動了一個例項。 瞭解更多資訊,參閱 Functions worker。
Pulsar SQL
Apache Pulsar 用於儲存事件資料流,事件資料結構由預定義欄位組成。 藉助 Schema Registry 的實現,你可以在 Pulsar 中儲存結構化資料,並通過使用Trino(原先叫 Presto SQL)查詢這些資料。
作為 Pulsar SQL 的核心,Presto Pulsar 聯結器支援 Presto 叢集中的 Presto worker 查詢 Pulsar 資料。
查詢效能高效且高度可擴充套件,這得益於 Pulsar 的 分層分片架構。
Pulsar 中的主題以分片形式儲存在 Apache BookKeeper 中。 每個主題分片會被複制到多個 BookKeeper 節點,可以支撐併發讀和高吞吐。 你可以配置 BookKeeper 節點的數量,預設節點數是 3
。 在 Presto Pulsar 聯結器中,資料直接從 BookKeeper 讀取,所以 Presto worker 能從水平擴充套件的 BookKeeper 節點中併發讀取資料。
Tiered Storage
Pulsar 的分層儲存 功能允許將歷史 backlog 資料從 BookKeeper 中轉移到更加低廉的儲存介質中並且允許客戶端訪問無變化的 backlog 資料。
-
分層儲存通過 Apache jclouds 來實現在 Amazon S3 和 GCS (Google Cloud Storage) 進行歸檔儲存。
通過 jclouds,可以在未來便捷地去擴充套件對其他雲端儲存的支援。
-
分層儲存通過 Apache Hadoop 來實現在檔案系統中進行歸檔儲存。
通過 Hadoop,可以在未來便捷地去擴充套件對其他檔案系統的支援。
Transactions
Pulsar事務 (txn) 使事件流應用程式能夠在一個原子操作中消費、處理和生成訊息。
Pulsar事務支援端到端的恰好一次流處理,這意味著訊息不會從源運算元(source operator)丟失,並且訊息不會重複發給接收運算元(sink operator)。
隨著Pulsar 2.8.0中引入的事務,Pulsar Flink接收器聯結器可以通過實現指定的 TwoPhaseCommitSinkFunction
並使用Pulsar事務 API 連線 Flink 接收器訊息生命週期來支援exactly-once語義
Pulsar 周邊和生態
Pulsar 作為一個流原生訊息平臺,主要包括儲存(Stream Storage)、訊息(Messaging)、計算(Processing)三個方面的工作。
Messaging 是 Pulsar 誕生之初的一個主要方向。通過 Pulsar IO 和外部系統打。
下圖藍色 Processing 方面 Queries 的引擎比如 Presto 和 HIVE 進行深度整合,讓 Presto 和 HIVE 能夠直接讀取 Pulsar 的 topic ,再結合 Pulsar 本身自帶的 Schema,將 Pulsar topic 作為的一個表直接查詢 Pulsar topic 中的資料。
在 Streaming & Batch Processing 方面,與大資料處理引擎包括 Storm、Flink、Spark 進行深度整合。
在 Processing 方向的思路是與現有的大資料生態做深度的融合,讓大資料生態能夠更好地訪問 Pulsar,把 Pulsar 當作資料的儲存引擎。
除此之外,Pulsar 推出了 Pulsar Function — 一個輕量級的計算框架。Pulsar Function 可以減輕很多資料的傳輸,可以靠近資料端完成計算,目前很多 IoT 場景的使用者如塗鴉智慧、EMQ、中國電信及一些車聯網公司都在使用 Pulsar Function。
除了 Messaging 和 Processing ,Pulsar 擁有一個很堅實的基礎,就是擁有專門為訊息、流儲存而設計的儲存引擎 Apache BookKeeper。結合 Pulsar 對分割槽再分片的儲存特性,我們很自然地把老的分片遷移到二級儲存中,所以 Pulsar 的架構很容支援二級儲存。二級儲存的介質包括雲上的各種資源:S3、HDFS
訊息領域
很多使用者在使用 Pulsar 的過程中,會發現客戶端應用的改造和遷移會很難落地。比如 Kafka 往 Pulsar 遷移過程中,客戶很可能也會有大量基於 Kafka Clients 的應用需要更改,由於需要更改協議導致遷移很困難。
由於短時間不可能完全從 Kafka 遷移到 Pulsar,導致對後臺的運維甚至整個業務的切換帶來很大的不便捷性。
Pulsar 和 Kafka 一樣是以 topic 作為基礎,以 log 作為抽象,Pulsar 的一致性、延遲、吞吐會更優,在這個基礎上要複用 Pulsar 的儲存層,在 Broker 端實現協議的解析,使用者的切換成本更低。Pulsar Broker 端提供 Protocol Handler 外掛(現在已經實現 Kafka、AMQP、MQTT 協議的支援)的方式來支援多種協議。這種在 Broker 端做協議解析的方法,可以更方便地支援多種協議。其次還利用 Pulsar 在儲存層擁有儲存、計算分離的優勢,服務上層多種協議。
KoP(Kafka on Pulsar)
目前 StreamNative 聯合合作伙伴已推出了 KoP 專案,主要滿足想要從 Kafka 應用程式切換到 Pulsar 的使用者的強烈需求。
KoP 將 Kafka 協議處理外掛引入 Pulsar broker,從而實現 Apache Pulsar 對原生 Apache Kafka 協議的支援。將 KoP 協議處理外掛新增到現有 Pulsar 集群后,使用者不用修改程式碼就可以將現有的 Kafka 應用程式和服務遷移到 Pulsar,從而使用 Pulsar 的強大功能。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-9i0cziCO-1629624254691)(http://www.itxiaoshen.com:3001/assets/1629618989303RWFe8zct.png)]
KoP 相關特性:
- Broker 的外掛,Client 不需要做任何的改動;
- 共享訪問;
- 支援 Kafka 0.10-2.x 版本;
- 連續 Offset:增加對連續 ID 的支援。
- 效能改進:實現與 Kafka broker 類似的機制,無需 KoP 針對 Kafka 傳送的 batch 訊息進行拆包解包,將 Kafka 傳送過來的訊息直接以 Kafka 格式進行儲存,並在 Pulsar Client 增加對 kafka 協議的解析器。
- 支援 Envoy,並實現 Pulsar Schema 與 Kafka Schema 的相容。
AoP(AMQP on Pulsar)
AoP(AMQP on Pulsar)是 StreamNative 聯合中國移動共同開發推進的專案,類似 KoP,主要解決 AMQP 應用程式遷移到 Pulsar 的需求。當前 AoP 實現了對 AMQP 協議 0.9.1 版本的支援,2021 年計劃對 AMQP 1.0 協議進行整合支援。目前除了中國移動正在大規模應用 AoP 外,國外也有越來越多的使用者正在使用 AoP,希望更多小夥伴加入到 AoP 使用中來,共同豐富 AoP 場景,協作增強 AoP 功能。
MoP(MQTT on Pulsar)
MQTT 協議在物聯網應用十分廣泛,類似 KoP、AoP,當前 Pulsar 也通過 MoP 專案提供了對 MQTT 協議的支援。當前 MoP 支援 QoS level 0、QoS level 1 協議,2021 年計劃實現對 QoS level 2 協議的支援。
Apache Pulsar在日誌系統中的應用
常見日誌架構ELK
訊息佇列在日誌場景中主要作用:削峰解耦、資料分發
日誌系統常見挑戰:
更多功能性要求:
Kafka和Pulsar對比
-
Kafka僅支援user/client-id級別、broker設定;而Pulsar則支援namespace/topic級別,粒度較小
-
Kafka增加新節點需要reassign partition才能使用;而Pulsar儲存和計算分離,可以按需增加計算或儲存節點,增加即生效,不需要reassign
-
Kafka消費能力受限Topic設定的partition數量;而Pulsar消費能力不受限Topic設定的partition數量,可以通過增加消費者數量增大消費能力
-
Kafka隨著partition增多,請求下降嚴重,追加寫模式退化為隨機些;而Pulsar topic/partition僅是邏輯概念,保證追加寫模式
Pulsar引入架構V1
Pulsar引入架構V2
Pulsar引入架構V3
將ETL邏輯從LogStash遷移到Apache Pulsar Functions/IO中,從而起到降本提效,將資料offload到二級快取中,滿足等保要求
Pulsar引入架構V4
可以通過Pulsar SQL實現千萬級別的精確資料查詢,注意是不支援模擬查詢,模擬還是需要在ES中進行
應用
- Apache Pulsar在騰訊大資料場景落地實踐如TDBank-大資料實時接入平臺騰訊慧聚
- Apache Pulsar在華為物聯網(AMQP)之旅
- Apache Pulsar在電信計費系統的應用
- Apache Pulsar 在拉卡拉的技術實踐
- KoP(Kafka on Pulsar)在 BIGO 的效能優化實踐