1. 程式人生 > 程式設計 >Apache Flink 進階(六):Flink 作業執行深度解析

Apache Flink 進階(六):Flink 作業執行深度解析

作者:嶽猛 整理:毛鶴 本文根據 Apache Flink 系列直播課程整理而成,由 Apache Flink Contributor、網易雲音樂實時計算平臺研發工程師嶽猛分享。主要分享內容為 Flink Job 執行作業的流程,文章將從兩個方面進行分享:一是如何從 Program 到物理執行計劃,二是生成物理執行計劃後該如何排程和執行。

Flink 四層轉化流程

Flink 有四層轉換流程,第一層為 Program 到 StreamGraph;第二層為 StreamGraph 到 JobGraph;第三層為 JobGraph 到 ExecutionGraph;第四層為 ExecutionGraph 到物理執行計劃。通過對 Program 的執行,能夠生成一個 DAG 執行圖,即邏輯執行圖。如下:

第一部分將先講解四層轉化的流程,然後將以詳細案例講解四層的具體轉化。
  • 第一層 StreamGraph 從 Source 節點開始,每一次 transform 生成一個 StreamNode,兩個 StreamNode 通過 StreamEdge 連線在一起,形成 StreamNode 和 StreamEdge 構成的DAG。
  • 第二層 JobGraph,依舊從 Source 節點開始,然後去遍歷尋找能夠嵌到一起的 operator,如果能夠嵌到一起則嵌到一起,不能嵌到一起的單獨生成 jobVertex,通過 JobEdge 連結上下游 JobVertex,最終形成 JobVertex 層面的 DAG。
  • JobVertex DAG 提交到任務以後,從 Source 節點開始排序,根據 JobVertex 生成ExecutionJobVertex,根據 jobVertex的IntermediateDataSet 構建IntermediateResult,然後 IntermediateResult 構建上下游的依賴關係,形成 ExecutionJobVertex 層面的 DAG 即 ExecutionGraph。
  • 最後通過 ExecutionGraph 層到物理執行層。

Program 到 StreamGraph 的轉化

Program 轉換成 StreamGraph 具體分為三步:
  • 從 StreamExecutionEnvironment.execute 開始執行程式,將 transform 新增到 StreamExecutionEnvironment 的 transformations。
  • 呼叫 StreamGraphGenerator 的 generateInternal 方法,遍歷 transformations 構建 StreamNode 及 StreamEage。
  • 通過 StreamEdge 連線 StreamNode。


通過 WindowWordCount 來看程式碼到 StreamGraph 的轉化,在 flatMap transform 設定 slot 共享組為 flatMap_sg,併發設定為 4,在聚合的操作中設定 slot 共享組為 sum_sg, sum() 和 counts() 併發設定為 3,這樣設定主要是為了演示後面如何嵌到一起的,跟上下游節點的併發以及上游的共享組有關。 WindowWordCount 程式碼中可以看到,在 readTextFile() 中會生成一個 transform,且 transform 的 ID 是 1;然後到 flatMap() 會生成一個 transform, transform 的 ID 是 2;接著到 keyBy() 生成一個 transform 的 ID 是 3;再到 sum() 生成一個 transform 的 ID 是 4;最後到 counts()生成 transform 的 ID 是 5。

transform 的結構如圖所示,第一個是 flatMap 的 transform,第二個是 window 的 transform,第三個是 SinkTransform 的 transform。除此之外,還能在 transform 的結構中看到每個 transform 的 input 是什麼。 接下來介紹一下 StreamNode 和 StreamEdge。
  • StreamNode 是用來描述 operator 的邏輯節點,其關鍵成員變數有 slotSharingGroup、jobVertexClass、inEdges、outEdges以及transformationUID;
  • StreamEdge 是用來描述兩個 operator 邏輯的連結邊,其關鍵變數有 sourceVertex、targetVertex。


