Alink漫談(二) : 從原始碼看機器學習平臺Alink設計和架構
阿新 • • 發佈:2020-05-10
# Alink漫談(二) : 從原始碼看機器學習平臺Alink設計和架構
[TOC]
## 0x00 摘要
Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習演算法平臺,是業界首個同時支援批式演算法、流式演算法的機器學習平臺。本文是漫談系列的第二篇,將從原始碼入手,帶領大傢俱體剖析Alink設計思想和架構為何。
因為Alink的公開資料太少,所以均為自行揣測,肯定會有疏漏錯誤,希望大家指出,我會隨時更新。
## 0x01 Alink設計原則
前文中 [Alink漫談(一) : 從KMeans演算法實現看Alink設計思想](https://www.cnblogs.com/rossiXYZ/p/12831175.html) 我們推測總結出Alink部分設計原則
- 演算法的歸演算法,Flink的歸Flink,儘量遮蔽AI演算法和Flink之間的聯絡。
- 採用最簡單,最常見的開發語言和思維方式。
- 儘量借鑑市面上通用的機器學習設計思路和開發模式,讓開發者無縫切換。
- 構建一套戰術打法(middleware或adapter),即遮蔽了Flink,又可以利用好Flink,還能讓使用者快速開發演算法。
下面我們就針對這些設計原則,從上至下看看Alink如何設計自己這套戰術打法。
為了能讓大家更好理解,先整理一個概要圖。因為Alink系統主要可以分成三個層面(頂層流水線, 中間層演算法元件, 底層迭代計算框架),再加上一個Flink runtime,所以下圖就是分別從這四個層面出發來看程式執行流程。
```java
如何看待 pipeline.fit(data).transform(data).print();
// 從頂層流水線角度看
訓練流水線 +-----> [VectorAssembler(Transformer)] -----> [KMeans(Estimator)]
| // KMeans.fit之後,會生成一個KMeansModel用來轉換
|
轉換流水線 +-----> [VectorAssembler(Transformer)] -----> [KMeansModel(Transformer)]
// 從中間層演算法元件角度看
訓練演算法元件 +-----> [MapBatchOp] -----> [KMeansTrainBatchOp]
| // VectorAssemblerMapper in MapBatchOp 是業務邏輯
|
轉換演算法元件 +-----> [MapBatchOp] -----> [ModelMapBatchOp]
// VectorAssemblerMapper in MapBatchOp 是業務邏輯
// KMeansModelMapper in ModelMapBatchOp 是業務邏輯
// 從底層迭代計算框架角度看
訓練by框架 +-----> [VectorAssemblerMapper] -----> [KMeansPreallocateCentroid / KMeansAssignCluster / AllReduce / KMeansUpdateCentroids in IterativeComQueue]
| // 對映到Flink的各種運算元進行訓練
|
轉換(直接) +-----> [VectorAssemblerMapper] -----> [KMeansModelMapper]
// 對映到Flink的各種運算元進行轉換
// 從Flink runtime角度看
訓練 +-----> map, mapPartiiton...
| // VectorAssemblerMapper.map等會被呼叫
|
轉換 +-----> map, mapPartiiton...
// 比如呼叫 KMeansModelMapper.map 來轉換
```
## 0x02 Alink例項程式碼
示例程式碼還是用之前的KMeans演算法部分模組。
### 演算法呼叫
```java
public class KMeansExample {
public static void main(String[] args) throws Exception {
......
BatchOperator data = new CsvSourceBatchOp().setFilePath(URL).setSchemaStr(SCHEMA_STR);
VectorAssembler va = new VectorAssembler()
.setSelectedCols(new String[]{"sepal_length", "sepal_width", "petal_length", "petal_width"})
.setOutputCol("features");
KMeans kMeans = new KMeans().setVectorCol("features").setK(3)
.setPredictionCol("prediction_result")
.setPredictionDetailCol("prediction_detail")
.setReservedCols("category")
.setMaxIter(100);
Pipeline pipeline = new Pipeline().add(va).add(kMeans);
pipeline.fit(data).transform(data).print();
}
}
```
### 演算法主函式
```java
public final class KMeansTrainBatchOp extends BatchO