1. 程式人生 > 其它 >flink taskmanager&slots&並行度&任務鏈&task分配詳解

flink taskmanager&slots&並行度&任務鏈&task分配詳解

TaskManger與Slots

Flink中每一個worker(TaskManager)都是一個JVM程序,它可能會在獨立的執行緒上執行一個或多個subtask。為了控制一個worker能接收多少個task,worker通過task slot來進行控制(一個worker至少有一個task slot)。

每個task slot表示TaskManager擁有資源的一個固定大小的子集。假如一個TaskManager有三個slot,那麼它會將其管理的記憶體分成三份給各個slot。資源slot化意味著一個subtask將不需要跟來自其他job的subtask競爭被管理的記憶體,取而代之的是它將擁有一定數量的記憶體儲備。需要注意的是,這裡不會涉及到CPU的隔離,slot目前僅僅用來隔離task的受管理的記憶體。

通過調整task slot的數量,允許使用者定義subtask之間如何互相隔離。如果一個TaskManager一個slot,那將意味著每個task group執行在獨立的JVM中(該JVM可能是通過一個特定的容器啟動的),而一個TaskManager多個slot意味著更多的subtask可以共享同一個JVM。而在同一個JVM程序中的task將共享TCP連線(基於多路複用)和心跳訊息。它們也可能共享資料集和資料結構,因此這減少了每個task的負載。

預設情況下,Flink允許子任務共享slot,即使它們是不同任務的子任務(前提是它們來自同一個job)。 這樣的結果是,一個slot可以儲存作業的整個管道。

Task Slot是靜態的概念,是指TaskManager具有的併發執行能力,可以通過引數taskmanager.numberOfTaskSlots進行配置;而並行度parallelism是動態概念,即TaskManager執行程式時實際使用的併發能力,可以通過引數parallelism.default進行配置。

也就是說,假設一共有3個TaskManager,每一個TaskManager中的分配3個TaskSlot,也就是每個TaskManager可以接收3個task,一共9個TaskSlot,如果我們設定parallelism.default=1,即執行程式預設的並行度為1,9個TaskSlot只用了1個,有8個空閒,因此,設定合適的並行度才能提高效率。

flink 中並行任務的分配

  • Flink 中每一個 TaskManager 都是一個JVM程序,它可能會在獨立的執行緒上執行一個或多個 subtask
  • 為了控制一個 TaskManager 能接收多少個 task, TaskManager 通過 task slot 來進行控制(一個 TaskManager 至少有一個 slot)

slot 主要隔離記憶體,cpu 是slot之間共享的。也就是說4核的機器 ,記憶體足夠,可以把slot設定為8。最多能同時執行8個任務。建議一個核心數分配一個slot

這種圖中 source、map 合成的task的並行度為6
keyby 、window、apply合成的task的並行度為6
sink的並行度為1
總共有13個task
但是不是需要13個slot才能滿足這個並行度的要求

不同的運算元操作複雜度不同
我們可以稱像source map sink 這種 計算不復雜的運算元稱為非資源密集型的運算元 aggregate reduce sum window 這種計算複雜的運算元稱為為資源密集型的運算元

如果把這兩種運算元的優先順序看作相同,平等的分配到slo中,當資料流source 來的資料速率相同時,會造成有些slot一直在跑複雜的運算元,一直在執行中,當時一直跑簡單運算元的slot就會很空閒。

flink 這裡是 非資源密集型的 運算元和資源密集型的運算元可以分配到同一個slot中 ,這樣所有的slot之間任務就會平等,不會存在一直空閒一直高負載。

一個task的並行度是6 就會分為6個並行的task來跑,這六個task不能分配到同一個slot中必須一個slot只有一個。 也就是說 當你的叢集的slot只有6 ,你不能設定運算元的 並行度超過6。