WindowWordCount transform 到 StreamGraph 轉化如圖所示,StreamExecutionEnvironment 的 transformations 存在 3 個 transform,分別是 Flat Map(Id 2)、Window(Id 4)、Sink(Id 5)。 transform 的時候首先遞迴處理 transform 的 input,生成 StreamNode,然後通過 StreamEdge 連結上下游 StreamNode。需要注意的是,有些 transform 操作並不會生成StreamNode 如 PartitionTransformtion,而是生成個虛擬節點。

在轉換完成後可以看到,streamNodes 有四種 transform 形式,分別為 Source、Flat Map、Window、Sink。

每個 streamNode 物件都攜帶併發個數、slotSharingGroup、執行類等執行資訊。

StreamGraph 到 JobGraph 的轉化



StreamGraph 到 JobGraph 的轉化步驟:
  • 設定排程模式,Eager 所有節點立即啟動。
  • 廣度優先遍歷 StreamGraph,為每個 streamNode 生成 byte 陣列型別的 hash 值。
  • 從 source 節點開始遞迴尋找嵌到一起的 operator,不能嵌到一起的節點單獨生成 jobVertex,能夠嵌到一起的開始節點生成 jobVertex,其他節點以序列化的形式寫入到 StreamConfig,然後 merge 到 CHAINED_TASK_CONFIG,再通過 JobEdge 連結上下游 JobVertex。
  • 將每個 JobVertex 的入邊(StreamEdge)序列化到該 StreamConfig。
  • 根據 group name 為每個 JobVertext 指定 SlotSharingGroup。
  • 配置 checkpoint。
  • 將快取檔案存檔案的配置新增到 configuration 中。
  • 設定 ExecutionConfig。
從 source 節點遞迴尋找嵌到一起的 operator 中,嵌到一起需要滿足一定的條件,具體條件介紹如下:
  • 下游節點只有一個輸入。
  • 下游節點的操作符不為 null。
  • 上游節點的操作符不為 null。
  • 上下游節點在一個槽位共享組內。
  • 下游節點的連線策略是 ALWAYS。
  • 上游節點的連線策略是 HEAD 或者 ALWAYS。
  • edge 的分割槽函式是 ForwardPartitioner 的例項。
  • 上下游節點的並行度相等。
  • 可以進行節點連線操作。


JobGraph 物件結構如上圖所示,taskVertices 中只存在 Window、Flat Map、Source 三個 TaskVertex,Sink operator 被嵌到 window operator 中去了。

為什麼要為每個 operator 生成 hash 值?

Flink 任務失敗的時候,各個 operator 是能夠從 checkpoint 中恢復到失敗之前的狀態的,恢復的時候是依據 JobVertexID(hash 值)進行狀態恢復的。相同的任務在恢復的時候要求 operator 的 hash 值不變,因此能夠獲取對應的狀態。

每個 operator 是怎樣生成 hash 值的?

如果使用者對節點指定了一個雜湊值,則基於使用者指定的值能夠產生一個長度為 16 的位元組陣列。如果使用者沒有指定,則根據當前節點所處的位置,產生一個雜湊值。 考慮的因素主要有三點:
  • 一是在當前 StreamNode 之前已經處理過的節點的個數,作為當前 StreamNode 的 id,新增到 hasher 中;
  • 二是遍歷當前 StreamNode 輸出的每個 StreamEdge,並判斷當前 StreamNode 與這個 StreamEdge 的目標 StreamNode 是否可以進行連結,如果可以,則將目標 StreamNode 的 id 也放入 hasher 中,且這個目標 StreamNode 的 id 與當前 StreamNode 的 id 取相同的值;
  • 三是將上述步驟後產生的位元組資料,與當前 StreamNode 的所有輸入 StreamNode 對應的位元組資料,進行相應的位操作,最終得到的位元組資料,就是當前 StreamNode 對應的長度為 16 的位元組陣列。

JobGraph 到 ExexcutionGraph 以及物理執行計劃



