Flink原始碼系列——JobManager處理SubmitJob的過程
接《Flink原始碼系列——獲取JobGraph的過程》,在獲取到JobGraph後,客戶端會封裝一個SubmitJob訊息,並將其提交給JobManager,本文就接著分析,JobManager在收到SubmitJob訊息後,對其處理邏輯。JobManager是一個Actor,其對接受到的各種訊息的處理入口是handleMessage這個方法,其中對SubmitJob的處理入口如下:
override def handleMessage: Receive = {
…
case SubmitJob(jobGraph, listeningBehaviour) =>
val client = sender()
val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(), jobGraph.getSessionTimeout)
submitJob(jobGraph, jobInfo)
…
}
這裡構造了一個JobInfo例項,其是用來儲存job的相關資訊的,如提交job的客戶端、客戶端監聽模式、任務提交的開始時間、會話超時時間、以及結束時間、耗時等資訊。
其中監聽模式有三種,三種模型下關心的訊息內容依次增加,解釋如下:
a、DETACHED —— 只關心job提交的確認訊息
b、EXECUTION_RESULT —— 還關心job的執行結果
c、EXECUTION_RESULT_AND_STATE_CHANGES —— 還關心job的狀態變化
然後就進入了真正的處理邏輯subminJob()方法中了,這個方法的程式碼稍微有點長,這裡就分段進行分析,另外submitJob這個方法除了上述的jobGraph和jobInfo兩個入參外,還有一個isRecovery的布林變數,預設值是false,用來標識當前處理的是否是一個job的恢復操作。這個邏輯根據jobGraph是否為null分為兩個大的分支,先看下jobGraph為null的情況,處理邏輯就是構造一個job提交異常的訊息,然後通知客戶端,告訴客戶端jobGraph不能為null。
jobInfo.notifyClients(
decorateMessage(JobResultFailure(
new SerializedThrowable(
new JobSubmissionException(null, "JobGraph must not be null.")))))
重點還是分析jobGraph不為null的情況下的處理邏輯,這部分的邏輯也可以分為兩大部分。
a、根據jobGraph構建ExecutionGraph
b、對構建好的ExecutionGraph進行排程執行
在構建ExecutionGraph這部分,會進行一些初始化的工作,如果在這過程中,發生異常,會將初始化過程做的操作進行回滾操作。
1、構建ExecutionGraph
1.1、總覽
在開始ExecutionGraph的構建之前,會先獲取構建所需的引數,如下:
/** 將job所需jar相關資訊註冊到library管理器中,如果註冊失敗,則丟擲異常 */
try {
libraryCacheManager.registerJob(
jobGraph.getJobID, jobGraph.getUserJarBlobKeys, jobGraph.getClasspaths)
}
catch {
case t: Throwable =>
throw new JobSubmissionException(jobId,
"Cannot set up the user code libraries: " + t.getMessage, t)
}
/** 獲取使用者類載入器,如果獲取的類載入器為null,則丟擲異常 */
val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
if (userCodeLoader == null) {
throw new JobSubmissionException(jobId,
"The user code class loader could not be initialized.")
}
/** 判斷{@code JobGraph}中的{@code StreamNode}的個數, 如果為0, 則說明是個空任務,丟擲異常 */
if (jobGraph.getNumberOfVertices == 0) {
throw new JobSubmissionException(jobId, "The given job is empty")
}
/** 優先採用JobGraph配置的重啟策略,如果沒有配置,則採用JobManager中配置的重啟策略 */
val restartStrategy =
Option(jobGraph.getSerializedExecutionConfig()
.deserializeValue(userCodeLoader)
.getRestartStrategy())
.map(RestartStrategyFactory.createRestartStrategy)
.filter(p => p != null) match {
case Some(strategy) => strategy
case None => restartStrategyFactory.createRestartStrategy()
}
log.info(s"Using restart strategy $restartStrategy for $jobId.")
val jobMetrics = jobManagerMetricGroup.addJob(jobGraph)
/** 獲取註冊在排程器上的所有TaskManager例項的總的slot數量 */
val numSlots = scheduler.getTotalNumberOfSlots()
/** 針對jobID,看是否已經存在 ExecutionGraph,如果有,則直接獲取已有的,並將registerNewGraph標識為false */
val registerNewGraph = currentJobs.get(jobGraph.getJobID) match {
case Some((graph, currentJobInfo)) =>
executionGraph = graph
currentJobInfo.setLastActive()
false
case None =>
true
}
上面這段邏輯主要做一些準備工作,如jar包註冊,類載入器,重啟策略等,這些準備好之後,就可以開始ExecutionGraph的構建,呼叫如下:
/** 通過{@link JobGraph}構建出{@link ExecutionGraph} */
executionGraph = ExecutionGraphBuilder.buildGraph(
executionGraph,
jobGraph,
flinkConfiguration,
futureExecutor,
ioExecutor,
scheduler,
userCodeLoader,
checkpointRecoveryFactory,
Time.of(timeout.length, timeout.unit),
restartStrategy,
jobMetrics,
numSlots,
blobServer,
log.logger)
/** 如果還沒有註冊過, 則進行註冊 */
if (registerNewGraph) {
currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo))
}
/** 註冊job狀態變化監聽器 */
executionGraph.registerJobStatusListener(
new StatusListenerMessenger(self, leaderSessionID.orNull))
jobInfo.clients foreach {
/** 如果客戶端關心執行結果和狀態變化,則為客戶端在executiongraph中註冊相應的監聽器 */
case (client, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) =>
val listener = new StatusListenerMessenger(client, leaderSessionID.orNull)
executionGraph.registerExecutionListener(listener)
executionGraph.registerJobStatusListener(listener)
case _ => // 如果不關心,則什麼都不做
}
在ExecutionGraph構建好只有,就會設定相應的監聽器,用來監聽其後續的排程執行情況。
另外這段程式碼的執行會被整個的進行了try…catch,看下catch中的邏輯。
/** 如果異常, 則進行回收操作 */
case t: Throwable =>
log.error(s"Failed to submit job $jobId ($jobName)", t)
/** 進行jar包的註冊回滾 */
libraryCacheManager.unregisterJob(jobId)
blobServer.cleanupJob(jobId)
/** 移除上面註冊的graph */
currentJobs.remove(jobId)
/** 如果executionGraph不為null,還需要執行failGlobal操作 */
if (executionGraph != null) {
executionGraph.failGlobal(t)
}
/** 構建JobExecutionException移除 */
val rt: Throwable = if (t.isInstanceOf[JobExecutionException]) {
t
} else {
new JobExecutionException(jobId, s"Failed to submit job $jobId ($jobName)", t)
}
/** 通知客戶端,job失敗了 */
jobInfo.notifyClients(
decorateMessage(JobResultFailure(new SerializedThrowable(rt))))
/** 退出submitJob方法 */
return
可見catch中,主要進行一些回滾操作,這樣可以確保在出現異常的情況下,可以讓已經上傳的jar等被刪除掉。
1.2、ExecutionGraph
ExecutionGraph是JobGraph的並行模式,是基於JobGraph構建出來的,主要構建邏輯都在ExecutionGraphBuilder這個類中,而且該方法的建構函式是private的,且該類只有兩個static方法,buildGraph()和idToVertex(),而ExecutionGraph的構造邏輯都在buildGraph()方法中。
在buildGraph()方法中,先是對executionGraph進行一些基礎的設定,如果有需要,則對各個JobVertex進行初始化操作,然後就是將JobVertex轉化成ExecutionGraph中的元件,轉化成功後,則開始設定checkpoint相關的配置。
這裡主要JobVertex轉化的邏輯,程式碼如下:
/** 1、構建有序拓撲列表 */
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
}
/** 2、轉化JobVertex */
executionGraph.attachJobGraph(sortedTopology);
主要的轉換程式碼就兩行,先是將jobGraph中的所有的JobVertex,從資料來源開始的有序拓撲節點列表,然後就是將這個有序集合轉化到executionGraph中。
1.2.1 構建有序拓撲列表
有序拓撲列表的構建邏輯在JobGraph類中,如下:
public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
/** 節點集合為空時,可以快速退出 */
if (this.taskVertices.isEmpty()) {
return Collections.emptyList();
}
/** 從source開始的,排好序的JobVertex列表 */
List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
/** 還沒有進入sorted集合,等待排序的JobVertex集合,初始值就是JobGraph中所有JobVertex的集合 */
Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());
/** 找出資料來源節點,也就是那些沒有輸入的JobVertex,以及指向獨立資料集的JobVertex */
{
Iterator<JobVertex> iter = remaining.iterator();
while (iter.hasNext()) {
JobVertex vertex = iter.next();
/** 如果該節點沒有任何輸入,則表示該節點是資料來源,新增到sorted集合,同時從remaining集合中移除 */
if (vertex.hasNoConnectedInputs()) {
sorted.add(vertex);
iter.remove();
}
}
}
/** sorted集合中開始遍歷的起始位置,也就是從第一個元素開始遍歷 */
int startNodePos = 0;
/** 遍歷已經新增的節點,直到找出所有元素 */
while (!remaining.isEmpty()) {
/**
* 處理一個節點後,startNodePos就會加1,
* 如果startNodePos大於sorted的集合中元素個數,
* 則說明經過一次處理,並沒有找到新的JobVertex新增到sorted集合中,這表明在graph中存在了迴圈,這是不允許的
*/
if (startNodePos >= sorted.size()) {
throw new InvalidProgramException("The job graph is cyclic.");
}
/** 獲取當前要處理的JobVertex */
JobVertex current = sorted.get(startNodePos++);
/** 遍歷當前JobVertex的下游節點 */
addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
}
return sorted;
}
上述邏輯就是首先從JobGraph的所有JobVertex集合中,找出所有的source節點,然後在從這些source節點開始,依次遍歷其下游節點,當一個節點的所有輸入都已經被新增到sorted集合中時,它自身就可以新增到sorted集合中了,同時從remining集合中移除。
private void addNodesThatHaveNoNewPredecessors(JobVertex start, List<JobVertex> target, Set<JobVertex> remaining) {
/** 遍歷start節點的所有輸出中間資料集合 */
for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
/** 對於每個中間資料集合,遍歷其所有的輸出JobEdge */
for (JobEdge edge : dataSet.getConsumers()) {
/** 如果一個節點的所有輸入節點都不在"remaining"集合中,則將這個節點新增到target集合中 */
/** 如果目標節點已經不在remaining集合中,則continue */
JobVertex v = edge.getTarget();
if (!remaining.contains(v)) {
continue;
}
/** 一個JobVertex是否還有輸入節點在remaining集合中的標識 */
boolean hasNewPredecessors = false;
/**
* 如果節點v,其所有輸入節點都已經不在remaining集合中,
* 則說明其輸入節點都已經被新增到sorted列表,則hasNewPredecessors為false,
* 否則hasNewPredecessors的值為true,表示節點v還有輸入節點在remaining集合中。
*/
for (JobEdge e : v.getInputs()) {
/** 跳過上層迴圈中遍歷到的JobEdge,也就是edge變數 */
if (e == edge) {
continue;
}
/** 只要有一個輸入還在remaining集合中,說明當前它還不能新增到target集合,直接結束這層內迴圈 */
IntermediateDataSet source = e.getSource();
if (remaining.contains(source.getProducer())) {
hasNewPredecessors = true;
break;
}
}
/**
* 如果節點v已經沒有輸入節點還在remaining集合中,則將節點v新增到sorted列表中,
* 同時從remaining集合中刪除,
* 然後開始遞迴遍歷節點v的下游節點。
*/
if (!hasNewPredecessors) {
target.add(v);
remaining.remove(v);
addNodesThatHaveNoNewPredecessors(v, target, remaining);
}
}
}
}
對於具體的某個JobVertex的遍歷邏輯如上,詳見註釋。
1.2.2 JobVertex的轉化
在獲取了排序後的拓撲的JobVertex集合後,就可以開始將其轉換成ExecutionGraph中的ExecutionJobVertex。
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
"vertices and {} intermediate results.",
topologiallySorted.size(), tasks.size(), intermediateResults.size());
final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
final long createTimestamp = System.currentTimeMillis();
/** 依次順序遍歷排好序的JobVertex集合 */
for (JobVertex jobVertex : topologiallySorted) {
/** 對於ExecutionGraph來說,只要有一個不能停止的輸入源JobVertex,那ExecutionGraph就是不可停止的 */
if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
this.isStoppable = false;
}
/** 建立jobVertex對應的ExecutionJobVertex,其中的第三個構造引數1,就是預設的並行度 */
ExecutionJobVertex ejv = new ExecutionJobVertex(
this,
jobVertex,
1,
rpcCallTimeout,
globalModVersion,
createTimestamp);
/** 將新建的ExecutionJobVertex例項, 與其前置處理器建立連線 */
ejv.connectToPredecessors(this.intermediateResults);
/** 將構建好的ejv,記錄下來,如果發現對一個的jobVertexID已經存在一個ExecutionJobVertex,則需要拋異常 */
ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
if (previousTask != null) {
throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
jobVertex.getID(), ejv, previousTask));
}
/** 將這個ExecutionGraph中所有臨時結果IntermediateResult, 都儲存到intermediateResults這個map */
for (IntermediateResult res : ejv.getProducedDataSets()) {
IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
if (previousDataSet != null) {
throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
res.getId(), res, previousDataSet));
}
}
/** 將ejv按建立順序記錄下來 */
this.verticesInCreationOrder.add(ejv);
/** 統計所有ejv的並行度 */
this.numVerticesTotal += ejv.getParallelism();
newExecJobVertices.add(ejv);
}
terminationFuture = new CompletableFuture<>();
failoverStrategy.notifyNewVertices(newExecJobVertices);
}
上述的邏輯是比較清晰的,就是依次遍歷排好序的JobVertex集合,並構建相應的ExecutionJobVertex例項,並設定ExecutionGraph中的部分屬性。
在ExecutionJobVertex的建構函式中,會根據並行度,構造相應的ExecutionVertex陣列,該陣列的索引就是子任務的索引號;而在ExecutionVertex的建構函式中,會構造出一個Execution例項。
2、ExecutionGraph的排程執行
在前面的準備工作都完成,ExecutionGraph也構建好之後,接下來就可以對ExecutionGraph進行排程執行。這部分的操作是比較耗時的,所以整個被包在一個futrue中進行非同步執行。
a、如果isRecovery為true,則先進行恢復操作;
b、如果isRecovery為false,則進行checkpoint設定,並將jobGraph的相關資訊進備份操作。
上述兩步完成之後,則會通知客戶端,job已經提交成功了。
jobInfo.notifyClients(decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))
1
接下來就是判斷當前JobManager是否是leader,如果是,則開始對executionGraph進行排程執行,如果不是leader,則告訴JobManager自身,去進行remove操作,邏輯如下:
/** 根據當前JobManager是否是leader,執行不同的操作 */
if (leaderElectionService.hasLeadership) {
log.info(s"Scheduling job $jobId ($jobName).")
/** executionGraph進行排程執行 */
executionGraph.scheduleForExecution()
} else {
/** 移除這個job */
self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))
log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
"this. I am not scheduling the job for execution.")
}
接下里就看下executionGraph的排程執行邏輯。
public void scheduleForExecution() throws JobException {
/** 將狀態從'CREATED’轉換為’RUNNING' */
if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
/** 根據排程模式,執行不同的排程策略 */
switch (scheduleMode) {
case LAZY_FROM_SOURCES:
scheduleLazy(slotProvider);
break;
case EAGER:
scheduleEager(slotProvider, scheduleAllocationTimeout);
break;
default:
throw new JobException("Schedule mode is invalid.");
}
}
else {
throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
}
}
上述邏輯就是先將ExecutionGraph的狀態從’CREATED’轉換為’RUNNING’,狀態轉換成功,會給狀態監聽者傳送狀態變化的訊息,然後就根據排程的不同模式,進行不同的排程。排程模式分為兩種:
a、LAZY_FROM_SORUCES —— 該模式下,從source節點開始部署執行,成功後,在部署其下游節點,以此類推;
b、EAGER —— 該模式下,所有節點同時部署執行;
這裡繼續分析’EAGER’模式下的排程。
private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
/** 走到這裡了,需要再次確認下當前的狀態是否是'RUNNING' */
checkState(state == JobStatus.RUNNING, "job is not running currently");
/** 標識在無法立即獲取部署資源時,是否可以將部署任務入佇列 */
final boolean queued = allowQueuedScheduling;
/** 用來維護所有槽位申請的future */
final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
/** 獲取每個ExecutionJobGraph申請槽位的future */
for (ExecutionJobVertex ejv : getVerticesTopologically()) {
Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
slotProvider,
queued,
LocationPreferenceConstraint.ALL);
allAllocationFutures.addAll(allocationFutures);
}
/** 將上面的所有future連線成一個future,只有所有的future都成功,才算成功,否則就是失敗的 */
final ConjunctFuture<Collection<Execution>> allAllocationsComplete = FutureUtils.combineAll(allAllocationFutures);
/** 構建一個定時任務,用來檢查槽位分配是否超時 */
final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable() {
@Override
public void run() {
int numTotal = allAllocationsComplete.getNumFuturesTotal();
int numComplete = allAllocationsComplete.getNumFuturesCompleted();
String message = "Could not allocate all requires slots within timeout of " +
timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;
/** 如果超時,則以異常的方式結束分配 */
allAllocationsComplete.completeExceptionally(new NoResourceAvailableException(message));
}
}, timeout.getSize(), timeout.getUnit());
/** 根據槽位分配,進行非同步呼叫執行 */
allAllocationsComplete.handleAsync(
(Collection<Execution> executions, Throwable throwable) -> {
try {
/** 取消上面的超時檢查任務 */
timeoutCancelHandle.cancel(false);
if (throwable == null) {
/** 成功後去所需槽位, 現在開始部署 */
for (Execution execution : executions) {
execution.deploy();
}
}
else {
/** 丟擲異常, 讓異常控制代碼處理這個 */
throw throwable;
}
}
catch (Throwable t) {
failGlobal(ExceptionUtils.stripCompletionException(t));
}
return null;
},
futureExecutor);
}
整個處理邏輯分為兩大步驟:
a、先進行槽位的分配,獲取分配的future;
b、成功獲取槽位之後,進行部署,這步也是非同步的;
另外,在槽位分配上,加上了超時機制,如果達到設定時間,槽位還沒有分配好,則進行fail操作。
2.1、槽位的申請分配
槽位的申請分配邏輯如下:
public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
SlotProvider resourceProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint) {
final ExecutionVertex[] vertices = this.taskVertices;
final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];
/** 為ExecutionJobVertex中的每個Execution嘗試申請一個slot,並返回future */
for (int i = 0; i < vertices.length; i++) {
final Execution exec = vertices[i].getCurrentExecutionAttempt();
final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(
resourceProvider,
queued,
locationPreferenceConstraint);
slots[i] = allocationFuture;
}
/** 很好, 我們請求到了所有的slots */
return Arrays.asList(slots);
}
上述邏輯就是為ExecutionJobVertex中的每個Execution申請一個slot,然後具體的申請邏輯,是放在Execution中的,繼續向下看。
public CompletableFuture<Execution> allocateAndAssignSlotForExecution(
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint) throws IllegalExecutionStateException {
checkNotNull(slotProvider);
/** 獲取在構建JobVertex時已經賦值好的SlotSharingGroup例項和CoLocationConstraint例項,如果有的話 */
final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();
/** 位置約束不為null, 而共享組為null, 這種情況是不可能出現的, 出現了肯定就是異常了 */
if (locationConstraint != null && sharingGroup == null) {
throw new IllegalStateException(
"Trying to schedule with co-location constraint but without slot sharing allowed.");
}
/** 只有狀態是 'CREATED' 時, 這個方法才能正常工作 */
if (transitionState(CREATED, SCHEDULED)) {
/** ScheduleUnit 例項就是在這裡構造出來的 */
ScheduledUnit toSchedule = locationConstraint == null ?
new ScheduledUnit(this, sharingGroup) :
new ScheduledUnit(this, sharingGroup, locationConstraint);
/** 獲取當前任務分配槽位所在節點的"偏好位置集合",也就是分配時,優先考慮分配在這些節點上 */
final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = calculatePreferredLocations(locationPreferenceConstraint);
return preferredLocationsFuture
.thenCompose(
(Collection<TaskManagerLocation> preferredLocations) ->
/** 在獲取輸入節點的位置之後,將其作為偏好位置集合,基於這些偏好位置,申請分配一個slot */
slotProvider.allocateSlot(
toSchedule,
queued,
preferredLocations))
.thenApply(
(SimpleSlot slot) -> {
if (tryAssignResource(slot)) {
/** 如果slot分配成功,則返回這個future */
return this;
} else {
/** 釋放slot */
slot.releaseSlot();
throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned "));
}
});
}
else {
throw new IllegalExecutionStateException(this, CREATED, state);
}
}
上述的邏輯還是很清晰的,
a、將狀態從’CREATED’成功轉換成’SCHEDULED’;
b、根據LocationPreferenceConstraint的設定,為這個Execution指定優先分配槽位所在的TaskManager;
c、基於上述步驟獲取的偏好位置,進行slot分配;
d、在slot分配成功後,將slot設定給當前Execution,如果設定成功,則返回相應的slot,否則是否slot,然後丟擲異常。
其中LocationPreferenceConstraint有兩種取值:
a、ALL —— 需要確認其所有的輸入都已經分配好slot,然後基於其輸入所在的TaskManager,作為其偏好位置集合;
b、ANY —— 只考慮那些slot已經分配好的輸入所在的TaskManager,作為偏好位置集合;
某個Execution的偏好位置的計算邏輯,是先由其對應的ExecutionVertex基於所有輸入,獲取偏好位置集合,然後根據LocationPreferenceConstraint的策略不同,刪選出一個子集,作為這個Execution的偏好位置集合。
這裡就只看下ExecutionVertex基於輸入獲取偏好集合的邏輯。
public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
// 如果沒有輸入,則返回空集合,否則,基於輸入連線確定偏好位置
if (inputEdges == null) {
return Collections.emptySet();
}
else {
Set<CompletableFuture<TaskManagerLocation>> locations = new HashSet<>(getTotalNumberOfParallelSubtasks());
Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(getTotalNumberOfParallelSubtasks());
// 遍歷所有inputs
for (int i = 0; i < inputEdges.length; i++) {
inputLocations.clear();
ExecutionEdge[] sources = inputEdges[i];
if (sources != null) {
// 遍歷所有輸入源
for (int k = 0; k < sources.length; k++) {
// 查詢輸入源的分配slot
CompletableFuture<TaskManagerLocation> locationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
inputLocations.add(locationFuture);
// 如果某個輸入源有太多的節點分佈,則不考慮這個輸入源的節點位置了
if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
inputLocations.clear();
break;
}
}
}
// 保留具有最少分佈位置的輸入的位置
if (locations.isEmpty() || // 當前還沒有分配的位置
(!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
// 當前的輸入具有更少的偏好位置
locations.clear();
locations.addAll(inputLocations);
}
}
return locations.isEmpty() ? Collections.emptyList() : locations;
}
}
邏輯拆分如下:
a、如果沒有輸入源,則返回空集合,對於資料來源節點來說,就是返回空集合;
b、如果有輸入源,則對每個輸入源,都找出其所有分割槽所在的TaskManager的位置,如果某個輸入源的分割槽所在位置超過MAX_DISTINCT_LOCATIONS_TO_CONSIDER(預設值為8),則不考慮這個輸入源,直接跳過,然後將滿足條件的輸入源中,分割槽位置分佈做少的那個資料來源對應的TaskManager的位置集合,作為計算結果返回。
2.2、部署
在槽位分配成功後,就開始各個Execution的部署操作。
public void deploy() throws JobException {
final SimpleSlot slot = assignedResource;
checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");
/** 檢查slot是否alive */
if (!slot.isAlive()) {
throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
}
/**
* 確保在正確的狀態的情況下進行部署呼叫
* 注意:從 CREATED to DEPLOYING 只是用來測試的
*/
ExecutionState previous = this.state;
if (previous == SCHEDULED || previous == CREATED) {
if (!transitionState(previous, DEPLOYING)) {
/**
* 競態條件,有人在部署呼叫上擊中我們了(其實就是衝突了)
* 這個在真實情況下不該發生,如果發生,則說明有地方發生衝突了
*/
throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
}
}
else {
// vertex 可能已經被取消了,或者已經被排程了
throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
}
try {
// 很好,走到這裡,說明我們被允許部署了
if (!slot.setExecutedVertex(this)) {
throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
}
// 雙重校驗,是我們 失敗/取消 ? 我們需要釋放這個slot?
if (this.state != DEPLOYING) {
slot.releaseSlot();
return;
}
if (LOG.isInfoEnabled()) {
LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
attemptNumber, getAssignedResourceLocation().getHostname()));
}
final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
attemptId,
slot,
taskState,
attemptNumber);
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
/** 這裡就是將task提交到{@code TaskManager}的地方 */
final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);
/** 根據提交結果進行處理,如果提交失敗,則進行fail處理 */
submitResultFuture.whenCompleteAsync(
(ack, failure) -> {
// 只處理失敗響應
if (failure != null) {
if (failure instanceof TimeoutException) {
String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
markFailed(new Exception(
"Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
+ ") not responding after a timeout of " + timeout, failure));
} else {
markFailed(failure);
}
}
},
executor);
}
catch (Throwable t) {
markFailed(t);
ExceptionUtils.rethrow(t);
}
}
上述程式碼雖然很長,但是邏輯很簡明,先是做一系列的校驗工作,然後將狀態轉換為’DEPLOYING’,然後就是TaskDeploymentDescriptor例項,然後提交給相應的TaskManager例項,這裡是非同步的,如果執行失敗,則進行fail處理。
其中提交到TaskManager的訊息結構如下:
JobManagerMessages.LeaderSessionMessage[TaskMessages.SubmitTask[TaskDeploymentDescriptor]]。
---------------------
作者:混混fly
來源:CSDN
原文:https://blog.csdn.net/qq_21653785/article/details/79582489
版權宣告:本文為博主原創文章,轉載請附上博文連結!