1. 程式人生 > >Kafka 在華泰證券的探索與實踐

Kafka 在華泰證券的探索與實踐

本文轉自公眾號:上交所技術服務,https://mp.weixin.qq.com/s/q5aKSXEQDSxFh2wkwGfbLw,由樊建、谷正亮、陸俊發表在《交易技術前沿》第二十九期 (2017年12月),點選下面原文連結即可進入

引言

Apache Kafka 發源於 LinkedIn,於 2011 年成為 Apache 的孵化專案,隨後於 2012 年成為 Apache 的頂級專案之一。按照官方定義,Kafka 是一個分散式流平臺,具備流資料的釋出及訂閱(與訊息佇列或企業級訊息系統類似)能力、容錯方式的流資料儲存能力以及流資料的實時處理能力。Kafka 的優勢在於:

  • 可靠性:具有分割槽機制、副本機制和容錯機制的分散式訊息系統。

  • 可擴充套件性:訊息系統支援叢集規模的熱擴充套件。

  • 高效能:在資料釋出和訂閱過程中都能保證資料的高吞吐量。即便在 TB 級資料儲存的情況下,仍然能保證穩定的效能。


目前,Kafka 在網際網路、金融、傳統行業等各種型別公司內部廣泛使用,已成為全球範圍內實時資料傳輸和處理領域的事實標準。

基本原理及概念

一個典型的 Kafka 叢集中包含:(1)若干 Producer,用於生產資料;(2)若干 Broker,構成叢集吞吐資料;(3)若干 Consumer 消費資料;(4)一個 Zookeeper 叢集,進行全域性控制和管理。Kafka 的拓撲結構如下圖所示:

640?wxfrom=5&wx_lazy=1
圖1 kafka 架構圖

Kafka 通過 Zookeeper 管理叢集配置、選舉 leader,以及在 Consumer Group 發生變化時進行再平衡(rebalance)。Producer 使用 push 模式將訊息釋出到 broker,Consumer 使用 pull 模式從 broker 訂閱並消費訊息並更新消費的偏移量值(offset)。

基本概念:

  • Broker(代理):Kafka 叢集的伺服器節點稱為 broker。

  • Topic(主題):在 Kafka 中,使用一個類別屬性來劃分資料的所屬類,劃分資料的這個類稱為 topic。一個主題可以有零個、一個或多個消費者去訂閱寫到這個主題裡面的資料。

  • Partition(分割槽):主題中的資料分割為一個或多個 partition,分割槽是一個有序、不變序列的記錄集合,通過不斷追加形成結構化的日誌。

  • Producer(生產者):資料的釋出者,該角色將訊息釋出到 Kafka 的 topic 中。生產者負責選擇哪個記錄分配到指定主題的哪個分割槽中。

  • Consumer(消費者):從 broker 中讀取資料,消費者可以消費多個 topic 中的資料。

  • Consumer Group(消費者組):每個 consumer 都屬於一個特定的 group 組,一個 group 組可以包含多個 consumer,但一個組中只會有一個 consumer 消費資料。

主題和分割槽:

Topic 的本質就是一個目錄,由一些 Partition Logs(分割槽日誌)組成,其組織結構如下圖所示。每個 Partition 中的訊息都是有序的,生產的訊息被不斷追加到 Partition log 上,其中的每一個訊息都被賦予了一個唯一的 offset 值。

640?
圖 2 Kafka分割槽資料儲存示意圖

對於傳統的 message queue 而言,一般會刪除已經被消費的訊息,Kafka 叢集會儲存所有的訊息,不管訊息有沒有被消費。Kafka 提供兩種策略刪除舊資料:(1)基於時間;(2)基於 Partition 檔案大小。只有過期的資料才會被自動清除以釋放磁碟空間。
Kafka 需要維持的元資料只有“已消費訊息在 Partition 中的 offset 值”,Consumer 每消費一個訊息,offset 就會加 1。其實訊息的狀態完全是由 Consumer 控制的,Consumer 可以跟蹤和重設這個 offset 值,這樣 Consumer 就可以讀取任意位置的訊息。


資料備份機制:

Kafka 允許使用者為每個 topic 設定副本數量,副本數量決定了有幾個 broker 來存放寫入的資料。如果你的副本數量設定為 3,那麼一份資料就會被存放在 3 臺不同的機器上,那麼就允許有 2 個機器失敗。一般推薦副本數量至少為 2,這樣就可以保證增減、重啟機器時不會影響到資料消費。如果對資料持久化有更高的要求,可以把副本數量設定為 3 或者更多。