JobGraph 到 ExexcutionGraph 以及物理執行計劃的流程:
  • 將 JobGraph 裡面的 jobVertex 從 Source 節點開始排序。
  • 在 executionGraph.attachJobGraph(sortedTopology)方法裡面,根據 JobVertex 生成 ExecutionJobVertex,在 ExecutionJobVertex 構造方法裡面,根據 jobVertex 的 IntermediateDataSet 構建 IntermediateResult,根據 jobVertex 併發構建 ExecutionVertex,ExecutionVertex 構建的時候,構建 IntermediateResultPartition(每一個 Execution 構建 IntermediateResult 數個IntermediateResultPartition );將建立的 ExecutionJobVertex 與前置的 IntermediateResult 連線起來。
  • 構建 ExecutionEdge ,連線到前面的 IntermediateResultPartition,最終從 ExecutionGraph 到物理執行計劃。

Flink Job 執行流程

Flink On Yarn 模式



基於 Yarn 層面的架構類似 Spark on Yarn 模式,都是由 Client 提交 App 到 RM 上面去執行,然後 RM 分配第一個 container 去執行 AM,然後由 AM 去負責資源的監督和管理。需要說明的是,Flink 的 Yarn 模式更加類似 Spark on Yarn 的 cluster 模式,在 cluster 模式中,dirver 將作為 AM 中的一個執行緒去執行。Flink on Yarn 模式也是會將 JobManager 啟動在 container 裡面,去做個 driver 類似的任務排程和分配,Yarn AM 與 Flink JobManager 在同一個 Container 中,這樣 AM 可以知道 Flink JobManager 的地址,從而 AM 可以申請 Container 去啟動 Flink TaskManager。待 Flink 成功執行在 Yarn 叢集上,Flink Yarn Client 就可以提交 Flink Job 到 Flink JobManager,並進行後續的對映、排程和計算處理。

Fink on Yarn 的缺陷

  • 資源分配是靜態的,一個作業需要在啟動時獲取所需的資源並且在它的生命週期裡一直持有這些資源。這導致了作業不能隨負載變化而動態調整,在負載下降時無法歸還空閒的資源,在負載上升時也無法動態擴充套件。
  • On-Yarn 模式下,所有的 container 都是固定大小的,導致無法根據作業需求來調整 container 的結構。譬如 CPU 密集的作業或許需要更多的核,但不需要太多記憶體,固定結構的 container 會導致記憶體被浪費。
  • 與容器管理基礎設施的互動比較笨拙,需要兩個步驟來啟動 Flink 作業: 1.啟動 Flink 守護程式;2.提交作業。如果作業被容器化並且將作業部署作為容器部署的一部分,那麼將不再需要步驟2。
  • On-Yarn 模式下,作業管理頁面會在作業完成後消失不可訪問。
  • Flink 推薦 per job clusters 的部署方式,但是又支援可以在一個叢集上執行多個作業的 session 模式,令人疑惑。
在 Flink 版本 1.5 中引入了 Dispatcher,Dispatcher 是在新設計裡引入的一個新概念。Dispatcher 會從 Client 端接受作業提交請求並代表它在叢集管理器上啟動作業。

引入 Dispatcher 的原因主要有兩點:

  • 第一,一些叢集管理器需要一箇中心化的作業生成和監控例項;
  • 第二,能夠實現 Standalone 模式下 JobManager 的角色,且等待作業提交。在一些案例中,Dispatcher 是可選的(Yarn)或者不相容的(kubernetes)。

資源排程模型重構下的 Flink On Yarn 模式



沒有 Dispatcher job 執行過程

客戶端提交 JobGraph 以及依賴 jar 包到 YarnResourceManager,接著 Yarn ResourceManager 分配第一個 container 以此來啟動 AppMaster,Application Master 中會啟動一個 FlinkResourceManager 以及 JobManager,JobManager 會根據 JobGraph 生成的 ExecutionGraph 以及物理執行計劃向 FlinkResourceManager 申請 slot,FlinkResoourceManager 會管理這些 slot 以及請求,如果沒有可用 slot 就向 Yarn 的 ResourceManager 申請 container,container 啟動以後會註冊到 FlinkResourceManager,最後 JobManager 會將 subTask deploy 到對應 container 的 slot 中去。

