1. 程式人生 > 其它 >FLINK與流批一體

FLINK與流批一體

當我們談論批流一體,我們在談論什麼?

目錄

一、流計算與批計算

一)流計算與批計算

流計算:無限資料之上的計算

批計算:有限資料之上的計算

二)流計算與批計算的比較

特性 批計算 流計算
資料範圍 有界資料 無界資料
任務執行 分批執行、有終止 全部執行、無終止
延時 小時級、天級 秒級、分鐘級
資料場景 資料量超大資料、無法以流的形式交付 資料以流的形式交付
資源消耗
資料質量 要求低 要求高
業務場景 清算對賬、報表生成、特徵生成 欺詐檢測、實時風控、實時推薦
關注點 可擴充套件性、吞吐、容錯 可擴充套件性、延遲、容錯、訊息一致性、訊息永續性
處理語義 僅有一次 至少一次、至多一次、僅有一次
代表引擎 MR SPARK Storm Spark streaming Flink Kafka streaming

三)為什麼要搞流批一體

1.減少學習成本

2.減少資源消耗

3.降低架構複雜性

4.提升價值產出效率

二、流批一體的場景

一)資料整合的流批一體

​ 在大資料場景下經常需要資料同步或者資料整合,也就是將資料庫中的資料同步到大資料的數倉或者其他儲存中。上圖中的左邊是傳統的經典資料整合的模式之一,全量的同步和增量的同步實際上是兩套技術,需要定期將全量同步的資料跟增量同步資料做 merge,不斷的迭代來把資料庫的資料同步到資料倉庫中。

​ 基於 Flink 流批一體,整個資料整合的架構將不同。因為 Flink SQL 也支援資料庫(像 MySQL 和 PG)的 CDC 語義,所以可以用 Flink SQL 一鍵同步資料庫的資料到 Hive、ClickHouse、TiDB 等開源的資料庫或開源的 KV 儲存中。在 Flink 流批一體架構的基礎上,Flink 的 connector 也是流批混合的,它可以先讀取資料庫全量資料同步到數倉中,然後自動切換到增量模式,通過 CDC 讀 Binlog 進行增量和全量的同步,Flink 內部都可以自動的去協調好,這是流批一體的價值。

二)數倉架構的流批一體

​ 目前主流數倉架構都是一套典型的離線數倉和一套新的實時數倉,但這兩套技術棧是分開的。在離線數倉裡,還是習慣用 Hive 或者 Spark,在實時數倉中用 Flink 加 Kafka。有三個問題需要解決:兩套開發流程,成本高;資料鏈路冗餘,兩套鏈路將資料相關的操作做了兩遍;資料口徑的一致性難以保證,因為它是由兩套引擎算出來的。

​ 用流批一體架構來解決,以上難題將極大降低。

  • 首先,Flink 是一套 Flink SQL 開發,不存在兩套開發成本。一個開發團隊,一套技術棧,就可以做所有的離線和實時業務統計的問題。
  • 第二,資料鏈路也不存在冗餘,明細層的計算一次即可,不需要離線再算一遍。
  • 第三,資料口徑天然一致。無論是離線的流程,還是實時的流程,都是一套引擎,一套 SQL,一套 UDF,一套開發人員,所以它天然是一致的,不存在實時和離線資料口徑不一致的問題。

三)資料湖的流批一體

​ Hive 元資料的管理是瓶頸,Hive 不支援資料的實時更新。Hive 沒有辦法實時,或者準實時化地提供數倉能力。現在比較新的資料湖架構,可以解決更具擴充套件性的元資料的問題,而且資料湖的儲存支援資料的更新,是一個流批一體的儲存。資料湖儲存與 Flink 結合,就可以將實時離線一體化的數倉架構演變成實時離線一體化的資料湖架構。

四)儲存的流批一體

1.Pulsar

Pulsar的元件架構圖

  • 首先在計算層,Pulsar Broker 不儲存任何狀態資料、不做任何資料儲存,稱之為服務層。
  • 其次,Pulsar 擁有一個專門為訊息和流設計的儲存引擎 BookKeeper,稱之為資料層。
  • 如果要支援更多的 Producer 和 Consumer,可擴充上面無狀態的 Broker 層;
  • 如果要支援更多的資料儲存,可單獨擴充底層儲存層。

