1. 程式人生 > 其它 >Flink執行時的元件

Flink執行時的元件

Flink 程式的基礎構建模組是流(Streams)和轉換(Transformations),每一個數據流起始於一個或多個 Source,並終止於一個或多個 Sink。資料流類似於有向無環圖(DAG)。

在分散式執行環境中,Flink 提出了運算元鏈的概念,Flink 將多個運算元放在一個任務中,由同一個執行緒執行,減少執行緒之間的切換、訊息的序列化/反序列化、資料在緩衝區的交換,減少延遲的同時提高整體的吞吐量。

官網中給出的例子如下,在並行環境下,Flink 將多個 operator 的子任務連結在一起形成了一個task,每個 task 都有一個獨立的執行緒執行。

Flink 執行時架構主要包括四個不同的元件,它們會在執行流處理應用程式時協同工作:作業管理器(JobManager)、資源管理器(ResourceManager)、工作管理員(TaskManager),以及分發器(Dispatcher)。因為 Flink 是用 Java 和 Scala 實現的,所以所有元件都會執行在Java 虛擬機器上。每個元件的職責如下 :

  • 作業管理器(JobManager)

控制一個應用程式執行的主程序,也就是說,每個應用程式都會被一個不同的JobManager 所控制執行。 JobManager 會先接收到要執行的應用程式, 這個應用程式會包括:作業圖(JobGraph)、邏輯資料流圖(logical dataflow graph)和打包了所有的類、庫和其它資源的 JAR 包。 JobManager 會把 JobGraph 轉換成一個物理層面的資料流圖,這個圖被叫做“執行圖”(ExecutionGraph),包含了所有可以併發執行的任務。 JobManager 會向資源管理器(ResourceManager)請求執行任務必要的資源,也就是工作管理員(TaskManager)上的插槽( slot)。一旦它獲取到了足夠的資源,就會將執行圖分發到真正執行它們的TaskManager 上。而在執行過程中, JobManager 會負責所有需要中央協調的操作,比如說檢查點(checkpoints)的協調。

  • 資源管理器(ResourceManager)

主要負責管理工作管理員(TaskManager)的插槽(slot), TaskManger 插槽是 Flink 中定義的處理資源單元。 Flink 為不同的環境和資源管理工具提供了不同資源管理器,比如YARN、 Mesos、 K8s,以及 standalone 部署。當 JobManager 申請插槽資源時, ResourceManager會將有空閒插槽的 TaskManager 分配給 JobManager。如果 ResourceManager 沒有足夠的插槽來滿足 JobManager 的請求,它還可以向資源提供平臺發起會話,以提供啟動 TaskManager程序的容器。另外, ResourceManager 還負責終止空閒的 TaskManager,釋放計算資源。

  • 工作管理員(TaskManager)

Flink 中的工作程序。通常在 Flink 中會有多個 TaskManager 執行,每一個 TaskManager 都包含了一定數量的插槽(slots)。插槽的數量限制了 TaskManager 能夠執行的任務數量。啟動之後, TaskManager 會向資源管理器註冊它的插槽;收到資源管理器的指令後,TaskManager 就會將一個或者多個插槽提供給 JobManager 呼叫。 JobManager 就可以向插槽分配任務(tasks)來執行了。在執行過程中,一個 TaskManager 可以跟其它運行同一應用程式的 TaskManager 交換資料。

  • 分發器(Dispatcher)

都包含了一定數量的插槽(slots)。插槽的數量限制了 TaskManager 能夠執行的任務數量。啟動之後, TaskManager 會向資源管理器註冊它的插槽;收到資源管理器的指令後,TaskManager 就會將一個或者多個插槽提供給 JobManager 呼叫。 JobManager 就可以向插槽分配任務(tasks)來執行了。在執行過程中,一個 TaskManager 可以跟其它運行同一應用程式的 TaskManager 交換資料。

Flink 中每一個 worker(TaskManager)都是一個 JVM 程序,它可能會在獨立的執行緒上執行一個或多個 subtask。為了控制一個 worker 能接收多少個 task, worker 通
過 task slot 來進行控制(一個 worker 至少有一個 task slot)。

每個 task slot 表示 TaskManager 擁有資源的一個固定大小的子集。假如一個TaskManager 有三個 slot,那麼它會將其管理的記憶體分成三份給各個 slot。資源 slot
化意味著一個 subtask 將不需要跟來自其他 job 的 subtask 競爭被管理的記憶體,取而代之的是它將擁有一定數量的記憶體儲備。需要注意的是,這裡不會涉及到 CPU 的隔
離, slot 目前僅僅用來隔離 task 的受管理的記憶體。