flink 也能做到把非資源密集型和資源密集型的運算元分到不同的slot中 這裡需要設定共享組,非資源 密集型 的運算元在一個共享組,資源密集 型的運算元在一個共享組,這樣這兩種運算元就不會共享的使用slot。預設情況下算有運算元都屬於同一個共享組,共享所有slot。

預設情況下,Flink 允許子任務共享 slot,即使它們是不同任務的子任務但是可以分配到同一個slot上。 這樣的結果是,一個 slot 可以儲存多個作業的整個管道
Task Slot 是靜態的概念,是指 TaskManager 具有的併發執行能力 。
下面看幾個例子

並行可以分為兩個方面

  • 資料並行

source 並行拉資料 map 並行處理資料

  • 計算並行

source 在拉新資料,map 在處理source 之前拉的資料
兩個 job 的並行執行

一個特定運算元的 子任務(subtask)的個數被稱之為其並行度(parallelism)。
一般情況下,一個 stream 的並行度,可以認為就是其所有運算元中最大的並行度


idea裡執行flink程式預設並行度是執行程式機器的核心數量。

每一個運算元都可以單獨設定並行。

.map((_, 1)).setParallelism(2)

也可以全域性指定並行度。

val env = ExecutionEnvironment.getExecutionEnvironment.setParallelism(2)
此時不支援並行的運算元 比如env.readTextFile(inputpath) 就會報錯
具體情況調整source和sink的並行度

val env = ExecutionEnvironment.getExecutionEnvironment.setParallelism(2)
此時不支援並行的運算元 比如env.readTextFile(inputpath) 就會報錯
具體情況調整source和sink的並行度

三個位置可以配置並行度

  • flink配置檔案中
  • 程式碼裡
  • flink任務提交時

優先順序

程式碼>提交>配置檔案

程式碼裡設定用程式碼裡的,程式碼裡沒設定用提交時設定的,都沒設定用配置檔案中的配置。
程式碼裡運算元單獨設定優先順序高於全域性設定優先順序

可以設定共享組 把 task 儘量均勻的分配到整個叢集中

任務鏈
合理的設定並行度

  • 減少本地通訊的開銷
  • 減少序列化和反序列化

把多個運算元合併為一個task,原本的運算元成為裡面的subtask

滿足任務鏈需要一下條件

  • 運算元具有相同並行度(具有相同的分割槽數)
  • 運算元屬於one-to-one

one-to-one :stream維護著分割槽以及元素的順序(比如source和map之間)。這意味著map 運算元的子任務看到的元素的個數以及順序跟 source 運算元的子任務生產的元素的個數、順序相同。map、fliter、flatMap等運算元都是one-to-one的對應關係。

Redistributing:stream的分割槽會發生改變。每一個運算元的子任務依據所選擇的transformation傳送資料到不同的目標任務。例如,keyBy 基於 hashCode 重分割槽、而 broadcast 和 rebalance 會隨機重新分割槽,這些運算元都會引起redistribute過程,而 redistribute 過程就類似於 Spark 中的 shuffle 過程。

並行度不同的運算元之前傳遞資料會進行重分割槽,Redistributing型別的運算元也會進行重分割槽。

例子

配置檔案中預設並行度設定為2 ,提交程式碼是並行度設定為2
socket source 並行度只能是1
flatmap fliter map 並行度都是2 且屬於one-to-one 合成任務鏈
keyby 屬於redistrubuting hash 重分割槽
sum print 並行度為2 屬於one-to-one
執行圖如下

當然還可以禁止掉合成任務鏈

單個運算元不參與合成任務鏈

.flatMap(_.split(" ")).disableChaining()

從單個運算元開啟一個新的任務鏈

.startNewChain()

全域性不合成任務鏈

env.disableOperatorChaining()

下面是一個全域性不合成任務鏈的job執行圖,只是在上一個例子的基礎上添加了全域性不合成任務鏈。

運算元設定並行度

  • source 檔案保證數順序需要並行度為 1
  • sink 只輸出到一個檔案需要並行度為 1
  • socketsource 並行度只能為1