1. 程式人生 > 其它 >flink架構,任務、子任務、運算元概念

flink架構,任務、子任務、運算元概念

資料來源:https://blog.csdn.net/zhaocuit/article/details/106588758

flink架構
Job Managers(master):作業管理器,負責任務安排、協調檢查點、協調故障恢復等
Task Managers(worker):工作管理員,接收master的任務排程,並在本地執行相關任務
在worker節點上,會啟動一個TaskManagersRunner的程序,來接收master的任務排程

一個worker包含至少一個任務槽,每個任務槽表示worker記憶體資源的固定子集。

例如,具有三個槽的worker會將其託管記憶體的1/3專用於每個槽。分配資源意味著子任務不會與其他作業的子任務競爭託管記憶體。

注意:此處沒有發生CPU隔離。當前插槽僅將任務的託管記憶體分開。

多個槽共享TaskManangerRunner的JVM記憶體以及TCP連線和心跳資訊,還會共享資料集和資料結構。

任務槽中執行的是什麼?任務?子任務?

任務、子任務、運算元
一個job的任務、子任務該怎麼劃分呢?如下taskAndSubTask方法的程式碼:

public class Test{
public static void taskAndSubTask() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//這個方式的source並行度為1
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
//預設為電腦邏輯核數 如4
SingleOutputStreamOperator<Tuple2<String, Integer>> flatOperator = source.flatMap(new WindowWordCount.Splitter());
//並行度 繼承前一個運算元
KeyedStream<Tuple2<String, Integer>, Tuple> keyby = flatOperator.keyBy(0);
SingleOutputStreamOperator<Object> map = keyby.map(new MapFunction<Tuple2<String, Integer>, Object>() {
@Override
public Object map(Tuple2<String, Integer> value) throws Exception {
return null;
}
});
//並行度繼承前一個運算元
map.print();

//執行操作
env.execute("Window WordCount");
}
}

該job分為一個source運算元(並行度1)、一個flatMap運算元(並行度4)、一個keyBy運算元(並行度4)、一個keyBy後map運算元(並行度4)、一個sink運算元(並行度4)

任務
  任務的劃分:在一個job的執行計劃(資料流圖)中,從source到計算到sink,每當並行度發生變化或者資料需要分組(keyBy)時(還可通過API明確設定),就會產生任務。

  在上述程式碼中:source並行度和flatMap並行度不一樣,因此source是一個任務,flatMap是一個任務,keyBy是一個分組運算元,因此又是一個任務,而keyBy、keyBy後map運算元和sink是分組後操作且並行度未改變,因此屬於同一個任務。

  即該job有3個任務:source任務、flatMap任務、keyBy、keyBy後map運算元和sink任務

  假設一個keyBy後map運算元的並行度2,那麼任務的劃分如下:source任務、flatM任務、keyBy任務、keyBy後map以及sink算作一個任務

子任務
  子任務:一個任務的並行度為N,那麼這個任務就擁有N個子任務。假設keyBy後map以及sink運算元,他們的並行度為4,那麼flink會在任務槽中執行4個keyBy後map以及sink運算元

運算元
  flink job中用於處理資料的一個單元,如讀取資料、計算資料、儲存資料等,addSource、addSink、keyBy、map等都是一個數據處理單元。

自定義任務
上述任務劃分只是針對預設情況下的,我們可以通過程式碼讓某個任務分解成多個任務,如方法startNewChain和disableChaining

  • startNewChain:當某個運算元呼叫該方法時,那麼該運算元及其後面的且屬於原來任務的運算元將變成一個新的任務
  • disableChaining:當某個運算元呼叫該方法時,那麼該運算元將從原來的任務中分離出來,變成一個新的任務,該運算元前面的且屬於原來任務的所有運算元為一個任務,該運算元後面的且屬於原來任務的所有運算元,也將變成一個任務


自定義任務示例
假設有如下流:source(並行度1)–>flatMap(並行度4)–>filter(並行度4)–>map(並行度4)–>keyby(並行度4)–>sink(並行度4)

  • 預設情況下:source是一個任務,flatMap、filter、map組成一個任務,keyby和sink組成一個任務
  • startNewChain:假設filter呼叫了startNewChain方法,那麼任務就變成了:source是一個任務,flatMap是一個任務,filter、map組成一個任務,keyby和sink組成一個任務
  • disableChaining:假設filter呼叫了disableChaining方法,那麼任務就變成了:source是一個任務,flatMap是一個任務,filter是一個任務,map是一個任務,keyby和sink組成一個任務

如何能看到任務子任務的劃分情況呢?需要flink叢集環境,然後進入flink網頁控制檯,將job打包上傳到網頁控制檯,並啟動任務或者點選執行計劃,就可以在頁面上看到任務和子任務的劃分情況

程式碼邏輯和部署邏輯
上述程式碼中taskAndSubTask方法的程式碼邏輯為:

  • 一個source運算元(並行度1)
  • 一個flatMap運算元(並行度4)
  • 一個keyBy運算元(並行度4)
  • 一個keyBy後map運算元(並行度4)
  • 一個sink運算元(並行度4)


上述程式碼中taskAndSubTask方法的部署邏輯為:

  • 一個source子任務
  • 4個flatMap子任務
  • 4個keyBy-map-sink子任務

即3個任務,9個子任務,17個運算元,那麼他們在槽中是如何分配的呢?

假設有2個worker,共A B C D四個槽,那麼source子任務會隨機分配到一個槽中,flatMap子任務將會每個槽分配一個,keyBy-map-sink子任務每個槽分配一個

子任務在槽中的分配:儘可能讓每個槽都能執行一個完整的資料流,而不是將一個並行度為非1的某個子任務全部分配到一個槽裡,這樣才能最大化的提高效能

一個worker,包含多個槽,一個槽可以執行多個子任務,一個槽下的多個子任務共享整個槽的記憶體資源,多個槽的記憶體資源等於整個worker程序的記憶體資源

一個worker,就是一個程序,一個子任務就是該程序下某個任務槽中的一個執行緒

重啟策略
設定重啟策略:env.setRestartStrategy()

重啟策略分類:

  • 不重啟
  • 固定延遲重啟策略:env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
  • 故障率重啟策略

故障恢復策略

在flink-conf.yaml中配置jobmanager.execution.failover-strategy=full|region

  • full:重啟Job中所有的Task,即重置整個ExecutionGraph,簡單粗暴。
  • region:只重啟ExecutionGraph中對應的Region包含的Task。