核心api:

Producer API:允許應用去推送一個流記錄到一個或多個 kafka 主題上。


Consumer API:允許應用去訂閱一個或多個主題,並處理流資料。Consumer 
API 包含 high level API 和 Sample api 兩套。使用 high level API 時,同一 Topic 的一條訊息只能被同一個 Consumer Group 內的一個 Consumer 消費,但多個 Consumer Group 可同時消費這一訊息。與之相對的 Sampleapi 是一個底層的 API,完全無狀態的,每次請求都需要指定 offset 值。


Streams API:允許應用作為一個流處理器,消費來自一個或多個主題的輸入流,或生產一個輸出流到一個或多個輸出主題,並可以有效地將輸入流轉換為輸出流。
其它 Kafka 的特性將在下面華泰證券的使用示例中進一步介紹。

Kafka在華泰證券背景介紹及建設現狀

長期以來,華泰證券的系統建設依賴於服務廠商,廠商之間技術方案的差異性造成了系統之間的異構化,各種型別的系統架構長期存在,在訊息中介軟體領域尤是如此。如簡訊平臺使用 IBMMQ,CRM 系統使用 ESB 架構,自營業務使用 Oracletuxedo 架構,櫃檯系統使用恆生 MessageCenter 架構等。隨著華泰證券自主研發的大規模投入,迫切需要改變這種煙囪式的系統建設方式,以統一化的服務化平臺架構來建設系統。


2015 年,我們通過對 Kafka、ActiveMQ 及 RabbitMQ 等開源訊息中介軟體進行全面的測試對比,最終從效能及高可用方面考慮,選擇 Kafka 作為了公司級訊息中介軟體,經過兩年多的探索和實踐,Kafka 平臺已承接大量重要生產業務系統,支撐了全公司業務的高速發展,積累了大量的生產實踐經驗。


經過將近三年的建設發展,目前在華泰證券內部已分別建設 0.9.0 和 0.10.1 版本的 Kafka 叢集,總體叢集數量 20 餘臺。


目前華泰內部 kafka 已為行情計算、交易回報、量化分析等核心系統提供穩定服務,同時涵蓋了日誌、資料分析等諸多運維領域的應用,日均訊息吞吐量達 2.3TB,峰值流量超 4.8Gb/s,TOPIC 數量 190 餘個,服務 30 個以上應用系統。

實踐經驗

(1)高可用雙活架構
如圖 3 所示,Kafka 高可用特性依賴於 zookeeper 來實現,由於 zookeeper 的 paxos 演算法特性,故 zookeeper 採用同城三中心部署方式,保證 zookeeper 本身高可用,通常其中兩個資料中心部署偶數臺機器,另一資料中心部署單臺機器。


Kafkabroker 跨資料中心兩節點部署,所有 topic 的 partition 保證在兩中心都有副本。如果單資料中心出現問題,另一箇中心能自動進行接管,業務系統可以無感知切換。


由於Kafka的高頻寬需求,主機採用萬兆網絡卡,並且在網絡卡做 bond0 以保證網絡卡高可用,跨資料中心之間的網路通訊採用獨立的萬兆波分通道。

640?圖 3 KAFKA 平臺部署架構圖

(2)引數調優
• 首先我們在 JVM 層面做了很多嘗試。對 Kafka 服務啟動引數進行調優,使用 G1 回收器。kafka 記憶體配置一般選擇 64G,其中 16G 給 Kafka 應用本身,剩餘記憶體全部用於作業系統本身的 page cache.


• 此外為了保證核心系統的達到最佳的讀寫效果,我們採用 SSD 硬碟,並做了 raid5 冗餘,來保證硬碟的高效 IO 讀寫能力。


• 其次我們通過調整 broker 的 num.io.threads,num.network.threads, num.replica.fetchers 等引數來保證叢集之間快速複製和吞吐。


(3)資料一致性保證

Kafka 有自己一套獨特的訊息傳輸保障機制(at least once)。當 producer 向 broker 傳送訊息時,由於副本機制(replication)的存在,一旦這條訊息被 broker 確認,它將不會丟失。但如果 producer 傳送資料給 broker 後,遇到網路問題而造成通訊中斷,那 producer 就無法判斷該條訊息是否已經被確認。這時 producer 可以重試,確保訊息已經被 broker 確認,為了保證訊息的可靠性,我們要求業務做到:

