# [Source Code Analysis] What Exactly Is a Flink Slot? (2)
阿新 • Published: 2020-09-04
[ToC]
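## 0x00 Abstract
Most people have heard of Flink's Slot concept, but many are still unclear on the details. What exactly does a Slot represent? How is it implemented in code? What role does a Slot play when the execution graph is generated, and during scheduling, resource allocation, deployment, and execution? This article and the previous one walk through the source code together to reveal the mechanics behind Slots.
## 0x01 Recap
This picks up where [[Source Code Analysis] What Exactly Is a Flink Slot? (1)](https://www.cnblogs.com/rossiXYZ/p/13554085.html) left off. The previous article analyzed Slots from the perspective of system architecture and data structures; this one analyzes them from the perspective of the business workflow. Here again is the system architecture diagram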
![](https://img2020.cnblogs.com/blog/1850883/202008/1850883-20200824153250938-547642498.png)
and the diagram of the logical relationships between the data structures
![](https://img2020.cnblogs.com/blog/1850883/202008/1850883-20200824153310106-751657724.png)
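Below we go through several workflows one by one.
## 0x02 Registering/Updating Slots
There are two paths that register a Slot or update its state.
- When the TaskExecutor has registered successfully and then interacts with the ResourceManager (RM) to register, its Slots are registered in the same exchange;
- On each periodic heartbeat, Slot status information is attached to the heartbeat payload;
### 2.1 After the TaskExecutor Registers Successfully
After the TaskExecutor registers successfully, it interacts with the RM to register its Slots. The call path below registers the Slots with the ResourceManager (SlotManagerImpl). On receiving the message, SlotManagerImpl updates the Slot state: if a pendingSlotRequest already exists, the Slot is assigned to it directly; otherwise the freeSlots variable is updated.
- TaskExecutor#establishResourceManagerConnection;
- TaskSlotTableImpl#createSlotReport; creates the report
- At this point the report looks like this: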
```java
slotReport = {SlotReport@9633}
0 = {SlotStatus@8969} "SlotStatus{slotID=40d390ec-7d52-4f34-af86-d06bb515cc48_0, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
slotID = {SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0"
resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
allocationID = null
jobID = null
1 = {SlotStatus@9638} "SlotStatus{slotID=40d390ec-7d52-4f34-af86-d06bb515cc48_1, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
slotID = {SlotID@9643} "40d390ec-7d52-4f34-af86-d06bb515cc48_1"
resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
allocationID = null
jobID = null
```
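- ResourceManager#sendSlotReport; reached on the RM through an RPC call (resourceManagerGateway.sendSlotReport)
- SlotManagerImpl#registerTaskManager; registers the TaskManager with the SlotManager
- SlotManagerImpl#registerSlot;
- SlotManagerImpl#createAndRegisterTaskManagerSlot; creates and registers the TaskManagerSlot
- The code and variables at this point are shown below. As we can see, this simply registers the TM's Slot information in the SlotManager: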
```java
private TaskManagerSlot createAndRegisterTaskManagerSlot(
        SlotID slotId,
        ResourceProfile resourceProfile,
        TaskExecutorConnection taskManagerConnection) {
    final TaskManagerSlot slot = new TaskManagerSlot(
            slotId, resourceProfile, taskManagerConnection);
    slots.put(slotId, slot);
    return slot;
}
slot = {TaskManagerSlot@13322}
slotId = {SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0"
resourceProfile = {ResourceProfile@4194}
cpuCores = {CPUResource@11616} "Resource(CPU: 89884656743115785...0)"
taskHeapMemory = {MemorySize@11617} "4611686018427387903 bytes"
taskOffHeapMemory = {MemorySize@11618} "4611686018427387903 bytes"
managedMemory = {MemorySize@11619} "64 mb"
networkMemory = {MemorySize@11620} "32 mb"
extendedResources = {HashMap@11621} size = 0
taskManagerConnection = {WorkerRegistration@11121}
allocationId = null
jobId = null
assignedSlotRequest = null
state = {TaskManagerSlot$State@13328} "FREE"
```
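- SlotManagerImpl#updateSlot
- SlotManagerImpl#updateSlotState; if there is a pendingSlotRequest, the Slot is assigned to it directly
- SlotManagerImpl#handleFreeSlot; otherwise the freeSlots variable is updated
When this flow finishes, the SlotManager looks as follows. There are two entries in slots and two in freeSlots, which means both Slots are free: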
```java
this = {SlotManagerImpl@11120}
scheduledExecutor = {ActorSystemScheduledExecutorAdapter@11125}
slotRequestTimeout = {Time@11127} "300000 ms"
taskManagerTimeout = {Time@11128} "30000 ms"
slots = {HashMap@11122} size = 2
{SlotID@9643} "40d390ec-7d52-4f34-af86-d06bb515cc48_1" -> {TaskManagerSlot@19206}
{SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0" -> {TaskManagerSlot@13322}
freeSlots = {LinkedHashMap@11129} size = 2
{SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0" -> {TaskManagerSlot@13322}
{SlotID@9643} "40d390ec-7d52-4f34-af86-d06bb515cc48_1" -> {TaskManagerSlot@19206}
taskManagerRegistrations = {HashMap@11130} size = 1
fulfilledSlotRequests = {HashMap@11131} size = 0
pendingSlotRequests = {HashMap@11132} size = 0
pendingSlots = {HashMap@11133} size = 0
slotMatchingStrategy = {AnyMatchingSlotMatchingStrategy@11134} "INSTANCE"
slotRequestTimeoutCheck = {ActorSystemScheduledExecutorAdapter$ScheduledFutureTask@11139}
```
### 2.2 Updating Slot Status via the Heartbeat Mechanism
Flink's heartbeat mechanism is also used to report Slot information: the Slot Report is included in the heartbeat payload.
First, the Slot Report is created in the TaskExecutor (TE):
- TaskExecutor#heartbeatFromResourceManager
- HeartbeatManagerImpl#requestHeartbeat
- TaskExecutor$ResourceManagerHeartbeatListener#retrievePayload
- TaskSlotTableImpl#createSlotReport
Execution then reaches the RM, where SlotManagerImpl's reportSlotStatus is called to update the Slot state.
- ResourceManager#heartbeatFromTaskManager
- HeartbeatManagerImpl#receiveHeartbeat
- ResourceManager$TaskManagerHeartbeatListener#reportPayload
- SlotManagerImpl#reportSlotStatus; at this point the SlotReport looks like this:
- ```java
slotReport = {SlotReport@8718}
slotsStatus = {ArrayList@8717} size = 2
0 = {SlotStatus@9025} "SlotStatus{slotID=d99e16d7-a30c-4e21-b270-f82884b1813f_0, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
slotID = {SlotID@9032} "d99e16d7-a30c-4e21-b270-f82884b1813f_0"
resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
allocationID = null
jobID = null
1 = {SlotStatus@9026} "SlotStatus{slotID=d99e16d7-a30c-4e21-b270-f82884b1813f_1, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
slotID = {SlotID@9029} "d99e16d7-a30c-4e21-b270-f82884b1813f_1"
resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
allocationID = null
jobID = null
```
- SlotManagerImpl#updateSlot
- SlotManagerImpl#updateSlotState; if there is a pendingSlotRequest, the Slot is assigned to it directly
- SlotManagerImpl#handleFreeSlot; otherwise the freeSlots variable is updated
- ```java
freeSlots.put(freeSlot.getSlotId(), freeSlot);
```
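To make the heartbeat path more concrete, here is a rough sketch of how a Slot Report carried in a heartbeat payload could be reconciled on the RM side. Everything in it is an assumption for illustration: `HeartbeatSlotReportSketch`, its nested `SlotStatus`, and the string IDs are not Flink classes, and the real `updateSlotState` handles more states than the free/allocated split shown here.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of reconciling a heartbeat slot report (not Flink's implementation).
class HeartbeatSlotReportSketch {
    // One entry of a simplified report: allocationId == null means "free on the TaskExecutor".
    static final class SlotStatus {
        final String slotId;
        final String allocationId;

        SlotStatus(String slotId, String allocationId) {
            this.slotId = slotId;
            this.allocationId = allocationId;
        }
    }

    // RM-side view: slotId -> allocationId (null while the slot is free).
    final Map<String, String> slots = new LinkedHashMap<>();
    final Set<String> freeSlots = new LinkedHashSet<>();

    // Slots must be registered before heartbeats can update them.
    void registerSlot(String slotId) {
        slots.put(slotId, null);
        freeSlots.add(slotId);
    }

    // Apply one report; unknown slots are skipped. Returns the number of slots updated.
    int reportSlotStatus(List<SlotStatus> slotReport) {
        int updated = 0;
        for (SlotStatus status : slotReport) {
            if (!slots.containsKey(status.slotId)) {
                continue;
            }
            slots.put(status.slotId, status.allocationId);
            if (status.allocationId == null) {
                freeSlots.add(status.slotId);    // same effect as freeSlots.put(...) above
            } else {
                freeSlots.remove(status.slotId); // the TaskExecutor says the slot is in use
            }
            updated++;
        }
        return updated;
    }
}
```

Because every heartbeat carries the full report, the RM-side view converges on the TaskExecutor's view even if an earlier update was missed.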
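## 0x03 Building the ExecutionGraph
After a Job is submitted and goes through a series of processing steps, the Scheduler creates the ExecutionGraph, which is the parallelized version of the JobGraph. Only after this analysis can tasks finally be dispatched to the relevant task slots. The number of slots is fixed in advance, usually according to the number of CPU cores, so that CPU capacity is used as fully as possible. If the Slots are exhausted, newly dispatched tasks cannot run.
`ExecutionGraph`: the distributed execution graph that the `JobManager` generates from the `JobGraph`; it is the core data structure of the scheduling layer.
A JobVertex / ExecutionJobVertex represents an operator (after chaining, a chain of operators), while each concrete ExecutionVertex represents one Task, that is, one parallel subtask.
When the StreamGraph is generated, the `StreamGraph.addOperator` method already determines which task type will run the operator, for example OneInputStreamTask or SourceStreamTask.
Suppose `OneInputStreamTask.class` is the vertexClass of the generated StreamNode. This value is passed along the whole way: when the StreamGraph is converted into the JobGraph, it is passed to the JobVertex's invokableClass; when the JobGraph is converted into the ExecutionGraph, it is passed into ExecutionJobVertex.TaskInformation.invokableClassName; and finally it reaches the Task.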
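The hand-off just described can be illustrated with a small sketch: a task class is recorded on the stream node, copied to the job vertex, reduced to a class name in the task information, and finally instantiated by name. All types below (`StreamNodeSketch`, `JobVertexSketch`, `TaskInformationSketch`, `OneInputTaskSketch`) are hypothetical stand-ins, and the no-argument constructor is an assumption made to keep the example short; it is not how Flink's Task constructs its invokable.

```java
// Hypothetical sketch of how a task class travels through the graph layers (not Flink code).
class InvokablePropagationSketch {
    // Stand-in for a runnable task class such as OneInputStreamTask.
    public interface Invokable {
        String invoke();
    }

    public static class OneInputTaskSketch implements Invokable {
        public OneInputTaskSketch() {}

        @Override
        public String invoke() {
            return "running " + getClass().getSimpleName();
        }
    }

    // StreamGraph layer: the node records the task class.
    static final class StreamNodeSketch {
        final Class<? extends Invokable> vertexClass;

        StreamNodeSketch(Class<? extends Invokable> vertexClass) {
            this.vertexClass = vertexClass;
        }
    }

    // JobGraph layer: the vertex takes over the class from the stream node.
    static final class JobVertexSketch {
        final Class<? extends Invokable> invokableClass;

        JobVertexSketch(StreamNodeSketch node) {
            this.invokableClass = node.vertexClass;
        }
    }

    // ExecutionGraph layer: only the class name is kept, so it can be shipped to a worker.
    static final class TaskInformationSketch {
        final String invokableClassName;

        TaskInformationSketch(JobVertexSketch vertex) {
            this.invokableClassName = vertex.invokableClass.getName();
        }
    }

    // Task side: load the class by name and instantiate it reflectively.
    static Invokable loadInvokable(TaskInformationSketch info) {
        try {
            Class<?> clazz = Class.forName(info.invokableClassName);
            return (Invokable) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + info.invokableClassName, e);
        }
    }

    // Walk the three layers and run the resulting task.
    static String demo() {
        StreamNodeSketch node = new StreamNodeSketch(OneInputTaskSketch.class);
        JobVertexSketch vertex = new JobVertexSketch(node);
        TaskInformationSketch info = new TaskInformationSketch(vertex);
        return loadInvokable(info).invoke();
    }
}
```

Carrying only the class name in the last layer is what makes the description serializable, so the class itself is resolved where the task actually runs.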
The call sequence for this part of the code is as follows:
- JobMaster#createScheduler
- DefaultSchedulerFactory#createInstance
- DefaultScheduler#init
- SchedulerBase#init
- SchedulerBase#createAndRestoreExecutionGraph
- SchedulerBase#createExecutionGraph
- ExecutionGraphBuilder#buildGraph
- ExecutionGraph#attachJobGraph
- ExecutionJobVertex#init; here the parallelism determines how many Tasks, that is, how many ExecutionVertex instances, are created.
- ```java
int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
this.taskVertices = new ExecutionVertex[numTaskVertices];
```
- ExecutionVertex#init; this is where the Execution is created.
- ```java
this.currentExecution = new Execution(
getExecutionGraph().getFutureExecutor(),
this, 0, initialGlobalModVersion, createTimestamp, timeout);
```
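Putting the two snippets above together, the expansion from one job vertex into its parallel subtasks can be sketched as follows. The nested `ExecutionVertex` and `Execution` classes here are deliberately tiny stand-ins that share only their names with Flink's classes, and the `expand` helper is hypothetical.

```java
// Hypothetical sketch of expanding one job vertex into parallel subtasks (not Flink code).
class ExecutionGraphExpansionSketch {
    // Stand-in for one execution attempt of a subtask.
    static final class Execution {
        final int attemptNumber;

        Execution(int attemptNumber) {
            this.attemptNumber = attemptNumber;
        }
    }

    // Stand-in for one parallel subtask; it creates its first attempt immediately.
    static final class ExecutionVertex {
        final String taskName;
        final int subtaskIndex;
        final Execution currentExecution;

        ExecutionVertex(String taskName, int subtaskIndex) {
            this.taskName = taskName;
            this.subtaskIndex = subtaskIndex;
            this.currentExecution = new Execution(0); // first attempt, as in "this, 0, ..." above
        }
    }

    // Same rule as the snippet above: fall back to the default when no parallelism is set.
    static ExecutionVertex[] expand(String taskName, int vertexParallelism, int defaultParallelism) {
        int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
        ExecutionVertex[] taskVertices = new ExecutionVertex[numTaskVertices];
        for (int i = 0; i < numTaskVertices; i++) {
            taskVertices[i] = new ExecutionVertex(taskName, i);
        }
        return taskVertices;
    }
}
```

For example, `expand("map", 3, 1)` yields three vertices with subtask indexes 0 to 2, each of which will later need a Slot of its own (or a shared one) to run.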
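## 0x04 Scheduling Phase
A task's path is as follows: the job dispatches it to a TaskManager, and it is then dispatched to a designated Slot for execution.
The code in this scheduling phase only uses CompletableFuture to set up the execution framework; it can be thought of as working top-down.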
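The idea of wiring the framework first and letting the work happen later can be shown with a small `CompletableFuture` sketch. It is only an illustration under assumptions: `FutureSchedulingSketch`, its `schedule` method, and the string-based slot are hypothetical, and the two stages merely stand in for slot assignment and deployment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of building a scheduling pipeline from futures (not Flink code).
class FutureSchedulingSketch {
    // Records the order in which the stages actually run.
    final List<String> log = new ArrayList<>();

    // Wire the pipeline now; the stages run only once slotFuture completes.
    CompletableFuture<String> schedule(String vertex, CompletableFuture<String> slotFuture) {
        log.add("scheduled " + vertex);
        return slotFuture
            .thenApply(slot -> {
                log.add("assigned " + vertex + " -> " + slot);
                return slot;
            })
            .thenApply(slot -> {
                log.add("deployed " + vertex + " on " + slot);
                return vertex + "@" + slot;
            });
    }
}
```

Calling `schedule` returns immediately with an incomplete future; only when the slot future is completed, for instance with `slotFuture.complete("slot-0")`, do the assignment and deployment stages run in order.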
Once the Job starts scheduling, the call sequence is as follows:
- JobMaster#startJobExecution
- JobMaster#resetAndStartScheduler
- Future operations
- JobMaster#startScheduling
- SchedulerBase#startScheduling
- DefaultScheduler#startSchedulingInternal
- LazyFromSourcesSchedulingStrategy#startScheduling; resource allocation and deployment for the vertices start here
- ```java
allocateSlotsAndDeployExecutionVertices(schedulingTopology.getVertices());
```
- LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices; this iterates over the ExecutionVertex instances and selects the ones that are in the CREATED state and whose inputs are ready.
- ```java
private void allocateSlotsAndDeployExecutionVertices(
final Iterable> vertices) {
// Select the ExecutionVertex instances that are in the CREATED state and whose inputs are ready
f