1. 程式人生 > 實用技巧 >Flink例項(六十五): Flink的任務鏈實操(一)

Flink例項(六十五): Flink的任務鏈實操(一)

https://blog.csdn.net/Accelerating/article/details/107894474

Flink的任務鏈

Flink的任務鏈

Flink 中的每個運算元都可以設定並行度,每個運算元的一個並行度例項就是一個 subTask。由於 Flink 的 TaskManager 執行 Task 的時候是每個 Task 採用一個單獨的執行緒,這會帶來很多執行緒切換和資料交換的開銷,進而影響吞吐量。
為了避免資料在網路或執行緒之間傳輸導致的開銷,Flink 會在 JobGraph 階段,將程式碼中可以優化的運算元優化成一個運算元鏈(Operator Chains)以放到一個 Task 中執行。
使用者也可以自己指定相應的鏈條,將相關性非常強的轉換操作繫結在一起,這樣能夠讓轉換過程中上下游的 Task 在同一個 Pipeline 中執行,進而避免因為資料在網路或者執行緒間傳輸導致的開銷,提高整體的吞吐量和延遲。
一般情況下,Flink 在 Map 操作中預設開啟 TaskChain,以提高 Flink 作業的整體效能。
如圖1,Source 和 Map 在優化後,組成一個運算元鏈,作為一個 task 執行在一個執行緒上,其簡圖如 Condensed view 所示,並行圖如 parellelized view 所示。

Flink提供了更細粒度的任務鏈控制方法,使用者可根據需求建立任務鏈或禁止任務鏈。

禁用全域性任務鏈

evn.disableOperatorChaining();
  • 1

關閉全域性任務鏈後,建立對應Operator Chain,需要使用者先指定操作符,然後再呼叫startNewChain()方法建立。

dataStream.keyBy(0).filter().map().startNewChain().map();
  • 1

startNewChain方法建立的鏈條只對呼叫方法的前一個操作符和後一個操作符有效,不影響其他的。比如示例中新建的鏈條只有map->map,對前面的filter無效。
禁用全域性任務鏈會影響整體任務執行的情況,禁用前,要清楚任務執行的流程,否則可能造成非預期的結果。

禁用區域性任務鏈

如果不想關閉整體運算元上的鏈條,只是想關閉部分運算元上鍊條繫結,可以使用disableChaining()方法禁用當前操作符上的鏈條。

dataStream.keyBy(0).filter().map().disableChaining()
  • 1

上述程式碼只會禁用map操作上的任務鏈,不會影響其他操作符。

[參考文獻] (https://cloud.tencent.com/developer/article/1543570)