Flink例項(六十五): Flink的任務鏈實操(一)
https://blog.csdn.net/Accelerating/article/details/107894474
Flink的任務鏈
Flink的任務鏈
Flink 中的每個運算元都可以設定並行度,每個運算元的一個並行度例項就是一個 subTask。由於 Flink 的 TaskManager 執行 Task 的時候是每個 Task 採用一個單獨的執行緒,這會帶來很多執行緒切換和資料交換的開銷,進而影響吞吐量。
為了避免資料在網路或執行緒之間傳輸導致的開銷,Flink 會在 JobGraph 階段,將程式碼中可以優化的運算元優化成一個運算元鏈(Operator Chains)以放到一個 Task 中執行。
使用者也可以自己指定相應的鏈條,將相關性非常強的轉換操作繫結在一起,這樣能夠讓轉換過程中上下游的 Task 在同一個 Pipeline 中執行,進而避免因為資料在網路或者執行緒間傳輸導致的開銷,提高整體的吞吐量和延遲。
一般情況下,Flink 在 Map 操作中預設開啟 TaskChain,以提高 Flink 作業的整體效能。
如圖1,Source 和 Map 在優化後,組成一個運算元鏈,作為一個 task 執行在一個執行緒上,其簡圖如 Condensed view 所示,並行圖如 parellelized view 所示。
Flink提供了更細粒度的任務鏈控制方法,使用者可根據需求建立任務鏈或禁止任務鏈。
禁用全域性任務鏈
evn.disableOperatorChaining();
- 1
關閉全域性任務鏈後,建立對應Operator Chain,需要使用者先指定操作符,然後再呼叫startNewChain()
方法建立。
dataStream.keyBy(0).filter().map().startNewChain().map();
- 1
startNewChain
方法建立的鏈條只對呼叫方法的前一個操作符和後一個操作符有效,不影響其他的。比如示例中新建的鏈條只有map->map
,對前面的filter
無效。
禁用全域性任務鏈會影響整體任務執行的情況,禁用前,要清楚任務執行的流程,否則可能造成非預期的結果。
禁用區域性任務鏈
如果不想關閉整體運算元上的鏈條,只是想關閉部分運算元上鍊條繫結,可以使用disableChaining()
方法禁用當前操作符上的鏈條。
dataStream.keyBy(0).filter().map().disableChaining()
- 1
上述程式碼只會禁用map操作上的任務鏈,不會影響其他操作符。
[參考文獻] (https://cloud.tencent.com/developer/article/1543570)