Pulsar的流批概念

​ 這種分層的架構為做批流融合打好了基礎。因為它原生分成了兩層,可以根據使用者的使用場景和批流的不同訪問模式,來提供兩套不同的 API。

  • 如果是實時資料的訪問,可以通過上層 Broker 提供的 Consumer 介面;

  • 如果是歷史資料的訪問,可以跳過 Broker,用儲存層的 reader 介面,直接訪問底層儲存層。

2.Hologres

1)Hologres的架構圖

Hologres的架構從下往上看,最底層是統一的儲存系統,可以是阿里雲統一的Pangu、業務的HDFS或者OSS、S3等,儲存上面是計算層,提供類似的MMP架構計算服務,再往上是FE層,根據查詢資訊將Plan分發到各個計算節點,再往上就是PostgreSQL生態的對接,只要有JDBC/ODBC Driver就能對Hologres做查詢。

Hologres的架構是完全是儲存計算分離,計算完全部署在K8s上,儲存可以使用共享儲存,可以根據業務需求選擇HDFS或者雲上的OSS,這樣使用者就能根據業務需求對資源做彈性擴縮容,完美解決資源不夠帶來的併發問題。

儲存優勢

  • 全非同步:支援高併發寫入,能夠將CPU最大化利用;

  • 無鎖:寫入能力隨資源線性擴充套件,直到將CPU全部寫滿;

  • 記憶體管理:提供資料cache,支援高併發查詢。

計算優勢

  • 高效能混合負載:慢查詢和快查詢混合一起跑,通過內部的排程系統,避免慢查詢影響快查詢;

  • 向量化計算:列式資料通過向量化計算達到查詢加速的能力;

  • 儲存優化:能夠定製查詢引擎,但是對儲存在Hologres資料查詢效能會更優。

問題提出

大致根據查詢併發度要求或者查詢Latency要求,將Patterns分為四類:

  • Batch:離線計算
  • Analytical:互動式分析
  • Servering:高QPS的線上服務
  • Transaction:與錢相關的傳統資料庫(絕大多數業務並不需要)

目前市面上都在說HTAP,經過調研HTAP是個偽命題,因為A和T的優化方向不一樣。為了做T,寫入鏈路將非常複雜,QPS無法滿足需求。若是對T的要求降低一點,就會發現Analytical和Severing的聯絡非常緊密,這兩塊的技術是可以共用的,所以放棄了T就相當於放棄了Transaction,於是提出新的一個架構叫做HSAP,需要做的就是把提供服務和分析的資料儲存在一個系統裡,通過一套分析引擎來做處理。

2)Hologres的流批一體

​ 資料實時寫入至Flink,經由Flink做實時預處理,比如實時ETL或者實時訓練,把處理的結果直接寫入Hologres,Hologres提供維表關聯點查、結果快取、複雜實時互動、離線查詢和聯邦查詢等,這樣整個業務系統只需要通過Hologres來做唯一的資料入口,線上系統可以通過PostgreSQL生態在Hologres中訪問資料,無需對接其他系統,這樣也能解決之前傳統架構的各種查詢、儲存問題。

三、Flink中的流批一體

2020 年,Flink 在流批一體上走出了堅實的一步,可以抽象的總結為 Flink 1.10 和 1.11 這兩個大的版本,主要是完成 SQL 層的流批一體化和實現生產可用性。實現了統一的流批一體的 SQL 和 Table 的表達能力,以及統一的 Query Processor,統一的 Runtime。在1.12 版本中,對 DataStream API 進行了流批一體化。在 DataStream 原生的流的運算元上增加批的運算元,也就是說 DataStream 也可以有兩種執行模式,批模式和流模式裡面也可以混合批運算元和流運算元。在1.13 的版本中,實現 DataStream 流批一體化的運算元,整個的計算框架和 SQL 一樣,完全都是流批一體化的計算能力。這樣一來,原來 Flink 中的 DataSet 這套老的 API 就可以去掉,完全實現真正的流批一體的架構。