通過調整 task slot 的數量,允許使用者定義 subtask 之間如何互相隔離。如果一個TaskManager 一個 slot,那將意味著每個 task group 執行在獨立的 JVM 中(該 JVM可能是通過一個特定的容器啟動的),而一個 TaskManager 多個 slot 意味著更多的subtask 可以共享同一個 JVM。而在同一個 JVM 程序中的 task 將共享 TCP 連線(基
於多路複用)和心跳訊息。它們也可能共享資料集和資料結構,因此這減少了每個task 的負載。

與此同時,Flink 還允許將不能形成運算元鏈的兩個操作,比如下圖中的 flatmap 和 key&sink 放在一個 TaskSlot 裡執行以達到資源共享的目的。

預設情況下, Flink 允許子任務共享 slot,即使它們是不同任務的子任務(前提是它們來自同一個 job) 。 這樣的結果是,一個 slot 可以儲存作業的整個管道。Task Slot 是靜態的概念,是指 TaskManager 具有的併發執行能力,可以通過引數 taskmanager.numberOfTaskSlots 進行配置; 而並行度 parallelism 是動態概念,即 TaskManager 執行程式時實際使用的併發能力,可以通過引數 parallelism.default進行配置。
也就是說,假設一共有 3 個 TaskManager,每一個 TaskManager 中的分配 3 個TaskSlot,也就是每個 TaskManager 可以接收 3 個 task,一共 9 個 TaskSlot,如果我
們設定 parallelism.default=1,即執行程式預設的並行度為 1, 9 個 TaskSlot 只用了 1個,有 8 個空閒,因此,設定合適的並行度才能提高效率。

Flink 在誕生之初,就以它獨有的特點迅速風靡整個實時計算領域。在此之前,實時計算領域還有 Spark Streaming 和 Storm等框架,那麼為什麼 Flink 能夠脫穎而出?我們將分別在架構、容錯、語義處理等方面進行比較。

架構

Stom 的架構是經典的主從模式,並且強依賴 ZooKeeper;Spark Streaming 的架構是基於 Spark 的,它的本質是微批處理,每個 batch 都依賴 Driver,我們可以把 Spark Streaming 理解為時間維度上的 Spark DAG。

Flink 也採用了經典的主從模式,DataFlow Graph 與 Storm 形成的拓撲 Topology 結構類似,Flink 程式啟動後,會根據使用者的程式碼處理成 Stream Graph,然後優化成為 JobGraph,JobManager 會根據 JobGraph 生成 ExecutionGraph。ExecutionGraph 才是 Flink 真正能執行的資料結構,當很多個 ExecutionGraph 分佈在叢集中,就會形成一張網狀的拓撲結構。

容錯

Storm 在容錯方面只支援了 Record 級別的 ACK-FAIL,傳送出去的每一條訊息,都可以確定是被成功處理或失敗處理,因此 Storm 支援至少處理一次語義。

針對以前的 Spark Streaming 任務,我們可以配置對應的 checkpoint,也就是儲存點。當任務出現 failover 的時候,會從 checkpoint 重新載入,使得資料不丟失。但是這個過程會導致原來的資料重複處理,不能做到“只處理一次”語義。

Flink 基於兩階段提交實現了精確的一次處理語義,我們將會在後面的課時中進行完整解析。

反壓(BackPressure)

反壓是分散式處理系統中經常遇到的問題,當消費者速度低於生產者的速度時,則需要消費者將資訊反饋給生產者使得生產者的速度能和消費者的速度進行匹配。

Stom 在處理背壓問題上簡單粗暴,當下遊消費者速度跟不上生產者的速度時會直接通知生產者,生產者停止生產資料,這種方式的缺點是不能實現逐級反壓,且調優困難。設定的消費速率過小會導致叢集吞吐量低下,速率過大會導致消費者 OOM。

Spark Streaming 為了實現反壓這個功能,在原來的架構基礎上構造了一個“速率控制器”,這個“速率控制器”會根據幾個屬性,如任務的結束時間、處理時長、處理訊息的條數等計算一個速率。在實現控制資料的接收速率中用到了一個經典的演算法,即“PID 演算法”。

Flink 沒有使用任何複雜的機制來解決反壓問題,Flink 在資料傳輸過程中使用了分散式阻塞佇列。我們知道在一個阻塞佇列中,當佇列滿了以後傳送者會被天然阻塞住,這種阻塞功能相當於給這個阻塞佇列提供了反壓的能力。

名詞解釋

1.Dataflow:Flink程式在執行的時候會被對映成一個數據流模型

2.Operator:資料流模型中的每一個操作被稱作Operator,Operator分為:Source/Transform/Sink

3.Partition:資料流模型是分散式的和並行的,執行中會形成1~n個分割槽

4.Subtask:多個分割槽任務可以並行,每一個都是獨立執行在一個執行緒中的,也就是一個Subtask子任務