• 保證傳送端成功
當 producer 向 leader 傳送資料時,可以通過 request.required.acks 引數來設定資料可靠性的級別:

1(預設)leader 已成功收到的資料並得到確認後傳送下一條 message。如果 leader 宕機,則會丟失資料。
0送端無需等待來自 broker 的確認而繼續傳送下一批訊息。這種情況下資料傳輸效率最高,但是資料可靠性確是最低的。
-1(ALL)傳送端需要等待 ISR 列表中所有列表都確認接收資料後才算一次傳送完成,可靠性最高。

• 保證消費者消費成功(at least once)

我們要求消費者關閉自動提交(enable.auto.commit:false),同時當消費者每次 poll 處理完業務邏輯後必須完成手動同步提交(commitSync),如果消費者在消費過程中發生 crash,下次啟動時依然會從之前的位置開始消費,從而保證每次提交的內容都能被消費。

• 訊息去重 

考慮到 producer,broker,consumer 之間都有可能造成訊息重複,所以我們要求接收端需要支援訊息去重的功能,最好藉助業務訊息本身的冪等性來做。其中有些大資料元件,如 hbase,elasticsearch 天然就支援冪等操作。

640?圖 4Kafka 訊息可靠性機制

場景事例:行情資料 hbase 儲存
在華泰內部使用 kafka 來快取一段時間的行情資料,並做相應處理為了保證 kafka 中資料的完整性,傳送端 
API 引數配置:

props.put(“acks”, “all”);

為了防止某條傳送影響後續的訊息傳送,採用帶非同步回撥的模式傳送

640?在接收端,啟動專門的消費者拉取 kafka 資料存入 hbase。hbase 的 rowkey 的設計主要包括 SecurityId(股票id)和 timestamp(行情資料時間)。消費執行緒從 kafka 拉取資料後反序列化,然後批量插入 hbase,只有插入成功後才往 kafka 中持久化 offset。這樣的好處是,如果在中間任意一個階段發生報錯,程式恢復後都會從上一次持久化 offset 的位置開始消費資料,而不會造成資料丟失。如果中途有重複消費的資料,則插入 hbase 的 rowkey 是相同的,資料只會覆蓋不會重複,最終達到資料一致。


所以,從根本上說,kafka 上的資料傳輸也是資料最終一致性的典型場景。

640?
圖 5hbase 持久化邏輯

(4)ACL安全


目前華泰內部通過配置 allow.everyone.if.no.acl.found 引數(:true)讓 Kafka 叢集同時具備 
ACL 和非 ACL 的能力,避免資源的浪費。我們選用 SASL 作為 Kafka 鑑權方式,因為 SASL 雖然簡單,但已滿足需求,而 Kerberos 使用過重,過度複雜元件會給 Kafka 帶來更多不確定的因素,如示例所示,根據部門劃分來分配使用者。

示例:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
ser_dep1=“ password 1”
user_dep2=“ password 2”
user_dep3=“ password 3”;
};


服務啟動後,通過 Kafka 的 command line 介面,配置基於使用者、ip、topic、groupid 等的 acl 許可權來保證各業務之間的隔離。

未來規劃

隨著業務的不斷髮展,Kafka 在華泰證券內部已成為核心元件。未來重點進行 PaaS 平臺建設,建立分級保障和 ACL 許可權管控,對重點業務進行獨立管理。
目前 Kafka 的 topic 一般只有 2 個副本,在某些特殊場景下存在資料丟失的風險,未來我們會通過升級擴容,基於業務的重要程度提升副本數,強化叢集的高可用性。
後續我們還會深入研究 Kafka1.0,與 KafkaStreaming、KQL、Storm、Spark、Flink 等流式計算引擎相結合,依託 Kafka 打造公司級流式計算平臺。



猜你喜歡

歡迎關注本公眾號:iteblog_hadoop:

0、回覆 電子書獲取 本站所有可下載的電子書

11、更多大資料文章歡迎訪問https://www.iteblog.com及本公眾號(iteblog_hadoop)12、Flink中文文件:http://flink.iteblog.com
640?wx_fmt=jpeg

本部落格微信小程式:

640?wx_fmt=jpeg