1. 程式人生 > >Alink漫談(五) : 迭代計算和Superstep

Alink漫談(五) : 迭代計算和Superstep

# Alink漫談(五) : 迭代計算和Superstep [TOC] ## 0x00 摘要 Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習演算法平臺,是業界首個同時支援批式演算法、流式演算法的機器學習平臺。迭代演算法在很多資料分析領域會用到,比如機器學習或者圖計算。本文將通過Superstep入手看看Alink是如何利用Flink迭代API來實現具體演算法。 因為Alink的公開資料太少,所以以下均為自行揣測,肯定會有疏漏錯誤,希望大家指出,我會隨時更新。 ## 0x01 緣由 為什麼提到 Superstep 這個概念,是因為在擼KMeans程式碼的時候,發現幾個很奇怪的地方,比如以下三個步驟中,都用到了context.getStepNo(),而且會根據其數值的不同進行不同業務操作: ```java public class KMeansPreallocateCentroid extends ComputeFunction { public void calc(ComContext context) { LOG.info("liuhao KMeansPreallocateCentroid "); if (context.getStepNo() == 1) { /** 具體業務邏輯程式碼 * Allocate memory for pre-round centers and current centers. */ } } } public class KMeansAssignCluster extends ComputeFunction { public void calc(ComContext context) { ...... if (context.getStepNo() % 2 == 0) { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1); } else { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2); } /** 具體業務邏輯程式碼 * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster. */ } } public class KMeansUpdateCentroids extends ComputeFunction { public void calc(ComContext context) { if (context.getStepNo() % 2 == 0) { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2); } else { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1); } /** 具體業務邏輯程式碼 * Update the centroids based on the sum of points and point number belonging to the same cluster. */ } ``` 檢視ComContext的原始碼,發現stepNo的來源居然是`runtimeContext.getSuperstepNumber()`。 ```java public class ComContext { private final int taskId; private final int numTask; private final int stepNo; // 對,就是這裡 private final int sessionId; public ComContext(int sessionId, IterationRuntimeContext runtimeContext) { this.sessionId = sessionId; this.numTask = runtimeContext.getNumberOfParallelSubtasks(); this.taskId = runtimeContext.getIndexOfThisSubtask(); this.stepNo = runtimeContext.getSuperstepNumber(); // 這裡進行了變數初始化 } /** * Get current iteration step number, the same as {@link IterationRuntimeContext#getSuperstepNumber()}. * @return iteration step number. */ public int getStepNo() { return stepNo; // 這裡是使用 } } ``` 看到這裡有的兄弟可能會虎軀一震,*這不是BSP模型的概念嘛。我就是想寫個KMeans演算法,怎麼除了MPI模型,還要考慮BSP模型*。下面就讓我們一步一步挖掘究竟Alink都做了什麼工作。 ## 0x02 背景概念 ### 2.1 四層執行圖 在 Flink 中的執行圖可以分為四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖 - StreamGraph:Stream API 編寫的程式碼生成的最初的圖。用來表示程式的拓撲結構。 - JobGraph:StreamGraph 經過優化後生成了 JobGraph, JobGraph是提交給 JobManager 的資料結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少資料在節點之間流動所需要的序列化/反序列化/傳輸消耗。JobGraph是唯一被Flink的資料流引擎所識別的表述作業的資料結構,也正是這一共同的抽象體現了流處理和批處理在執行時的統一。 - ExecutionGraph:JobManager 根據 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的並行化版本,是排程層最核心的資料結構。 - 物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行排程後,在各個TaskManager 上部署 Task 後形成的“圖”,並不是一個具體的資料結構。 ### 2.2 Task和SubTask 因為某種原因,Flink內部對這兩個概念的使用本身就有些混亂:在Task Manager裡這個subtask的概念由一個叫Task的類來實現。Task Manager裡談論的Task物件實際上對應的是ExecutionGraph裡的一個subtask。 所以這兩個概念需要理清楚。 - Task(任務) :Task對應JobGraph的一個節點,是一個運算元Operator。Task 是一個階段多個功能相同 subTask 的集合,類似於 Spark 中的 TaskSet。 - subTask(子任務) :subTask 是 Flink 中任務最小執行單元,是一個 Java 類的例項,這個 Java 類中有屬性和方法,完成具體的計算邏輯。在ExecutionGraph裡Task被分解為多個並行執行的subtask 。每個subtask作為一個excution分配到Task Manager裡執行。 - Operator Chains(運算元鏈) :沒有 shuffle 的多個運算元合併在一個 subTask 中,就形成了 Operator Chains,類似於 Spark 中的 Pipeline。Operator subTask 的數量指的就是運算元的並行度。同一程式的不同運算元也可能具有不同的並行度(因為可以通過 setParallelism() 方法來修改並行度)。 Flink 中的程式本質上是並行的。在執行期間,每一個運算元 Operator (Transformation)都有一個或多個運算元subTask(Operator SubTask),每個運算元的 subTask 之間都是彼此獨立,並在不同的執行緒中執行,並且可能在不同的機器或容器上執行。 Task( SubTask) 是一個Runnable 物件, Task Manager接受到TDD 後會用它例項化成一個Task物件, 並啟動一個執行緒執行Task的Run方法。 TaskDeploymentDescriptor(TDD) : 是Task Manager在submitTask是提交給TM的資料結構。 他包含了關於Task的所有描述資訊。比如: - TaskInfo : 包含該Task 執行的java 類,該類是某個 AbstractInvokable的實現類 , 當然也是某個operator的實現類 (比如DataSourceTask, DataSinkTask, BatchTask,StreamTask 等)。 - IG描述 :通常包含一個或兩個InputGateDeploymentDescriptor(IGD)。 - 目標RP的描述: ParitionId, PartitionType, RS個數等等。 ### 2.3 如何劃分 Task 的依據 在以下情況下會重新劃分task - 並行度發生變化時 - keyBy() /window()/apply() 等發生 Rebalance 重新分配; - 呼叫 startNewChain() 方法,開啟一個新的運算元鏈; - 呼叫 diableChaining()方法,即:告訴當前運算元操作不使用 運算元鏈 操作。 比如有如下操作 ```scala Da