5.Parallelism:並行度,就是可以同時真正執行的子任務數/分割槽數

並行度

Flink 程式的執行具有並行、分散式的特性。
在執行過程中,一個流( stream) 包含一個或多個分割槽( stream partition) ,而每一個運算元( operator)可以包含一個或多個子任務( operator subtask) ,這些子任務在不同的執行緒、不同的物理機或不同的容器中彼此互不依賴地執行。
一個特定運算元的子任務( subtask) 的個數被稱之為其並行度( parallelism) 。一般情況下, 一個流程式的並行度,可以認為就是其所有運算元中最大的並行度。一個程式中,不同的運算元可能具有不同的並行度。

Stream 在運算元之間傳輸資料的形式可以是 one-to-one(forwarding)的模式也可以是 redistributing 的模式,具體是哪一種形式,取決於運算元的種類。
One-to-one: stream(比如在 source 和 map operator 之間)維護著分割槽以及元素的順序。那意味著 map 運算元的子任務看到的元素的個數以及順序跟 source 運算元的子任務生產的元素的個數、順序相同, map、 fliter、 flatMap 等運算元都是 one-to-one 的對應關係。
➢ 類似於 spark 中的窄依賴
Redistributing: stream(map()跟 keyBy/window 之間或者 keyBy/window 跟 sink之間)的分割槽會發生改變。每一個運算元的子任務依據所選擇的 transformation 傳送數
據到不同的目標任務。例如, keyBy() 基於 hashCode 重分割槽、 broadcast 和 rebalance會隨機重新分割槽,這些運算元都會引起 redistribute 過程,而 redistribute 過程就類似於Spark 中的 shuffle 過程。
➢ 類似於 spark 中的寬依賴

任務鏈

想同並行度的 one to one 操作, Flink 這樣相連的運算元連結在一起形成一個 task,原來的運算元成為裡面的一部分。將運算元連結成 task 是非常有效的優化:它能減少執行緒之間的切換和基於快取區的資料交換,在減少時延的同時提升吞吐量。連結的行為可以在程式設計 API 中進行指定。

注意:必須還是同一個共享組

執行圖

由 Flink 程式直接對映成的資料流圖是 StreamGraph,也被稱為邏輯流圖,因為它們表示的是計算邏輯的高階檢視。為了執行一個流處理程式, Flink 需要將邏輯流
圖轉換為物理資料流圖(也叫執行圖) ,詳細說明程式的執行方式。

Flink 中的執行圖可以分成四層: StreamGraph -> JobGraph -> ExecutionGraph ->物理執行圖。
StreamGraph:是根據使用者通過 Stream API 編寫的程式碼生成的最初的圖。用來表示程式的拓撲結構。

JobGraph: StreamGraph 經過優化後生成了 JobGraph,提交給 JobManager 的資料結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這
樣可以減少資料在節點之間流動所需要的序列化/反序列化/傳輸消耗。

ExecutionGraph : JobManager 根 據 JobGraph 生 成 ExecutionGraph 。ExecutionGraph 是 JobGraph 的並行化版本,是排程層最核心的資料結構。

物理執行圖: JobManager 根據 ExecutionGraph 對 Job 進行排程後,在各個TaskManager 上部署 Task 後形成的“圖”,並不是一個具體的資料結構。

任務排程原理

客 戶 端 不 是 運 行 時 和 程 序 執 行 的 一 部 分 , 但 它 用 於 準 備 並 發 送dataflow(JobGraph)給 Master(JobManager),然後,客戶端斷開連線或者維持連線以
等待接收計算結果。

當 Flink 集 群 啟 動 後 , 首 先 會 啟 動 一 個 JobManger 和 一 個 或 多 個 的TaskManager。由 Client 提交任務給 JobManager, JobManager 再排程任務到各個
TaskManager 去執行,然後 TaskManager 將心跳和統計資訊彙報給 JobManager。TaskManager 之間以流的形式進行資料的傳輸。上述三者均為獨立的 JVM 程序。

Client 為提交 Job 的客戶端,可以是執行在任何機器上(與 JobManager 環境連通即可)。提交 Job 後, Client 可以結束程序( Streaming 的任務),也可以不
結束並等待結果返回。

JobManager 主 要 負 責 調 度 Job 並 協 調 Task 做 checkpoint, 職 責 上 很 像Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源後,會生成優化後的執行計劃,並以 Task 的單元排程到各個 TaskManager 去執行。

TaskManager 在啟動的時候就設定好了槽位數( Slot),每個 slot 能啟動一個Task, Task 為執行緒。從 JobManager 處接收需要部署的 Task,部署啟動後,與自
己的上游建立 Netty 連線,接收資料並處理。