Flink原始碼系列——獲取JobGraph的過程
接《Flink原始碼系列——獲取StreamGraph的過程》獲取到StreamGraph後,繼續分析,如果通過獲取到的StreamGraph來轉化為JobGraph。轉化邏輯在StreamingJobGraphGenerator這個類中,入口是createJobGraph(StreamGraph)方法。先是初始化了一個StreamingJobGraphGenerator的例項,StreamingJobGraphGenerator建構函式是私有的,只能通過這裡進行例項構造,建構函式中就是做了一些基本的初始化的工作,並初始化了一個JobGraph例項,然後呼叫內部的私有方法createJobGraph()。
public static JobGraph createJobGraph(StreamGraph streamGraph) {
return new StreamingJobGraphGenerator(streamGraph).createJobGraph();
}
createJobGraph()方法就是jobGraph進行配置的主要邏輯,如下:
private JobGraph createJobGraph() {
/** 設定排程模式,採用的EAGER模式,既所有節點都是立即啟動的 */
jobGraph.setScheduleMode(ScheduleMode.EAGER);
/** 第一步 */
/**
* 1.1
* 廣度優先遍歷StreamGraph,並且為每個SteamNode生成雜湊值, 這裡的雜湊值產生演算法,可以保證如果提交的拓撲沒有改變,則每次生成的雜湊值都是一樣的。
* 一個StreamNode的ID對應一個雜湊值。
*/
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
/**
* 1.2
* 為向後相容性生成遺留版本雜湊
* 目前好像就是根據使用者對每個StreamNode設定的hash值,生產StreamNode對應的hash值
*/
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
/** 相連線的操作符的雜湊值對 */
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
/**
* 第二步
* 最重要的函式,生成JobVertex,JobEdge等,並儘可能地將多個節點chain在一起
*/
setChaining(hashes, legacyHashes, chainedOperatorHashes);
/**
* 第三步
* 將每個JobVertex的入邊集合也序列化到該JobVertex的StreamConfig中
* (出邊集合已經在setChaining的時候寫入了)
*/
setPhysicalEdges();
/**
* 第四步
* 據group name,為每個 JobVertex 指定所屬的 SlotSharingGroup,
* 以及針對 Iteration的頭尾設定 CoLocationGroup
*/
setSlotSharing();
/**
* 第五步
* 配置checkpoint
*/
configureCheckpointing();
/** 將快取檔案的配置新增到configuration中 */
for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {
DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration());
}
/** 設定ExecutionConfig */
try {
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}
/** 返回轉化好的jobGraph */
return jobGraph;
}
jobGraph的整個產生過程就如上所示,接下來針對其中的主要步驟進行簡單分析。
第一步、為每個節點產生雜湊值
這裡就是根據StreamGraph的配置,給StreamGraph中的每個StreamNode產生一個長度為16的位元組陣列的雜湊值,這個雜湊值是用來後續生成JobGraph中對應的JobVertex的ID。在Flink中,任務存在從checkpoint中進行狀態恢復的場景,而在恢復時,是以JobVertexID為依據的,所有就需要任務在重啟的過程中,對於相同的任務,其各JobVertexID能夠保持不變,而StreamGraph中各個StreamNode的ID,就是其包含的StreamTransformation的ID,而StreamTransformation的ID是在對資料流中的資料進行轉換的過程中,通過一個靜態的累加器生成的,比如有多個數據源時,每個資料來源新增的順序不一致,則有可能導致相同資料處理邏輯的任務,就會對應於不同的ID,所以為了得到確定的ID,在進行JobVertexID的產生時,需要以一種確定的方式來確定其值,要麼是通過使用者為每個ID直接指定對應的一個雜湊值,要麼參考StreamGraph中的一些特徵,為每個JobVertex產生一個確定的ID。
defaultStreamGraphHasher是在StreamingJobGraphGenerator建構函式中初始化的,其對應StreamGraphHasherV2的例項,這個類就是負責給StreamGraph中的每個StreamNode產生一個確定的雜湊值,其具體的實現這裡不做介紹,感興趣的可以看下它的原始碼,邏輯還是很清晰的,這裡主要介紹下其在產生一個StreamNode時,主要考慮的因素。(最好是結合著具體的程式碼看這段邏輯,會更清晰)
如果使用者對節點指定了一個雜湊值,則基於使用者指定的值,產生一個長度為16的位元組陣列;
如果使用者沒有指定,則根據當前節點所處的位置,產生一個雜湊值,考慮的因素有:
a、在當前StreamNode之前已經處理過的節點的個數,作為當前StreamNode的id,新增到hasher中;
b、遍歷當前StreamNode輸出的每個StreamEdge,並判斷當前StreamNode與這個StreamEdge的目標StreamNode是否可以進行連結,如果可以,則將目標StreamNode的id也放入hasher中,且這個目標StreamNode的id與當前StreamNode的id取相同的值;
c、將上述步驟後產生的位元組資料,與當前StreamNode的所有輸入StreamNode對應的位元組資料,進行相應的位操作,最終得到的位元組資料,就是當前StreamNode對應的長度為16的位元組陣列。
另外在StreamingJobGraphGenerator的建構函式中,legacyStreamGraphHashers這個陣列中,預設新增一個StreamGraphHasher的子類實現StreamGraphUserHashHasher。所以在上述的程式碼中,1.2步驟就是執行StreamGraphUserHashHasher這個類的邏輯。這個類的邏輯很簡單,就是判斷使用者是否設定了雜湊值,如果設定了,就為對應的StreamNode產生一個雜湊值陣列。
這裡涉及到兩個使用者設定的雜湊值,StreamingJobGraphGenerator中使用的是StreamTransformation的uid屬性,StreamGraphUserHashHasher使用的是StreamTransformation的userProvidedNodeHash屬性。這兩個屬性解析如下:
uid —— 這個欄位是使用者設定的,用來在任務重啟時,保障JobVertexID一致,一般是從之前的任務日誌中,找出對應的值而設定的;
userProvidedNodeHash —— 這個欄位也是使用者設定的,設定的使用者自己產生的雜湊值。
第二步、設定執行鏈
執行鏈的設定,就是從資料來源StreamNode,依次遍歷,如下:
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
/** 從源StreamNode進行遍歷 */
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
}
}
三個入參:
hashes和legacyHashes就是上面產生的每個StreamNode的ID對應的雜湊位元組陣列。
chainedOperatorHashes是一個map:
其key是順序連結在一起的StreamNode的起始那個StreamNode的ID,比如source->flatMap這個兩個對應的StreamNode,在這個例子中,key的值就是source對應的id,為1;
value是一個列表,包含了這個鏈上的所有操作符的雜湊值;
這個列表中的每個元素是一個二元組,這個列表的值就是{[source的主hash,source的備用hash_1],[source的主hash,source的備用hash_2],[flatMap的主hash,flatMap的備用hash_1],…},對於這裡的例子,列表中只有二個元素,為{[source的主hash,null],[flatMap的主hash,null]}
在進行分析createChain方法之前,先看一下兩個StreamNode是否可以連結到一起執行的判斷邏輯,如下:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
/** 獲取StreamEdge的源和目標StreamNode */
StreamNode upStreamVertex = edge.getSourceVertex();
StreamNode downStreamVertex = edge.getTargetVertex();
/** 獲取源和目標StreamNode中的StreamOperator */
StreamOperator<?> headOperator = upStreamVertex.getOperator();
StreamOperator<?> outOperator = downStreamVertex.getOperator();
/**
* 1、下游節點只有一個輸入
* 2、下游節點的操作符不為null
* 3、上游節點的操作符不為null
* 4、上下游節點在一個槽位共享組內
* 5、下游節點的連線策略是 ALWAYS
* 6、上游節點的連線策略是 HEAD 或者 ALWAYS
* 7、edge 的分割槽函式是 ForwardPartitioner 的例項
* 8、上下游節點的並行度相等
* 9、可以進行節點連線操作
*/
return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
&& headOperator != null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
}
只有上述的9個條件都同時滿足時,才能說明兩個StreamEdge的源和目標StreamNode是可以連結在一起執行的。
createChain方法的處理邏輯就是依次遍歷StreamGraph中的所有資料來源的ID,對於這裡案例來說,只有一個數據源,其ID為1。createChain的入參解釋如下:
startNodeId —— StreamNode的鏈的起始node的id,由於從source開始,這裡就是1;
currentNodeId —— 當前處理的node的id,這裡也是1;
hashes和legacyHashes —— 這兩個就是前面產生的每個StreamNode對應的雜湊值;
chainIndex —— 表示當前節點在鏈中的位置,每個鏈都是從0開始編號,當前才處理source,所以為0;
chainedOperatorHashes —— 用來儲存每個鏈中所有操作符的雜湊值,含義前面已經解釋過,初始值是一個空map。
以上面給定的入參值,來繼續分析構建過程。
private List<StreamEdge> createChain(
Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
int chainIndex,
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
/** builtVertices這個集合是用來存放已經構建好的StreamNode的id */
if (!builtVertices.contains(startNodeId)) {
/**
* 過渡用的出邊集合, 用來生成最終的 JobEdge,
* 注意:存在某些StreamNode會連線到一起,比如source->map->flatMap,
* 如果這幾個StreamNode連線到一起,則transitiveOutEdges是不包括 chain 內部的邊,既不包含source->map的StreamEdge的 */
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
/** 可以與當前節點連結的StreamEdge */
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
/** 不可以與當前節點連結的StreamEdge */
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
/** 將當前節點的出邊分成 chainable 和 nonChainable 兩類 */
for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
/** 對於每個可連線的StreamEdge,遞迴呼叫其目標StreamNode,startNodeId保持不變,但是chainIndex會加1 */
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}
/**
* 對於每個不可連線的StreamEdge,則將對於的StreamEdge就是當前鏈的一個輸出StreamEdge,所以會新增到transitiveOutEdges這個集合中
* 然後遞迴呼叫其目標節點,注意,startNodeID變成了nonChainable這個StreamEdge的輸出節點id,chainIndex也賦值為0,說明重新開始一條鏈的建立
*/
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}
/** 獲取鏈起始節點對應的操作符雜湊值列表,如果沒有,則是空列表 */
List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
/** 當前 StreamNode 對應的主雜湊值 */
byte[] primaryHashBytes = hashes.get(currentNodeId);
/** 遍歷每個備用雜湊值,並與主雜湊值,組成一個二元組,新增到列表中 */
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
}
/** 生成當前節點的顯示名,如:"Keyed Aggregation -> Sink: Unnamed" */
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
/**
* 1、如果當前節點是起始節點, 則直接建立 JobVertex 並返回 StreamConfig,
* 2、否則先建立一個空的 StreamConfig
*
* createJobVertex 函式就是根據 StreamNode 建立對應的 JobVertex, 並返回了空的 StreamConfig
*/
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
: new StreamConfig(new Configuration());
/**
* {@link StreamConfig}就是對{@link Configuration}的封裝,
* 所以通過{@code StreamConfig}設定的配置,最終都是儲存在{@code Configuration}中的。
*/
/**
* 設定 JobVertex 的 StreamConfig, 基本上是序列化 StreamNode 中的配置到 StreamConfig 中.
* 其中包括 序列化器, StreamOperator, Checkpoint 等相關配置
* 經過這一步操作後,StreamNode的相關配置會通過對{@code StreamNode}的設定介面,將配置儲存在{@code Configuration}中,
* 而{@code Configuration}是是{@link JobVertex}的屬性,也就是說經過這步操作,相關配置已經被儲存到了{@code JobVertex}中。
*/
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
if (currentNodeId.equals(startNodeId)) {
/** 如果是chain的起始節點。(不是chain的中間節點,會被標記成 chain start)*/
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
/** 我們也會把物理出邊寫入配置, 部署時會用到 */
config.setOutEdgesInOrder(transitiveOutEdges);
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
/** 將當前節點(headOfChain)與所有出邊相連 */
for (StreamEdge edge : transitiveOutEdges) {
/** 通過StreamEdge構建出JobEdge,建立 IntermediateDataSet ,用來將JobVertex和JobEdge相連 */
connect(startNodeId, edge);
}
/** 將chain中所有子節點的StreamConfig寫入到 headOfChain 節點的 CHAINED_TASK_CONFIG 配置中 */
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
/** 如果是 chain 中的子節點 */
Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);
if (chainedConfs == null) {
chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
}
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
/** 將當前節點的StreamConfig新增到該chain的config集合中 */
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(new OperatorID(primaryHashBytes));
/** 如果節點的輸出StreamEdge已經為空,則說明是鏈的結尾 */
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
return transitiveOutEdges;
} else {
/** startNodeId 如果已經構建過,則直接返回 */
return new ArrayList<>();
}
}
建立過程詳解程式碼中的註解。
如果startNodeId已經被構建完成,則直接返回一個空集合;
如果還沒有構建,則開始新的構建;
顯示遞迴構建鏈的下游節點,在下游節點都遞迴構建完成後,再構建當前節點;
如果當前節點是一個鏈的起始節點,則新建一個JobVertex,並將相關配置都通過StreamConfig提供的介面,配置到JobVertex的configuration屬性中;
如果是鏈的中間節點,則將相關配置新增到其對應的StreamConfig物件中。
在對head節點設定時,會在head節點與每個輸出StreamEdge的目標節點之間建立連線,程式碼如下:
private void connect(Integer headOfChain, StreamEdge edge) {
/** 將當前edge記錄物理邊界順序集合中 */
physicalEdgesInOrder.add(edge);
/** 獲取StreamEdge的輸出節點的id */
Integer downStreamvertexID = edge.getTargetId();
/** 通過節點id獲取到要進行連線的上下游JobVertex節點 */
JobVertex headVertex = jobVertices.get(headOfChain);
JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);
/** 獲取下游JobVertex的配置屬性 */
StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
/** 下游JobVertex的輸入源加1 */
downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);
/** 獲取StreamEdge中的分割槽器 */
StreamPartitioner<?> partitioner = edge.getPartitioner();
JobEdge jobEdge;
/** 根據分割槽器的不同子類,建立相應的JobEdge */
if (partitioner instanceof ForwardPartitioner) {
/** 向前傳遞分割槽 */
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.POINTWISE,
ResultPartitionType.PIPELINED_BOUNDED);
} else if (partitioner instanceof RescalePartitioner){
/** 可擴充套件分割槽 */
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.POINTWISE,
ResultPartitionType.PIPELINED_BOUNDED);
} else {
/** 其他分割槽 */
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.ALL_TO_ALL,
ResultPartitionType.PIPELINED_BOUNDED);
}
/** 設定資料傳輸策略,以便在web上顯示 */
jobEdge.setShipStrategyName(partitioner.toString());
/** 列印除錯日誌 */
if (LOG.isDebugEnabled()) {
LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
headOfChain, downStreamvertexID);
}
}
其中JobEdge是通過下游JobVertex的connectNewDataSetAsInput方法來建立的,在建立JobEdge的前,會先用上游JobVertex建立一個IntermediateDataSet例項,用來作為上游JobVertex的結果輸出,然後作為JobEdge的輸入,構建JobEdge例項,具體實現如下:
public JobEdge connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType) {
/** 建立輸入JobVertex的輸出資料集合 */
IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
/** 構建JobEdge例項 */
JobEdge edge = new JobEdge(dataSet, this, distPattern);
/** 將JobEdge例項,作為當前JobVertex的輸入 */
this.inputs.add(edge);
/** 設定中間結果集合dataSet的消費者是上面建立的JobEdge */
dataSet.addConsumer(edge);
return edge;
}
通過上述的構建過程,就可以實現上下游JobVertex的連線,上游JobVertex ——> 中間結果集合IntermediateDataSet ——> JobEdge ——> 下游JobVertex。其中IntermediateDataSet和JobEdge是用來建立上下游JobVertex之間連線的配置;一個IntermediateDataSet有一個訊息producer,可以有多個訊息消費者JobEdge;一個JobEdge則有一個數據源IntermediateDataSet,一個目標JobVertex;一個JobVertex可以產生多個輸出IntermediateDataSet,也可以接受來自多個JobEdge的資料。
通過上述的構建過程,對於這裡的例子,source -> flatMap 組成一個鏈,構建成一個JobVertex,reduce -> sink 組成一個鏈,構建成一個JobVertex。
JobGraph是在StreamGraph的基礎之上,對StreamNode進行了關聯合並的操作,比如對於source -> flatMap -> reduce -> sink 這樣一個數據處理鏈,當source和flatMap滿足連結的條件時,可以可以將兩個操作符的操作放到一個執行緒並行執行,這樣可以減少網路中的資料傳輸,由於在source和flatMap之間的傳輸的資料也不用序列化和反序列化,所以也提高了程式的執行效率。
---------------------
作者:混混fly
來源:CSDN
原文:https://blog.csdn.net/qq_21653785/article/details/79510140
版權宣告:本文為博主原創文章,轉載請附上博文連結!