一)流批一體的DataStream

1.目前的SDK

  • Table/SQL 是一種 Relational 的高階 SDK,主要用在一些資料分析的場景中,既可以支援 Bounded 也可以支援 Unbounded 的輸入。Table/SQL 可以支援 Batch 和 Streaming 兩種執行模式。Relatinal SDK 功能雖然強大,但也存在一些侷限:不支援對 State、Timer 的操作。

  • DataStream 屬於一種 Physical SDK。DataStream 是一種 Imperative SDK,所以對物理執行計劃有很好的“掌控力”。

  • DataSet 是一種僅支援 Bounded 輸入的 Physical SDK,會根據 Bounded 的特性對某些運算元進行做一定的優化,但是不支援 EventTime 和 State 等操作。

​ 利用已有的 Physical SDK ,無法寫出流批一體的application。另外,兩套SDK的學習和理解的成本比較高,兩套SDK 在語義上有不同的地方,例如 DataStream 上有 Watermark、EventTime,而 DataSet 卻沒有,對於使用者來說,理解兩套機制的門檻也不小;並且這兩 SDK 不相容。

2.期望的SDK

  • 為什麼選擇了 DataStream 統一 DataSet ?DataSet 在社群的影響力逐漸下降。DataSet 運算元的實現,在流的場景完全無法複用,例如 Join 等。而對於 DataStream 則不然,可以進行大量的複用。

  • 提升DataStream的批效率。DataStream 是給 Unbounded 的場景下使用的,而 Unounded 一個主要的特點就是亂序,解決亂序引起了大量的序列化、反序列化和隨機磁碟讀寫;DataSet 中,資料有的,通過優化避免隨機磁碟 I/O 訪問,同時也對序列化和反序列化做優化。通過單 Key 的 BatchStateBackend 幾乎完全避免了對所有運算元重寫,同時還得到了非常不錯的效果。

  • DataStream一致性的相容,DataStream 寫的 Application 都採用 Streaming 的執行模式,一致性依賴 Flink Checkpoint 機制的 2PC 協議,但這種模式的弊端是資源消耗大、容錯成本高。提出了一個全新 Unified Sink API,從而讓開發者提供 What to commit 和 How to commit,系統應該根據不同的執行模式,選擇 Where to commit 和 When to commit 來保證端到端的 Exactly Once。

二)流體一體的DAG Scheduler

​ Flink 有兩種排程的模式:一種是流的排程模式,在這種模式下,Scheduler 會申請到一個作業所需要的全部資源,然後同時排程這個作業的全部 Task,所有的 Task 之間採取 Pipeline 的方式進行通訊。一種是批的排程模式,所有 Task 都是可以獨立申請資源,Task 之間都是通過 Batch Shuffle 進行通訊。這種方式的好處是容錯代價比較小,不足是Task 之間的資料都是通過磁碟來進行互動,引發了大量的磁碟 IO。

基於 Pipeline Region 的統一排程

​ Unified DAG Scheduler 允許在一個 DAG 圖中,Task 之間既可以通過 Pipeline 通訊,也可以通過 Blocking 方式進行通訊。這些由 Pipeline 的資料交換方式連線的 Task 被稱為一個 Pipeline Region。基於以上概念,Flink 引入 Pipeline Region 的概念,不管是流作業還是批作業,都是按照 Pipeline Region 粒度來申請資源和排程任務。

​ 在 Flink 中,不同 Task 之間有兩種連線方式,一種是 All-to-All 的連線方式,上游 Task 會和下游的所有的 Task 進行連線;一種是 PointWise 的連結方式,上游的 Task 只會和下游的部分 Task 進行連線。Flink Planner 可以根據實際執行場景,定製哪些 Task 之間採取 Pipeline 的傳輸方式,哪些 Task 之間採取 Batch 的傳輸方式方式。

自適應排程

​ 排程的本質是給物理執行計劃進行資源分配的決策過程。對於批作業來說靜態生成物理執行計劃存在一些問題,配置人力成本高,需要手動調整批作業的併發度,一旦業務邏輯發生變化,又要不斷的重複這個過程,也可能會出現誤判的情況導致無法滿足使用者 SLA;資源利用率低,中低優先順序的作業以預設值作為併發度,造成資源的浪費;高優先順序的作業不及時調低併發讀,也造成大量的資源浪費現象;