在有 Dispatcher 的模式下

會增加一個過程,就是 Client 會直接通過 HTTP Server 的方式,然後用 Dispatcher 將這個任務提交到 Yarn ResourceManager 中。 新框架具有四大優勢,詳情如下:
  • client 直接在 Yarn 上啟動作業,而不需要先啟動一個叢集然後再提交作業到叢集。因此 client 再提交作業後可以馬上返回。
  • 所有的使用者依賴庫和配置檔案都被直接放在應用的 classpath,而不是用動態的使用者程式碼 classloader 去載入。
  • container 在需要時才請求,不再使用時會被釋放。
  • “需要時申請”的 container 分配方式允許不同運算元使用不同 profile (CPU 和記憶體結構)的 container。

新的資源排程框架下 single cluster job on Yarn 流程介紹



single cluster job on Yarn 模式涉及三個例項物件:
  • clifrontend
  • Invoke App code;
  • 生成 StreamGraph,然後轉化為 JobGraph;

  • YarnJobClusterEntrypoint(Master)
  • 依次啟動 YarnResourceManager、MinDispatcher、JobManagerRunner 三者都服從分散式協同一致的策略;
  • JobManagerRunner 將 JobGraph 轉化為 ExecutionGraph ,然後轉化為物理執行任務Execution,然後進行 deploy,deploy 過程會向 YarnResourceManager 請求 slot,如果有直接 deploy 到對應的 YarnTaskExecutiontor 的 slot 裡面,沒有則向 Yarn 的 ResourceManager 申請,帶 container 啟動以後 deploy。

  • YarnTaskExecutorRunner (slave)
  • 負責接收 subTask,並執行。

整個任務執行程式碼呼叫流程如下圖:

subTask 在執行時是怎麼執行的?

呼叫 StreamTask 的 invoke 方法,執行步驟如下:
* initializeState()即operator的initializeState()
* openAllOperators() 即operator的open()方法
* 最後呼叫 run 方法來進行真正的任務處理複製程式碼
我們來看下 flatMap 對應的 OneInputStreamTask 的 run 方法具體是怎麼處理的。
@Override
    protected void run() throws Exception {
        // cache processor reference on the stack,to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }複製程式碼
最終是呼叫 StreamInputProcessor 的 processInput() 做資料的處理,這裡麵包含使用者的處理邏輯。
public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.",e);
                numRecordsIn = new SimpleCounter();
            }
        }

        while (true) {
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();
                    //處理watermark
                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        //watermark處理邏輯,這裡可能引起timer的trigger
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(),currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(),currentChannel);
                        continue;
                        //處理latency watermark
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        //使用者的真正的程式碼邏輯
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            //處理資料
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }
            
            //這裡會進行checkpoint barrier的判斷和對齊,以及不同partition 裡面checkpoint barrier不一致時候的,資料buffer,

            final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
            if (bufferOrEvent != null) {
                if (bufferOrEvent.isBuffer()) {
                    currentChannel = bufferOrEvent.getChannelIndex();
                    currentRecordDeserializer = recordDeserializers[currentChannel];
                    currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                }
                else {
                    // Event received
                    final AbstractEvent event = bufferOrEvent.getEvent();
                    if (event.getClass() != EndOfPartitionEvent.class) {
                        throw new IOException("Unexpected event: " + event);
                    }
                }
            }
            else {
                isFinished = true;
                if (!barrierHandler.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return false;
            }
        }
    }複製程式碼
streamOperator.processElement(record) 最終會呼叫使用者的程式碼處理邏輯,假如 operator 是 StreamFlatMap 的話,
@Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(),collector);//使用者程式碼
    }複製程式碼


原文連結 本文為雲棲社群原創內容,未經允許不得轉載。