Kafka ETL 的應用及架構解析|告別 Kafka Streams,讓輕量級流處理更加簡單
作者:竹恩、歲月、不周
關鍵詞:Kafka ETL,高彈性、免運維、低成本
引言:阿里雲訊息佇列 Kafka 版提供相容 Apache Kafka 生態的全託管服務,徹底解決開源產品長期的痛點,是大資料生態中不可或缺的產品之一。隨著 Kafka 越來越流行,最初只是作為簡單的訊息匯流排,後來逐漸成為資料整合系統,Kafka 可靠的傳遞能力讓它成為流式處理系統可靠的資料來源。在大資料工程領域,Kafka 在承接上下游、串聯資料流管道方面發揮了重要作用,Kafka 應用流式框架處理訊息也逐漸成為趨勢。
訊息流處理框架選型
說到流計算,常用的便是 Storm、Spark Streaming 和 Flink,目前這些框架都已經完美的支援流計算,並且都有相應的使用案例,但這些框架使用起來門檻相對較高,首先要學習框架和各種技術、規範的使用,然後要將業務遷移到這些框架中,最後線上使用、運維這些流計算框架,對於簡單的流處理應用來說,可能較為複雜。
但由於 Kafka Streams 本身是一個 Java 客戶端庫,需要開發人員自行打包和部署;同時 Kafka Streams 是開源版本,可靠性和可用性不能得到很好的保障,也不能實現按需使用;此外使用過程中需要用到流的程式設計,使用的門檻也相對較高。
訊息流處理框架主要面臨的問題
通過前面對常見的訊息流處理的介紹,不論是傳統的流處理架構還是 Kafka Streams,對於開發人員來說都會面臨一些問題,尤其是在面對 70% 以上簡單流場景的需求,原有的方案弊端被不斷放大,客戶仍然需要投入較大的人力成本和較高的資源,同時整個架構也很複雜。總體來說,目前面臨主要是四個方面的問題:
1、運維成本較大,研發團隊自行編寫程式碼,後期持續維護,運維成本較大;
2、技術成本較大,對於很多輕量或簡單計算需求,需要進行技術選型,引入一個全新元件的技術成本過高;
3、學習成本不可預期,在某元件選定後,需要研發團隊進行學習並持續維護,這就帶來了不可預期的學習成本;
4、自行選用開源元件後,可靠性和可用性不能得到有效保證。
阿里雲的解決方案 - Kafka ETL
Kafka ETL 簡介
阿里雲訊息佇列 Kafka 版推出更低成本的 Kafka –ETL 元件,是一款免運維的流計算元件,主要特性是支援配置化流式處理訊息。Kafka ETL 元件主要提供的是非時間視窗相關的流計算服務,客戶可以配置,甚至簡單寫入幾行程式碼就能滿足包括格式轉換、內容富化、本地聚合、路由分發等常用的資料處理需求。
Kafka ETL 在使用上拆分成有向無環圖,在計算節點轉換時,把 Topic 作為一個儲存,在 Topic 裡進行有狀態的計算,還可以支援訊息的轉儲。
目前 Kafka ETL 已支援的模版包括:
1)資料清洗:規則過濾;
2)轉換模版:字串替換,新增前後綴、字串大小寫轉換、空格去除數;
3)資料富化模版:資料富化;
4)Split 模版:Topic Split;
5)路由模版:Topic 路由。
Kafka ETL 優勢
通過對 Kafka ETL 基礎應用及功能的介紹可以看到,相比於 Storm、Spark Streaming、Flink、Kafka Streams,Kafka ETL 的優勢主要體現在以下四個方面:
1)開箱即用,免運維;
2)節省成本,不用額外購買其他流計算產品,目前 Kafka ETL 仍處於公測免費階段;
3)低程式碼,支援快速上線,學習成本低,一站式體驗,技術投入小,時間成本節省 80%;
4)便於監控排查,控制檯上相關日誌資訊比較全面。
Kafka ETL 操作
通過以上 Kafka ETL 應用和優勢的介紹可以看到 Kafka ETL 在使用中的具備輕量、低成本等特性,不僅如此,Kafka ETL 的操作也比較簡單,僅需三步便可完成 ETL 操作。
1)第一步:建立任務
選擇 Kafka 來源例項、來源 Topic,以及對應的選擇 Kafka 目標例項、目標 Topic。並配置訊息初始位置、失敗處理以及建立資源方式。
選擇 Python 3 作為函式語言。這裡提供了多種資料清洗、資料轉化模板,比如規則過濾、字串替換、新增前/字尾等常用函式。
Kafka ETL 應用場景
基於 Kafka ETL 的功能和優勢,目前 Kafka ETL 主要應用在下面這些場景中:
1)轉儲場景,支援格式化資料,以方便資料進行轉儲;
2)流式處理場景,流式計算,支援訊息的流式處理,主要提供的是非時間視窗相關的流計算服務;
3)實時行為計算場景,包括風控,金融,電商等需要實時行為計算場景;
4)還支援其他一些場景,包括實時報表,自動化運營場景等。
Kafka ETL 的架構解析
通過前三部分介紹,想必大家對阿里雲 Kafka ETL 有了一定了解,本節的主要內容是對 Kafka ETL 的架構進行解析,幫助大家對 Kafka ETL 有更深入的理解。Kafka ETL 是基於 Kafka connect + 函式計算,為雲上的使用者提供一套資料流轉和計算的一站式解決方案。
在當今的大資料、雲端計算時代,一個複雜的大型系統一般都會由許多處理特定任務的子系統構成。各個子系統一般會由不同的團隊開發,因此,各系統中的資料在內容和格式上,存在天然的不一致性。當資料在各個子系統之間流轉的時候,需要對資料進行格式處理,以消除各系統資料之間格式的不同。此外,還可能需要收集來自各個子系統中的異構資料,對採集到的資料做一些加工和計算,然後投遞到資料倉庫進行後續的資料分析。在這個過程中,可以抽象出兩個典型場景:資料流轉場景和資料計算場景。
資料流轉場景主要面對的問題是,異構系統間資料如何流轉?
資料計算場景主要面對的問題是,如何在資料流轉過程中,進行資料的加工計算?
下面就展開對這兩個主要場景進行介紹。
資料流轉場景
在資料流轉場景中,可能需要將各種關係型和非關係型資料庫中的資料匯入到資料倉庫;或是將 mysql 的資料匯入到 ElasticSearch,用來提高查詢體驗;此外一些資料還會匯入到圖形資料庫。這些場景面臨的主要問題是:
1)各種不同源之間的資料如何拷貝;
2)如何滿足傳遞的實時性。
比如,mysql 裡的一個變更,希望馬上能在 ElasticSearch 中反映出來,不然就會導致後臺資料變更了,使用者卻查不出最新的資料。除此之外,還需要保證資料拷貝的高可用、可伸縮性以及可擴充套件性。
為應對這一問題,傳統的方案可能是:為各資料來源之間都專門做一個數據拷貝工具。這種方案會帶來以下問題:
1)首先是工作量問題,需要為每個場景都寫一個專門的工具,工作量會非常大;
2)業務耦合嚴重,比如想監聽價格變化,就需要在所有變化價格的業務裡,都加上一個 producer。假設上層 schema 發生了變化,下層就需要修改程式碼,因此上層需要感知到所有下層的存在。
專門的工具看起來不太可行,那麼,是否做一個完全通用的工具,讓它支援任意資料來源之間資料拷貝。這個聽起來不錯,但是實際卻不可行,正因為它要求太通用了,很難去制定各種規範。
1)通過訊息中介軟體做非同步解耦,所有系統只用和訊息中介軟體通訊;
2)需要開發的解析工具數量,也從原來的 n 平方個,變成線性的 2*n 個。
Kafka connect 則用於連線訊息系統和資料來源,根據資料的流向不同,連線可以分為 source connector 和 sink connector。其原理也很簡單,souce connector 負責解析來源資料,轉換成標準格式的訊息,通過 Kafka producer 傳送到 Kafka broker 中。同理,sink connector 則通過 Kafka consumer 消費對應的 Topic,然後投遞到目標系統中。在整個過程中,Kafka connect 統一解決了任務排程、與訊息系統互動、自動擴縮容、容錯以及監控等問題,大大減少了重複勞動。但是,如何將來源系統的資料解析成 message、或是將 message 解析成目標系統資料,這兩件事情是需要根據不同的資料系統而做不同實現的。對於目前主流的系統,各大廠商均有提供相應的 connector 實現。
資料計算場景
Kafka connect 解決了異構資料來源之間資料同步的問題,雖然也提供了 transformer,解決部分資料轉換需求,但是依舊缺乏實時計算能力。為應對以上場景的資料實時處理需求,市場上出現了許多優秀的處理工具,從最初的 Hadoop,Hive 到 Spark,Flink 以及 Kafka streams 等,都提供了對應的元件模組和上下游解決方案。
- 首先處理框架比較重,佔用資源多。比如當下流行的 Spark 和 Flink 都需要先搭建一個叢集,叢集本身執行起來就要不少資源。叢集規模一般按照流量峰值配置,在大多數時候,資源是浪費的。
- 其次在諸多框架中,需要根據實際需求做技術選型,後期可能需要專門的團隊或者人去運維,這個過程需要較大的學習成本和後期維護成本。
針對部分無狀態的簡單計算,函式計算或許是一個很好的選擇。阿里雲上的函式計算,是事件驅動的全託管計算服務。使用函式計算時,使用者無需採購與管理伺服器等基礎設施,只需編寫並上傳程式碼即可。函式計算會幫助使用者準備好計算資源,彈性地、可靠地執行任務,並提供日誌查詢、效能監控和報警等功能。可以看到,函式計算以簡單易用的方式給使用者的許多場景提供了計算能力。
Kafka+Kafka connect+函式計算的雲原生資料應用解決方案,通過 Kafka connect 作為實時處理任務觸發器,能夠實時接收到新發送到訊息佇列叢集的資料,然後轉發到函式計算,觸發實時資料處理任務的執行。在這個資料流轉階段,將大量異構系統中的資料以各種方式彙集到 Kafka 中,然後圍繞 Kafka 為中心,做後續的處理。作為後續資料流轉中的一環,Kafka connect 除了保障資料的實時性以外,還解決了任務排程、與訊息系統互動、自動擴縮容、容錯以及監控等問題,大大減少了重複勞動。資料到了函式計算以後,會自動觸發使用者自己編寫的資料處理邏輯,對原始訊息內容進行計算。最後,函式計算可以將加工完成的資料,投遞到使用者指定的目標端,例如投遞迴訊息佇列 Kafka,或者是投遞到 Max compute 進行下一步的資料分析。以上所說的整個任務的配置、建立、執行,都只用通過雲上的 Kafka 控制檯圖形頁面進行操作即可完成。
應用場景詳解
接下來一起來看一個 Kafka ETL 的應用示例。在這個示例中,使用者的一個大致使用場景是這樣的:從一個電商業務系統中,採集日誌,儲存到 Kafka 側,然後需要對日誌資料進行加工,最後將加工好的資料投遞到兩個目標端:一個是投遞到 MaxCompute 進行資料分析;另一個是投遞到 ElasticSearch 進行日誌檢索。
現在分節點來看,如何利用訊息佇列 Kafka 版來做這個事情:
1)第一步:採集原始日誌到訊息佇列 Kafka 版的 Topic 中
這裡可以使用一些比較成熟的開源元件例如 FileBeat、Logstash、Flume 等,將使用者應用端的日誌訊息,投遞到 Kafka 中。一般情況下,這個步驟會將原始的日誌資訊投遞到 Kafka。當然這裡也可以做一些簡單的轉換,但一般不這麼做,而是保留一份原始資訊,原始的日誌可能來自各個關聯的應用,內容和格式會存在些許差異。
在這個例子,訂單應用中生成一條日誌。日誌中包含使用者 Id、action、訂單 Id 以及當前狀態:
從支付應用中,又生成一條日誌。日誌中同樣包含以上資訊,只是格式上存在一些小差別。
2)第二步是對 Topic 中的訊息,做簡單的資料加工計算
資料到達 Kafka 的 Topic 之後,Kafka connect 會消費訊息,並投遞到函式計算中。資料到了函式計算後,需要對這個資料進行加工計算,計算的目標是抽取 UserId、Action、OrderId 以及 Status,並將資料都轉換為大寫字母。然後所有處理後的訊息發往 MaxCompute 進行分析,此外還需要篩選 Action 為 pay 的所有訊息發往 Elastic Search 中。
這個步驟,可以在 Kafka 控制檯圖形介面上建立 ETL 任務,使用者選擇資料來源 Topic:user_order_raw,然後寫一段對資料的處理程式碼。這裡,ETL 已經提供了部分模板,可以在模板的基礎上,做稍許改動即可。
本示例的程式碼如下圖所示。在這個例子中,使用者需要寫一段從不同格式的日誌中,抽取UserId、Action、OrderId 以及 Status 的程式碼,然後將所有處理過的訊息路由到目標 Topic。
函式計算將處理完的訊息,投遞迴 Kafka。經過這一步處理,所有訊息被路由到目標Topic:user_order_processed,此時這個 Topic 中會包含兩條訊息,訊息 key 為 null,value 如下所示:
這個例子中,將 Topic:user_order_processed 中所有處理完的訂單相關訊息,投遞到 MaxCompute 中進行資料分析。將 Topic: user_order_pay_info 中的支付資訊,投遞到 ElasticSearch 中進行後續搜尋。
這一步,可以一鍵建立相應的 Kafka connect 任務,將資料投遞到相應的目標端。
總結一下上述整個過程。在這個示例中,所要做的僅僅是在訊息佇列 Kafka 控制檯上配置一個 ETL 任務,寫一小段處理程式碼即可。上述步驟中,第二步處理完資料之後可以不經過第三步投遞迴 Kafka,而是在處理完之後,直接路由到 MaxCompute 和 ES 中。在該例子中採用的方式是將處理完的資料再次傳送回 Kafka 中,然後再投遞到目標系統中。這種方式可以在 Kafka 端保留一份處理後的資料,使用者還可以比較靈活地對這份資料做進一步處理或者繼續投遞到其他第三方系統中。
阿里雲訊息佇列 Kafka 版的優勢
最後,給大家額外分享一下阿里雲上的訊息佇列 Kafka 在核心層面的差異化優勢。阿里雲上的訊息佇列 Kafka 版在發展過程中除了解決易用性和穩定性方面的問題以外,還做到了有區分度,並在核心層面做出自己的核心競爭力和優勢。
阿里雲訊息佇列 Kafka 版支援雲端儲存和 Local 儲存這兩種儲存引擎。其中 Local Topic 指的就是以 Kafka 原生的方式儲存資料,保留開源 Kafka 全部特性,100%相容開源 Kafka。雲端儲存是接下來要介紹的重點,訊息佇列 Kafka 通過自研雲端儲存引擎,徹底解決了原生 Kafka 一些深層的 bug,以及因為本身架構而難以解決的問題,實現了支援海量分割槽、通過多副本技術降低儲存成本,以及支援無縫遷移彈縮性。接下來,將詳細介紹這三大特性和其中的技術細節。
支援海量分割槽
在訊息引擎中,常見的訊息儲存方式有碎片式儲存和集中式儲存。
碎片式儲存通常以 Topic 或者分割槽緯度儲存,其主要優勢是架構簡單,可以針對 Topic 或者分割槽,控制持久化的容量。Kafka 在架構上,是基於分割槽的碎片式儲存,在分割槽規模不大的情況下,可以通過磁碟的順序讀寫,獲得高效的訊息讀寫效能。通常情況下,一般規格的 Kafka 叢集可以支援到千級別的分割槽規模。如果分割槽規模持續擴大,且大部分分割槽都有讀寫請求時,由於這種設計上的問題,原本的順序讀寫就變成了隨機讀寫,從而導致 Kafka 的讀寫效能急劇下降。
不同於碎片式儲存,集中式儲存則將所有訊息集中儲存到同一個 Commit Log,然後根據 Topic 和分割槽資訊構建佇列,佇列通常作為索引使用。相比於碎片式儲存,集中式儲存的主要優勢是,支援分割槽數多,很容易通過刪除舊的 Commit Log 的形式控制磁碟水位。在阿里雲訊息佇列 Kafka 中,底層的自研雲端儲存引擎正是採用了集中式的儲存方式,雲端儲存引擎相比 Kafka 原生的儲存的主要優勢有:
1)解決了 Kafka 分割槽規模擴大時,效能急劇下降的問題,相比於原生 Kafka 千級別的分割槽規模,其支援的分割槽規模可以達到十萬級別;
2) 在大量分割槽同時寫的場景下,相比原生 Kafka 的碎片式儲存,自研雲端儲存引擎能獲得更好的效能;同時,對寫入耗時做了優化,減少了毛刺的產生。
多副本技術優化
為保證 Kafka 叢集的高可靠和高可用性,通常情況下會為所有 Topic 設定 3 副本儲存。這樣,在出現機器宕機時,Kafka 可以快速從可用的 Follower 副本中選出新的 Leader,接替宕機機器上的 Leader 繼續提供服務。訊息佇列 Kafka 在選擇塊儲存裝置時,選擇的是阿里雲上的雲盤。雲盤是阿里云為雲伺服器 ECS 提供的,資料塊級別的塊儲存產品,具有低時延、高效能、永續性、高可靠等特點。雲盤本身採用了分散式三副本機制,為 ECS 例項提供了極強的資料可靠性和可用性保證。
在這種背景下,在 Kafka 層面設定 3 副本,由於使用了雲盤,實際會有 9 個副本。同時,由於 Kafka 層面的 Follower 需要主動從 Leader 同步資料,這也會消耗叢集的計算和網路資源,將使用者的業務流量擴大至 3 倍。但是,如果在 Kafka 層面設定單副本,由於 Kafka 本身不能利用到雲盤的 3 副本能力,其高可用性就不能保證。因此,如何利用好雲盤的 3 副本能力,降低的儲存成本和網路成本,就成了面臨的一大挑戰。
阿里雲通過接入自研雲端儲存引擎,解決了儲存成本和網路成本問題。其核心原理主要是:在自研儲存引擎中引入了邏輯佇列和物理佇列兩個概念。邏輯佇列也就是暴露給使用者的概念,在這裡可以直接理解成客戶端看到的 partition,而物理佇列則用於實際儲存資料。通過對映關係,將邏輯佇列和物理佇列繫結在一起。在自研引擎中,所有的分割槽在邏輯上都是單副本的。資料的可靠性和可用性由雲盤底層的 3 副本機制保證。在正常情況下,傳送到特定邏輯 partition 的資料,都會根據對映關係,寫入到對應的物理佇列中。同理,消費也是根據對映關係從實際的物理佇列中拉取。
接下來來看雲端儲存是如何做到容錯和高可用的。例如,在節點 0 的 ECS 宕機時,可以通過 QueueMapper,秒級切換邏輯佇列 0 的對映關係到節點 1 中的已有佇列 Queue-3,或者新增一個物理佇列 Queue-4。此時,發往邏輯佇列-0 的訊息,將被路由到Queue-3 或者 Queue-4 中。這種情況下,使用者的傳送業務不會受到影響,依舊可以繼續傳送成功,並且最新的訊息也能被消費到。當然,在這種 Failover 期間,會存在一個問題:邏輯佇列-0 在節點-0 上的訊息,暫時不能消費;但是,對大多數應用場景來說,短暫的部分訊息消費延遲並不是大問題,只要不影響傳送就能滿足要求。
在節點-0 的 ECS 宕機後,阿里雲備用 ECS 會迅速生成新的機器替換節點-0,掛載原有云盤,分鐘級時間內恢復節點-0 服務。在節點-0 恢復後,只用重新將邏輯佇列-0 的對映關係切回 Queue-0,系統又重新恢復了原有狀態。此時,傳送/消費依舊能保持原生Kafka 的特性。
通過以上方式,將儲存成本節省到原生 Kafka 的大約三分之一。同時,由於在 Kafka 層面,副本數是 1,從而避免了 Follower 從 Leader 中同步資料的操作,網路流量也節省到原生 Kafka 的大約三分之一。
水平擴容,秒級資料均衡
彈性擴縮能力是訊息佇列的核心能力之一。由於 Kafka 服務端節點是有狀態的,因此新增了若干節點之後,需要重新均衡各個 Topic 的佇列,使得客戶端往叢集中傳送或是消費的流量,能均衡地打到後端各個服務節點上。
開源 Kafka 在水平擴充套件了機器之後,做資料均衡的主要方式有兩種:
第一種是在新的 broker 中新增佇列。這種方式主要的痛點是:
1)系統狀態發生改變,這種情況下一些多語言客戶端的早期版本,需要客戶端主動重啟,否則無法消費新分割槽;
2) 第二是 Kafka 設計上的問題,分片數無法下降,導致後續無法縮容。
1)流量複製,產生網路風暴,干擾正常使用;
2)均衡與資料量有關,如果資料量巨大,可能要花費幾天來遷移。
前文提到訊息佇列 Kafka 引入了兩級佇列:第一級為邏輯佇列,第二級為物理佇列,也就是阿里雲自研雲端儲存佇列。邏輯佇列對外暴露,物理佇列則用於儲存實際資料。通過 QueueMapper 模組維護邏輯佇列與物理佇列之間的對映關係,如下圖所示。
一個邏輯佇列可以由多個物理佇列拼接而成,通過位點分段對映,保證順序。擴容時,只需要將邏輯佇列指向新機器上的物理佇列即可,這樣新寫入的訊息就可以根據新的對映關係,直接寫入到新加的機器。同樣的,在消費時,可以根據位點分段對映關係,找到實際的物理佇列,然後從物理佇列中讀取訊息。
可以看到,通過兩級佇列分段對映,解決了訊息佇列彈縮和遷移問題,具有如下優點:
1)服務端擴縮容後,不變更佇列數量,保持系統狀態不變;
2)擴縮容時無需遷移資料,耗時短,可以在秒級時間內完成 Topic 佇列重新均衡;
3)兼顧了吞吐與擴充套件性,不影響原有訊息佇列的效能。
總結
簡單對主要介紹內容進行總結,Kafka 在流式處理場景中,傳統方案一般會採用 Storm、Spark Streaming、Flink 和 Kafka Streams 等流式處理框架,但是開發人員在使用的過程中會遇到不少問題,尤其是在面對 70% 以上簡單流場景的需求,會遇到運維成本較高、技術成本較大、學習成本不可預期和可用性、可靠性較低等痛點問題,阿里雲訊息佇列 Kafka 釋出 Kafka ETL 元件,是一款免運維的流計算元件,通過 Kafka+Kafka connect+函式計算的架構,能夠很好的應對資料轉儲+實時計算問題,具備免運維、低成本、低程式碼、易監控等優勢。
本文為阿里雲原創內容,未經允許不得轉載。