​ 為批作業引入了自適應排程功能,和原來的靜態物理執行計劃相比,利用這個特性可以大幅提高使用者資源利用率。 Adaptive Scheduler 可以根據一個 JobVertex 的上游 JobVertex 的執行情況,動態決定當前 JobVertex 的併發度。未來,也可以根據上游 JobVertex 產出的資料,動態決定下游採用什麼樣的運算元。

三)流批一體的Shuffle

​ Shuffle 本質上是為了對資料進行重新劃分(re-partition),目標是提供一套統一的 Shuffle 架構,既可以滿足不同 Shuffle 在策略上的定製,同時還能避免在共性需求上進行重複開發。批作業和流作業的 Shuffle 有差異也有共性,共性主要體現在:資料的 Meta 管理,所謂 Shuffle Meta 是指邏輯資料劃分到資料物理位置的對映;資料傳輸,在分散式系統中,對資料的重新劃分都涉及到跨執行緒、程序、機器的資料傳輸。

流批一體的 Shuffle 架構

​ Unified Shuffle 架構抽象出三個元件: Shuffle Master、Shuffle Reader、Shuffle Writer。Flink通過和這三個元件互動完成運算元間的資料的重新劃分。通過這三個元件可以滿足不同Shuffle外掛在具體策略上的差異:

  • Shuffle Master 資源申請和資源釋放。也就是說外掛需要通知框架 How to request/release resource。而由 Flink 來決定 When to call it;
  • Shuffle Writer 上游的運算元利用 Writer 把資料寫入 Shuffle Service——Streaming Shuffle 會把資料寫入記憶體;External/Remote Batch Shuffle 可以把資料寫入到外部儲存中;
  • Shuffle Reader 下游的運算元可以通過 Reader 讀取 Shuffle 資料;

​ 同時,為流批 Shuffle 的共性——Meta 管理、資料傳輸、服務部署——提供了架構層面的支援,從而避免對複雜元件的重複開發。高效穩定的資料傳輸,是分散式系統最複雜的子系統之一,例如在傳輸中都要解決上下游反壓、資料壓縮、記憶體零拷貝等問題。

四)流批一體的容錯

​ Flink 現有容錯策略以檢查點為前提,無論是單個 Task 出現失敗還是JobMaster 失敗, 都會按照最近的檢查點重啟整個作業。Flink Batch 執行模式下不會開啟檢查點,一旦出現任何錯誤,整個作業都要從頭執行。以下兩個改進就主要為了提升批作業的容錯能力。

Task的改進 Pipeline Region Failover

​ Batch 執行模式下,Flink允許 Task 之間通過 Blocking Shuffle 進行通訊。對於讀取 Blocking Shuffle 的 Task 發生失敗之後,由於 Blocking Shuffle 中儲存了這個 Task 所需要的全部資料,所以只需要重啟這個 Task 以及通過 Pipeline Shuffle 與其相連的全部下游任務即可,而不需要重啟整個作業。

JM的改進 Operation Log

​ JM 是一個作業的控制中心,包含了作業的各種執行狀態,一旦 JM 發生錯誤之後,新 JM 無法判斷現有的狀態是否滿足排程下游任務的條件——所有的輸入資料都已經產生。JM Failover 的關鍵就是如何讓一個 JM“恢復記憶”,通過基於 Operation Log 機制恢復 JM 的關鍵狀態。

五)流批一體的總圖

​ 上圖是一個Flink為了實現流批一體的引擎層所規劃的框架圖,其中很多還是規劃和開發當中,在目前Flink最新版本1.14中,還沒有完全實現上述的架構,但相信繼續經過幾個版本的迭代,Flink就可以在引擎層面完成流批一體的統一。

注:
本文綜合自
https://mp.weixin.qq.com/s/AnBU9ntRVwbsWQoiDkzZHg
https://mp.weixin.qq.com/s/4w8VSUjaX7JHiaPMxaGj7g