Flink Source Code Analysis --- Startup Process
Flink Source Code Walkthrough
Startup Process
This article uses SocketWindowWordCount from the package org.apache.flink.streaming.examples.socket in the Flink source code as an example, walking through the source to analyze how a Flink job is executed.
// hostname and port of the text server to connect to
final String hostname;
final int port;
try {
    final ParameterTool params = ParameterTool.fromArgs(args);
    // "hostname" is optional and defaults to localhost; "port" is required
    hostname = params.has("hostname") ? params.get("hostname") : "localhost";
    port = params.getInt("port");
} catch (Exception e) {
    System.err.println("No port specified. Please run 'SocketWindowWordCount " +
        "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
        "and port is the address of the text server");
    System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
        "type the input text into the command line");
    return;
}
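The code above reads the hostname and port of the text server (the socket the job reads its input from) from the command-line arguments. If --port is missing, params.getInt("port") throws, and the program prints the usage instructions and exits.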
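ParameterTool is Flink's own utility, so the plain-Java sketch below only imitates the `--key value` convention shown in the usage message, together with the example's defaulting logic. The class `ArgsSketch` and its helper `parseArgs` are hypothetical names, and the real ParameterTool.fromArgs accepts more argument forms than this simplified version.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for ParameterTool.fromArgs: it only
// understands "--key value" pairs (the real class supports more forms).
final class ArgsSketch {

    static Map<String, String> parseArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            String token = args[i];
            if (!token.startsWith("--")) {
                throw new IllegalArgumentException("Expected --key, got: " + token);
            }
            String key = token.substring(2);
            // A key followed by another "--key" (or by nothing) is stored with an empty value.
            boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("--");
            params.put(key, hasValue ? args[++i] : "");
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = parseArgs(new String[] {"--port", "9000"});
        // Same defaulting as the example: hostname falls back to "localhost".
        String hostname = params.getOrDefault("hostname", "localhost");
        // Integer.parseInt throws when "port" is missing, much like getInt("port") fails.
        int port = Integer.parseInt(params.get("port"));
        System.out.println(hostname + ":" + port);
    }
}
```

Running main prints `localhost:9000`: no --hostname is given, so the default applies.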
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
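The code above obtains the basic runtime environment of the Flink job, which holds, among other things:
- the default parallelism of the job (for local execution this defaults to the number of CPU cores)
- the ExecutionConfig, which includes the execution mode, ExecutionMode: PIPELINED, PIPELINED_FORCED, BATCH, BATCH_FORCED
- the checkpointing mode, CheckpointingMode: EXACTLY_ONCE, AT_LEAST_ONCE (kept in the environment's CheckpointConfig rather than in the ExecutionConfig)
- the state backend that stores runtime state, AbstractStateBackend, selectable by the shortcut names MEMORY_STATE_BACKEND_NAME = "jobmanager", FS_STATE_BACKEND_NAME = "filesystem" and ROCKSDB_STATE_BACKEND_NAME = "rocksdb"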
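As a rough illustration of two of the settings listed above, the stdlib-only sketch below computes a local default parallelism from the number of available cores and resolves a state backend from its shortcut name. `EnvSketch`, `BackendShortcut` and `fromName` are hypothetical, not Flink classes, and the case-insensitive lookup is an assumption made for the sketch.

```java
import java.util.Locale;

// Hypothetical sketch, not Flink code. It mirrors two facts from the list:
// local execution defaults parallelism to the number of CPU cores, and
// state backends can be selected by shortcut names.
final class EnvSketch {

    // Mirror of the shortcut names listed above (string values copied from the text).
    enum BackendShortcut {
        MEMORY("jobmanager"),
        FS("filesystem"),
        ROCKSDB("rocksdb");

        final String configName;

        BackendShortcut(String configName) {
            this.configName = configName;
        }

        // Resolve a backend from its configured name; ignoring case/whitespace is an assumption.
        static BackendShortcut fromName(String name) {
            String normalized = name.trim().toLowerCase(Locale.ROOT);
            for (BackendShortcut b : values()) {
                if (b.configName.equals(normalized)) {
                    return b;
                }
            }
            throw new IllegalArgumentException("Unknown state backend: " + name);
        }
    }

    // Default parallelism for local execution: one parallel instance per available core.
    static int defaultLocalParallelism() {
        return Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        System.out.println("default local parallelism = " + defaultLocalParallelism());
        System.out.println("rocksdb -> " + BackendShortcut.fromName("rocksdb"));
    }
}
```

In a real job these settings are changed through the environment itself, for example with env.setParallelism(...), env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE) and env.setStateBackend(...).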
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
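env.socketTextStream creates a source that reads text data from the socket as a stream; the third argument, "\n", is the delimiter used to split the incoming text into records.
env.socketTextStream returns a DataStreamSource<String> object.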
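To make the delimiter's role concrete, here is a simplified sketch of how a delimiter-based text source can turn a stream of characters into records. It is not Flink's SocketTextStreamFunction: a StringReader stands in for the socket so the code runs offline, and `DelimitedTextSketch` and `readRecords` are hypothetical names.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Simplified sketch (not Flink's SocketTextStreamFunction) of splitting a
// character stream into records on a delimiter.
final class DelimitedTextSketch {

    static List<String> readRecords(Reader in, String delimiter) {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        List<String> records = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        char[] chunk = new char[8192];
        int n;
        try {
            while ((n = in.read(chunk)) != -1) {
                buffer.append(chunk, 0, n);
                int pos;
                // Emit every complete record currently in the buffer; a delimiter
                // split across two reads is still found because the buffer accumulates.
                while ((pos = buffer.indexOf(delimiter)) != -1) {
                    String record = buffer.substring(0, pos);
                    // With "\n" as delimiter, also drop a trailing '\r' (Windows line endings).
                    if (delimiter.equals("\n") && record.endsWith("\r")) {
                        record = record.substring(0, record.length() - 1);
                    }
                    records.add(record);
                    buffer.delete(0, pos + delimiter.length());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // At end of stream, whatever is left is the final, unterminated record.
        if (buffer.length() > 0) {
            records.add(buffer.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(readRecords(new StringReader("hello world\r\nhello flink\nbye"), "\n"));
    }
}
```

main prints `[hello world, hello flink, bye]`: the trailing \r of the first line is dropped, and the unterminated last line is still emitted when the stream ends.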
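A brief look at DataStream and DataStreamSource: DataStreamSource extends SingleOutputStreamOperator, which in turn extends DataStream, so a DataStreamSource can be assigned to a DataStream variable, as in the declaration of text above: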
public class SingleOutputStreamOperator<T> extends DataStream<T> { ... }
public class DataStreamSource<T> extends SingleOutputStreamOperator<T> { ... }
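The practical consequence of this hierarchy can be shown with a toy mirror of the three classes. `MiniDataStream`, `MiniSingleOutputStreamOperator`, `MiniDataStreamSource` and `HierarchySketch.socketTextStream` are hypothetical stand-ins that keep only the inheritance relationship; the real Flink classes carry much more state.

```java
// Hypothetical toy mirror of the hierarchy above; NOT the real Flink classes.
class MiniDataStream<T> {
    String describe() {
        return "DataStream";
    }
}

class MiniSingleOutputStreamOperator<T> extends MiniDataStream<T> {
    @Override
    String describe() {
        return "SingleOutputStreamOperator";
    }
}

class MiniDataStreamSource<T> extends MiniSingleOutputStreamOperator<T> {
    @Override
    String describe() {
        return "DataStreamSource";
    }
}

final class HierarchySketch {
    // Hypothetical stand-in for env.socketTextStream(...): returns the most specific type.
    static MiniDataStreamSource<String> socketTextStream() {
        return new MiniDataStreamSource<>();
    }

    public static void main(String[] args) {
        // The caller may hold the source through the base type, as the example does.
        MiniDataStream<String> text = socketTextStream();
        // Dynamic dispatch still selects the subclass implementation.
        System.out.println(text.describe());
    }
}
```

main prints `DataStreamSource`: the value is held through the base type, but method calls still dispatch to the most specific class.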
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction