Flink執行時之生成作業圖
生成作業圖
在分析完了流處理程式生成的流圖(StreamGraph)以及批處理程式生成的優化後的計劃(OptimizedPlan)之後,下一步就是生成它們面向Flink執行時執行引擎的共同抽象——作業圖(JobGraph)。
什麼是作業圖
作業圖(JobGraph)是唯一被Flink的資料流引擎所識別的表述作業的資料結構,也正是這一共同的抽象體現了流處理和批處理在執行時的統一。
相比流圖(StreamGraph)以及批處理優化計劃(OptimizedPlan),JobGraph發生了一些變化,已經不完全是“靜態”的資料結構了,因為它加入了中間結果集(IntermediateDataSet)這一“動態”概念。
作業頂點(JobVertex)、中間資料集(IntermediateDataSet)、作業邊(JobEdge)是組成JobGraph的基本元素。這三個物件彼此之間互為依賴:
- 一個JobVertex關聯著若干個JobEdge作為輸入端以及若干個IntermediateDataSet作為其生產的結果集;
- 一個IntermediateDataSet關聯著一個JobVertex作為生產者以及若干個JobEdge作為消費者;
- 一個JobEdge關聯著一個IntermediateDataSet可認為是源以及一個JobVertex可認為是目標消費者;
因此一個JobGraph可能的圖形化表示如下:
那麼JobGraph是怎麼組織並存儲這些元素的呢?其實JobGraph只以Map的形式儲存了所有的JobVertex,鍵是JobVertexID:
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();
至於其它的元素,通過JobVertex都可以根據關係找尋到。
JobGraph包含了如下這些屬性:
- 描述作業相關的資訊,比如上面的頂點、作業編號、名稱等;
- 使用者程式包相關的資訊,比如類路徑等;
- 執行的一些配置資訊,比如非同步快照的配置、會話超時、是否允許排隊排程等;
絕大部分的例項方法都是維護這些屬性的。
需要注意的是,用於迭代的反饋邊(feedback edge)當前並不體現在JobGraph中,而是被內嵌在特殊的JobVertex中通過反饋通道(feedback channel)在它們之間建立關係。
流圖生成作業圖
這篇文章我們來分析流處理程式是如何從之前的Stream生成JobGraph的。這部分的實現位於類StreamingJobGraphGenerator中,它是流處理程式的JobGraph生成器,其核心是createJobGraph方法,它體現了生成JobGraph的主幹呼叫,實現程式碼如下:
public JobGraph createJobGraph() {
//建立一個JobGraph例項物件
jobGraph = new JobGraph(streamGraph.getJobName());
//設定對task的排程模式為ALL,即所有的運算元立即同時啟動
jobGraph.setScheduleMode(ScheduleMode.ALL);
//對用於輔助生成JobGraph的一些例項變數進行初始化
init();
//給StreamGraph的每個StreamNode生成一個hash值,該hash值在節點不發生改變的情況下多次生成始終是一致的,
//可用來判斷節點在多次提交時是否產生了變化並且該值也將作為JobVertex的ID
Map<Integer, byte[]> hashes = traverseStreamGraphAndGenerateHashes();
//基於StreamGraph從所有的source開始構建task chain
setChaining(hashes);
//給頂點設定物理邊(入邊)
setPhysicalEdges();
//為每個JobVertex設定slotShareGroup,同時為迭代的source/sink對設定CoLocationGroup
setSlotSharing();
//配置檢查點
configureCheckpointing();
//配置重啟策略
configureRestartStrategy();
//傳遞執行配置
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
return jobGraph;
}
接下來我們挨個對幾個關鍵的方法進行分析。第一個要分析的方法是traverseStreamGraphAndGenerateHashes,它會對StreamGraph進行遍歷併為每一個StreamNode都生成其雜湊值,生成的雜湊值將用於為每個JobVertex建立JobVertexID。方法的完整實現如下:
private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
//hash函式
final HashFunction hashFunction = Hashing.murmur3_128(0);
final Map<Integer, byte[]> hashes = new HashMap<>();
//儲存訪問過了的節點編號
Set<Integer> visited = new HashSet<>();
//入隊即將訪問的節點物件
Queue<StreamNode> remaining = new ArrayDeque<>();
//source是一個流拓撲的起點,從source開始遍歷
//hash值的生成是順序敏感的(依賴於順序),因此首先要對source ID集合進行排序
//因為如果source的ID集合順序不固定,那意味著多次提交包含該source ID集合的程式時可能導致不同的遍歷路徑,
//從而破壞了hash生成的因素
List<Integer> sources = new ArrayList<>();
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
sources.add(sourceNodeId);
}
Collections.sort(sources);
//按照排好的順序,進行廣度遍歷,注意這不是樹結構,而是圖,因為就一個節點而言,其輸入和輸出都可能有多條路徑
for (Integer sourceNodeId : sources) {
remaining.add(streamGraph.getStreamNode(sourceNodeId));
visited.add(sourceNodeId);
}
StreamNode currentNode;
//從即將訪問的節點佇列中出隊首部的一個元素,沒有元素了則結束
while ((currentNode = remaining.poll()) != null) {
// 給當前節點生成雜湊值,並返回是否生成成功
if (generateNodeHash(currentNode, hashFunction, hashes)) {
//遍歷當前節點的所有輸出邊
for (StreamEdge outEdge : currentNode.getOutEdges()) {
//獲取輸出邊的目標頂點(該邊另一頭的頂點)
StreamNode child = outEdge.getTargetVertex();
//如果目標頂點沒被訪問過,則加入待訪問佇列和易訪問元素集合
if (!visited.contains(child.getId())) {
remaining.add(child);
visited.add(child.getId());
}
}
}
else {
//如果對當前節點的雜湊值生成操作失敗,則將其從已訪問的節點中移除,等待後續再次訪問
visited.remove(currentNode.getId());
}
}
return hashes;
}
在上面程式碼段中呼叫的generateNodeHash方法,其實現邏輯大致分為兩大部分,這兩部分對應了生成雜湊的兩種方式:
- 根據StreamTransformation的編號進行計算
- 根據一些因素來綜合計算
第二種方式對應的因素有如下三種:
- 節點相關的屬性(ID、並行度、UDF的類名)
- 連結在一起的輸出節點相關的屬性
- 輸入節點的雜湊值
這裡值得注意的是節點相關的ID屬性,它並不是StreamTransformation的ID,因為StreamTransformation的ID是一個靜態計數器,它可能會導致邏輯相同的Job最終生成的雜湊值卻不同。考慮下面的示例:
//program 1
DataStream<String> s1 = ...; //s1.ID = 1
DataStream<String> s2 = ...; //s2.ID = 2
s1.union(s2).print();
//program 2
DataStream<String> s2 = ...; //s2.ID = 1
DataStream<String> s1 = ...; //s1.ID = 2
s1.union(s2).print();
對於上面示例程式碼中的兩個語義等價的程式,當藉助StreamTransformation的ID屬性來生成雜湊值時會出現不一致。因此,Flink所使用的ID值其實是已完成雜湊值計算的節點數目。這樣就不會出現上述因為source定義的順序不同而導致語義上等價的程式產生不一致雜湊值的情況。最終traverseStreamGraphAndGenerateHashes方法將會為所有的StreamNode生成對應的雜湊值。
為了更高效得執行,Flink對DAG在排程上進行了優化,該優化稱之為運算元連結(operator chain)。它允許某些運算元可以“連結”在一起,在排程時這些被連結到一起的運算元會被視為一個任務(Task)。而在執行時,一個Task會被並行化成若干個subTask例項進行執行,一個subTask對應一個執行執行緒。運算元連結的示意圖如下:
這種優化能減少執行緒之間的切換和跨節點的資料交換從而在減少時延的同時提升吞吐量。
當運算元互相連結之後,原先存在於互相連結的運算元之間的邊就只是邏輯上存在的。而被連結的運算元整體跟其他無法與其連結的運算元之間的邊才是真正的物理邊。另外,為了方便原始碼解讀,需要對“連結”和“連線”加以區分。在當前的上下文中,“連結”指的是“運算元鏈”的形成方式,而“連線”指的是在運算元之間建立關係。
接下來我們就來分析,將運算元連結起來的setChaining方法。setChaining會沿著source生成運算元鏈(但不要被其方法名誤導,它其實還完成了很多額外的工作,比如建立JobVertex)。
setChaining會遍歷StreamGraph中的sourceID集合。為每個source呼叫createChain方法,該方法以當前source為起點向後遍歷並建立運算元鏈。createChain方法會收集當前節點所連線的物理邊,併為連結頭節點與物理邊下游的運算元建立連線關係。
/**
* @param startNodeId : 起始節點編號
* @param currentNodeId : 當前遍歷節點編號
* @param hashes : 節點編號與hash值對映表
* @return 遍歷過的邊集合
*/
private List<StreamEdge> createChain(Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
int chainIndex) {
//如果起始節點沒有被構建過,才進入分支;否則直接返回一個空List(遞迴結束條件)
if (!builtVertices.contains(startNodeId)) {
//儲存遍歷過的邊,該物件被作為最終結果返回
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
//儲存可以被連結的出邊
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
//儲存不可被連結的出邊
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
//遍歷當前節點的每個出邊
for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
//如果該出邊是可被連結的,則加入可被連結的出邊集合,否則加入不可被連結的出邊集合
if (isChainable(outEdge)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
//遍歷每個可被連結的出邊,然後進行遞迴遍歷
for (StreamEdge chainable : chainableOutputs) {
//起始節點不變,以該可被連結的出邊的目標節點作為“當前”節點進行遞迴遍歷並將遍歷過的邊集合加入到當前集合中
//這裡值得注意的是所有可連結的邊本身並不會被加入這個集合!
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, chainIndex + 1));
}
//遍歷不可連結的出邊,同樣進行遞迴遍歷
for (StreamEdge nonChainable : nonChainableOutputs) {
//將當前不可連結的出邊加入到遍歷過的邊集合中
transitiveOutEdges.add(nonChainable);
//同樣進行遞迴遍歷,不過這裡的起始節點和當前節點都被設定為該邊的目標節點
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, 0);
}
//為當前節點建立連結的完整名稱,如果當前節點沒有可連結的邊,那麼其名稱將直接是當前節點的operator名稱
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
//建立流配置物件,流配置物件針對單個作業頂點而言,包含了頂點相關的所有資訊。
//當建立配置物件的時候,如果當前節點即為起始節點(連結頭),會先為該節點建立JobVertex物件
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes)
: new StreamConfig(new Configuration());
//然後為當前節點初始化流配置物件裡的一系列屬性
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
//如果當前節點是起始節點(chain頭節點)
if (currentNodeId.equals(startNodeId)) {
//設定該節點是chain的開始
config.setChainStart();
config.setChainIndex(0);
//設定不可連結的出邊
config.setOutEdgesInOrder(transitiveOutEdges);
//設定所有出邊
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
//遍歷當前節點的所有不可連結的出邊集合
for (StreamEdge edge : transitiveOutEdges) {
//給當前節點到不可連結的出邊之間建立連線
//通過出邊找到其下游流節點,根據邊的分割槽器型別,構建下游流節點跟輸入端上游流節點(也即起始節點)
//的連線關係。在這個構建的過程中也就建立了IntermediateDataSet及JobEdge並跟當前節點的JobVertex
//三者建立了關聯關係
connect(startNodeId, edge);
}
//將當前節點的所有子節點的流配置物件進行序列化
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else { //如果當前節點是chain中的節點,而非chain的頭節點
Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);
if (chainedConfs == null) {
chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
}
config.setChainIndex(chainIndex);
//將當前節點的流配置物件加入到chain頭節點點相關的配置中
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
//返回所有不可連結的邊
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
上面的程式碼段中會先將當前節點的出邊按照它們是否是可被連結進行分類,isChainable方法包含了判斷邏輯,一個出邊如果是可連結的,它需要滿足的條件如下:
return downStreamVertex.getInEdges().size() == 1 //如果邊的下游流節點的入邊數目為1(也即其為單輸入運算元)
&& outOperator != null //邊的下游節點對應的運算元不為null
&& headOperator != null //邊的上游節點對應的運算元不為null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex) //邊兩端節點有相同的槽共享組名稱
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS //邊下游運算元的連結策略為ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)//上游運算元的連結策略為HEAD或者ALWAYS
&& (edge.getPartitioner() instanceof ForwardPartitioner) //邊的分割槽器型別是ForwardPartitioner
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism() //上下游節點的並行度相等
&& streamGraph.isChainingEnabled(); //當前的streamGraph允許連結的
在createChain中會呼叫createJobVertex為連結頭節點或者無法連結的節點建立JobVertex物件,建立完成之後會將它加入JobGraph併為當前的這個JobVertex建立流配置物件(StreamConfig)。
對於無法連結的物理邊,Flink會將鏈頭(chain header)與這些物理邊(以及物理邊所連線著的目標運算元)進行連線(程式碼段中的connect方法),連線的過程也是建立JobEdge與IntermediateDataSet並跟它們建立關係的過程。
現在讓我們回到createJobGraph方法的上下文中來,在setChaining方法呼叫中找出了物理出邊以及從源到目的節點之間建立了連線。接著,會呼叫setPhysicalEdges從目標節點向源節點之間建立入邊的連線。
接下來,為相關的節點設定槽共享組(SlotSharingGroup)以及同位組(CoLocationGroup),這兩種機制都用於限制運算元的部署。其中,CoLocationGroup主要用於迭代運算元的執行。
當用戶的Flink程式配置了檢查點資訊,那麼需要將檢查點相關的配置加入到JobGraph中去,這部分邏輯通過方法configureCheckpointing來完成,它將JobVertex劃分成三類:
- triggerVertices:儲存接收“觸發檢查點”訊息的JobVertex集合,當前只收集source頂點;
- ackVertices:收集需要應答檢查點訊息的JobVertex集合,當前收集所有的JobVertex;
- commitVertices:儲存接收“提交檢查點”訊息的JobVertex集合,當前收集所有JobVertex;
這些資訊都被封裝在JobSnapshottingSettings物件中,然後被儲存到JobGraph。
基本生成JobGraph的主要步驟就是這些。接下來,我們將分析批處理程式在優化器生成的OptimizedPlan的基礎之上如何生成的JobGraph。
優化後的計劃生成作業圖
分析完了流圖如何生成作業圖,下面我們來分析批處理程式經過優化後的計劃如何生成作業圖。其核心程式碼位於flink-clients模組下的ClusterClient類中的getJobGraph方法中:
JobGraphGenerator gen = new JobGraphGenerator(this.config);
job = gen.compileJobGraph((OptimizedPlan) optPlan);
這裡的JobGraphGenerator位於optimizer模組中(注意跟流處理中生成JobGraph的StreamingJobGraphGenerator進行區別),它用於將優化器優化後的OptimizedPlan編譯成JobGraph。編譯的過程不作任何決策與假設,也就是說作業最終如何被執行早已被優化器確定,而編譯也是在此基礎上做確定性的對映。
JobGraphGenerator實現了Visitor介面,因此它是一個遍歷器,遍歷的物件是計劃節點(PlanNode)。
關於遍歷器、計劃節點等更多的細節請參考“優化器”相關的文章。
compileJobGraph方法在內部呼叫OptimizedPlan的accept方法遍歷它,而遍歷訪問器就是JobGraphGenerator自身:
program.accept(this);
在OptimizedPlan中,accept會挨個在每個sink上呼叫accept:
public void accept(Visitor<PlanNode> visitor) {
for (SinkPlanNode node : this.dataSinks) {
node.accept(visitor);
}
}
批處理中的計劃是以sink作為起始點,然後通過遍歷訪問器逆向遍歷直至source。
從sink開始的逆向遍歷是符合特定的模式的:
public void accept(Visitor<PlanNode> visitor) {
//前置遍歷,如果返回值為true,才會進行更進一步的後續操作
if (visitor.preVisit(this)) {
//獲取到當前sink的輸入端繼續遍歷,該呼叫會引發遞迴呼叫
this.input.getSource().accept(visitor);
//獲得所有的廣播輸入通道,對所有的廣播輸入通道源進行遍歷
for (Channel broadcastInput : getBroadcastInputs()) {
broadcastInput.getSource().accept(visitor);
}
//進行後置遍歷
visitor.postVisit(this);
}
}
先來分析一下preVisit方法,它是遍歷時的“前進”方法,它會對要遍歷的PlanNode的具體型別進行列舉推斷,針對不同的型別為其建立對應的JobVertex物件,接著為JobVertex物件設定相關屬性,最後將其加入到一個公共的PlanNode與JobVertex的對映字典中去。
接下來是postVisit方法,它可以看成是遍歷時的“後退”方法,當在某個節點上呼叫到postVisit方法時,表明該節點的前任(從正常的source往sink方向)都已經遍歷完成。因此該方法在這裡用來將當前節點與其前任建立連線。
postVisit方法同樣會判斷節點的型別,特殊節點特殊處理。例如,如果節點的型別是IterationPlanNode,那麼它將立即遍歷迭代路徑中的節點。這裡有可能存在遞迴遍歷,所以使用了一個“棧”結構來儲存當前節點。
if (this.currentIteration != null) {
this.iterationStack.add(this.currentIteration);
}
this.currentIteration = (IterationPlanNode) node;
this.currentIteration.acceptForStepFunction(this);
if (this.iterationStack.isEmpty()) {
this.currentIteration = null;
} else {
this.currentIteration = this.iterationStack.remove(this.iterationStack.size() - 1);
}
回到compileJobGraph方法的上下文中,在對OptimizedPlan進行遍歷之後,會對收集到的迭代節點進行處理。通過遍歷迭代描述符(IterationDescriptor)並判斷其代表的節點屬於哪種迭代型別來進行特定的處理:
for (IterationDescriptor iteration : this.iterations.values()) {
if (iteration.getIterationNode() instanceof BulkIterationPlanNode) {
finalizeBulkIteration(iteration);
} else if (iteration.getIterationNode() instanceof WorksetIterationPlanNode) {
finalizeWorksetIteration(iteration);
} else {
throw new CompilerException();
}
}
到此,遍歷工作已經完成。下面會把連結任務的配置寫入其父節點(也就是容器節點)的配置中。接著新建JobGraph物件並進行一系列設定,比如新增JobVertex、為JobVertex設定SlotSharingGroup等。然後將之前註冊的快取檔案加入到Job的配置中,釋放相關資源後返回JobGraph物件。
微信掃碼關注公眾號:Apache_Flink
QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)