Flink原始碼系列——TaskManager處理SubmitTask的過程
接《Flink原始碼系列——JobManager處理SubmitJob的過程》,在從JobManager中,將SubmitTask提交到TaskManager後,繼續分析TaskManager的處理邏輯。
TaskManager是個Actor,混入了LeaderSessionMessageFilter這個trait,所以在從JobManager接收到JobManagerMessages.LeaderSessionMessage[TaskMessages.SubmitTask[TaskDeploymentDescriptor]]這樣的一個封裝訊息後,會先在LeaderSessionMessageFilter這個trait的receive方法中,進行訊息的過濾,過濾邏輯如下:
abstract override def receive: Receive = {
case leaderMessage @ LeaderSessionMessage(msgID, msg) =>
leaderSessionID match {
case Some(leaderId) =>
if (leaderId.equals(msgID)) {
super.receive(msg)
} else {
handleDiscardedMessage(leaderId, leaderMessage)
}
case None =>
handleNoLeaderId(leaderMessage)
}
case msg: RequiresLeaderSessionID =>
throw new Exception(s"Received a message $msg without a leader session ID, even though" +
s" the message requires a leader session ID.")
case msg =>
super.receive(msg)
}
邏輯拆分如下:
a、接收到的是一個LeaderSessionMessage訊息
a.1、當前TaskManager中有leaderSessionID
a.1.1、TaskManager所屬的JobManager的sessionID和訊息中的sessionID相同,則呼叫父類的receive方法
a.1.2、兩個sessionID不同,則說明是一個過期訊息,忽視該訊息a.2、當前TaskManager沒有leaderSessionID,則列印個日誌,不做任何處理
b、接收到的是一個RequiresLeaderSessionID訊息,說明訊息需要leaderSessionID,但其又沒有封裝在LeaderSessionMessage中,屬於異常情況,丟擲異常
c、其他訊息,呼叫父類的receive方法
對於從JobManager接收到的上述訊息,經過上述處理邏輯後,就變成TaskMessages.SubmitTask[TaskDeploymentDescriptor],並作為handleMessage方法的入參,SubmitTask是TaskMessage的子類,所以在handleMessage中的處理邏輯如下:
override def handleMessage: Receive = {
...
case message: TaskMessage => handleTaskMessage(message)
...
}
然後會就進入handleTaskMessage方法,如下:
private def handleTaskMessage(message: TaskMessage): Unit = {
...
case SubmitTask(tdd) => submitTask(tdd)
...
}
經過上述兩步轉化後,就會進入submitTask方法中,且入參就是TaskDeploymentDescriptor。
submitTask()方法的程式碼很長,但是邏輯不復雜,分塊說明如下:
/** 獲取當前JobManager的actor */
val jobManagerActor = currentJobManager match {
case Some(jm) => jm
case None =>
throw new IllegalStateException("TaskManager is not associated with a JobManager.")
}
/** 獲取library快取管理器 */
val libCache = libraryCacheManager match {
case Some(manager) => manager
case None => throw new IllegalStateException("There is no valid library cache manager.")
}
/** 獲取blobCache */
val blobCache = this.blobCache match {
case Some(manager) => manager
case None => throw new IllegalStateException("There is no valid BLOB cache.")
}
/** 槽位編號校驗 */
val slot = tdd.getTargetSlotNumber
if (slot < 0 || slot >= numberOfSlots) {
throw new IllegalArgumentException(s"Target slot $slot does not exist on TaskManager.")
}
/** 獲取一些連結相關 */
val (checkpointResponder,
partitionStateChecker,
resultPartitionConsumableNotifier,
taskManagerConnection) = connectionUtils match {
case Some(x) => x
case None => throw new IllegalStateException("The connection utils have not been " +
"initialized.")
}
這部分邏輯就是獲取一些處理控制代碼,如果獲取不到,則丟擲異常,並校驗當前任務的槽位編號是否在有效範圍,以及一些連結資訊。
/** 構建JobManager的gateway */
val jobManagerGateway = new AkkaActorGateway(jobManagerActor, leaderSessionID.orNull)
/** 部分資料可能由於量較大,不方便通過rpc傳輸,會先持久化,然後在這裡再載入回來 */
try {
tdd.loadBigData(blobCache.getPermanentBlobService);
} catch {
case e @ (_: IOException | _: ClassNotFoundException) =>
throw new IOException("Could not deserialize the job information.", e)
}
/** 獲取jobInformation */
val jobInformation = try {
tdd.getSerializedJobInformation.deserializeValue(getClass.getClassLoader)
} catch {
case e @ (_: IOException | _: ClassNotFoundException) =>
throw new IOException("Could not deserialize the job information.", e)
}
/** 校驗jobID資訊 */
if (tdd.getJobId != jobInformation.getJobId) {
throw new IOException(
"Inconsistent job ID information inside TaskDeploymentDescriptor (" +
tdd.getJobId + " vs. " + jobInformation.getJobId + ")")
}
/** 獲取taskInformation */
val taskInformation = try {
tdd.getSerializedTaskInformation.deserializeValue(getClass.getClassLoader)
} catch {
case [email protected](_: IOException | _: ClassNotFoundException) =>
throw new IOException("Could not deserialize the job vertex information.", e)
}
/** 統計相關 */
val taskMetricGroup = taskManagerMetricGroup.addTaskForJob(
jobInformation.getJobId,
jobInformation.getJobName,
taskInformation.getJobVertexId,
tdd.getExecutionAttemptId,
taskInformation.getTaskName,
tdd.getSubtaskIndex,
tdd.getAttemptNumber)
val inputSplitProvider = new TaskInputSplitProvider(
jobManagerGateway,
jobInformation.getJobId,
taskInformation.getJobVertexId,
tdd.getExecutionAttemptId,
new FiniteDuration(
config.getTimeout().getSize(),
config.getTimeout().getUnit()))
/** 構建task */
val task = new Task(
jobInformation,
taskInformation,
tdd.getExecutionAttemptId,
tdd.getAllocationId,
tdd.getSubtaskIndex,
tdd.getAttemptNumber,
tdd.getProducedPartitions,
tdd.getInputGates,
tdd.getTargetSlotNumber,
tdd.getTaskStateHandles,
memoryManager,
ioManager,
network,
bcVarManager,
taskManagerConnection,
inputSplitProvider,
checkpointResponder,
blobCache,
libCache,
fileCache,
config,
taskMetricGroup,
resultPartitionConsumableNotifier,
partitionStateChecker,
context.dispatcher)
log.info(s"Received task ${task.getTaskInfo.getTaskNameWithSubtasks()}")
上述邏輯還是在獲取各種資料,主要的目的根據以上獲取的變數,構建一個Task例項。
val execId = tdd.getExecutionAttemptId
// 將task新增到map
val prevTask = runningTasks.put(execId, task)
if (prevTask != null) {
// 對於ID已經存在一個task,則恢復回來,並報告一個錯誤
runningTasks.put(execId, prevTask)
throw new IllegalStateException("TaskManager already contains a task for id " + execId)
}
// 一切都好,我們啟動task,讓它開始自己的初始化
task.startTaskThread()
sender ! decorateMessage(Acknowledge.get())
這裡的邏輯就是將新建的task加入到runningTasks這個map中,如果發現相同execID,已經存在執行的task,則先回滾,然後丟擲異常。
一切都執行順利的話,則啟動task,並給sender傳送一個ack訊息。
task的啟動,就是執行Task例項中的executingThread這個變量表示的執行緒。
public void startTaskThread() {
executingThread.start();
}
而executingThread這個變數的初始化是在Task的建構函式的最後進行的。
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
並且將Task例項自身作為其執行物件,而Task實現了Runnable介面,所以最後就是執行Task中的run()方法。
run方法的邏輯,先是進行狀態的初始化,就是進入一個while迴圈,根據當前狀態,執行不同的操作,有可能正常退出迴圈,進行向下執行,有可能直接reture,有可能丟擲異常,邏輯如下:
while (true) {
ExecutionState current = this.executionState;
if (current == ExecutionState.CREATED) {
/** 如果是CREATED狀態, 則先將狀態轉換為DEPLOYING, 然後退出迴圈 */
if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
/** 如果成功, 則說明我們可以開始啟動我們的work了 */
break;
}
}
else if (current == ExecutionState.FAILED) {
/** 如果當前狀態是FAILED, 則立即執行失敗操作, 告訴TaskManager, 我們已經到達最終狀態了, 然後直接返回 */
notifyFinalState();
if (metrics != null) {
metrics.close();
}
return;
}
else if (current == ExecutionState.CANCELING) {
if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
/** 如果是CANCELING狀態, 則告訴TaskManager, 我們到達最終狀態了, 然後直接返回 */
notifyFinalState();
if (metrics != null) {
metrics.close();
}
return;
}
}
else {
/** 如果是其他狀態, 則丟擲異常 */
if (metrics != null) {
metrics.close();
}
throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
}
}
當從這個while迴圈正常退出後,繼續向下執行,就是一個try-catch-finally的結構。
這裡主要分析一下try塊中的邏輯。
1、任務引導
// activate safety net for task thread
LOG.info("Creating FileSystem stream leak safety net for task {}", this);
FileSystemSafetyNet.initializeSafetyNetForThread();
blobService.getPermanentBlobService().registerJob(jobId);
/**
* 首先, 獲取一個 user-code 類載入器
* 這可能涉及下載作業的JAR檔案和/或類。
*/
LOG.info("Loading JAR files for task {}.", this);
userCodeClassLoader = createUserCodeClassloader();
final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
if (executionConfig.getTaskCancellationInterval() >= 0) {
/** 嘗試取消task時, 兩次嘗試之間的時間間隔, 單位毫秒 */
taskCancellationInterval = executionConfig.getTaskCancellationInterval();
}
if (executionConfig.getTaskCancellationTimeout() >= 0) {
/** 取消任務的超時時間, 可以在flink的配置中覆蓋 */
taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
}
/**
* 例項化AbstractInvokable的具體子類
* {@see StreamGraph#addOperator}
* {@see StoppableSourceStreamTask}
* {@see SourceStreamTask}
* {@see OneInputStreamTask}
*/
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
/** 如果當前狀態'CANCELING'、'CANCELED'、'FAILED', 則丟擲異常 */
if (isCanceledOrFailed()) {
throw new CancelTaskException();
}
這部分就是載入jar包,超時時間等獲取,然後例項化AbstractInvokable的一個具體子類,目前主要是StoppableSourceStreamTask、SourceStreamTask、OneInputStreamTask 這三個子類。
並且會對狀態進行檢查,如果處於’CANCELING’、’CANCELED’、’FAILED’其中的一個狀態,則丟擲CancelTaskException異常。
2、相關注冊
LOG.info("Registering task at network: {}.", this);
network.registerTask(this);
// add metrics for buffers
this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
// register detailed network metrics, if configured
if (taskManagerConfig.getConfiguration().getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS)) {
// similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
MetricGroup outputGroup = networkGroup.addGroup("Output");
MetricGroup inputGroup = networkGroup.addGroup("Input");
// output metrics
for (int i = 0; i < producedPartitions.length; i++) {
ResultPartitionMetrics.registerQueueLengthMetrics(
outputGroup.addGroup(i), producedPartitions[i]);
}
for (int i = 0; i < inputGates.length; i++) {
InputGateMetrics.registerQueueLengthMetrics(
inputGroup.addGroup(i), inputGates[i]);
}
}
/** 接下來, 啟動為分散式快取進行檔案的後臺拷貝 */
try {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
DistributedCache.readFileInfoFromConfig(jobConfiguration))
{
LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId);
distributedCacheEntries.put(entry.getKey(), cp);
}
}
catch (Exception e) {
throw new Exception(
String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId),
e);
}
/** 再次校驗狀態 */
if (isCanceledOrFailed()) {
throw new CancelTaskException();
}
這裡最後,也會進行狀態校驗,以便可以快速執行取消操作。
3、使用者程式碼初始化
TaskKvStateRegistry kvStateRegistry = network
.createKvStateTaskRegistry(jobId, getJobVertexId());
Environment env = new RuntimeEnvironment(
jobId, vertexId, executionId, executionConfig, taskInfo,
jobConfiguration, taskConfiguration, userCodeClassLoader,
memoryManager, ioManager, broadcastVariableManager,
accumulatorRegistry, kvStateRegistry, inputSplitProvider,
distributedCacheEntries, writers, inputGates,
checkpointResponder, taskManagerConfig, metrics, this);
/** 讓task程式碼建立它的readers和writers */
invokable.setEnvironment(env);
// the very last thing before the actual execution starts running is to inject
// the state into the task. the state is non-empty if this is an execution
// of a task that failed but had backuped state from a checkpoint
if (null != taskStateHandles) {
if (invokable instanceof StatefulTask) {
StatefulTask op = (StatefulTask) invokable;
op.setInitialState(taskStateHandles);
} else {
throw new IllegalStateException("Found operator state for a non-stateful task invokable");
}
// be memory and GC friendly - since the code stays in invoke() for a potentially long time,
// we clear the reference to the state handle
//noinspection UnusedAssignment
taskStateHandles = null;
}
4、真正執行
/** 在我們將狀態切換到'RUNNING'狀態時, 我們可以方法cancel方法 */
this.invokable = invokable;
/** 將狀態從'DEPLOYING'切換到'RUNNING', 如果失敗, 已經是在同一時間, 發生了 canceled/failed 操作。 */
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
/** 告訴每個人, 我們切換到'RUNNING'狀態了 */
notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
/** 設定執行緒上下文類載入器 */
executingThread.setContextClassLoader(userCodeClassLoader);
/** run,這裡就是真正開始執行處理邏輯的地方 */
invokable.invoke();
/** 確保, 如果task由於被取消而退出了invoke()方法, 我們可以進入catch邏輯塊 */
if (isCanceledOrFailed()) {
throw new CancelTaskException();
}
其中的 invokable.invoke() 這句程式碼就是真正邏輯開始執行的地方,且一般會阻塞在這裡,直至任務執行完成,或者被取消,發生異常等。
5、結尾
/** 完成生產資料分割槽。如果這裡失敗, 我們也任務執行失敗 */
for (ResultPartition partition : producedPartitions) {
if (partition != null) {
partition.finish();
}
}
/**
* 嘗試將狀態從'RUNNING'修改為'FINISHED'
* 如果失敗, 那麼task是同一時間被執行了 canceled/failed 操作
*/
if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
notifyObservers(ExecutionState.FINISHED, null);
}
else {
throw new CancelTaskException();
}
這裡就是做收尾操作,以及把狀態從’RUNNING’轉換為’FINISHED’,並通知相關觀察者。
相關推薦
Flink原始碼系列——TaskManager處理SubmitTask的過程
接《Flink原始碼系列——JobManager處理SubmitJob的過程》,在從JobManager中,將SubmitTask提交到TaskManager後,繼續分析TaskManager的處理邏輯。 TaskManager是個Actor,混入了Leade
Flink原始碼系列——JobManager處理SubmitJob的過程
接《Flink原始碼系列——獲取JobGraph的過程》,在獲取到JobGraph後,客戶端會封裝一個SubmitJob訊息,並將其提交給JobManager,本文就接著分析,JobManager在收到SubmitJob訊息後,對其處理邏輯。JobManager是一個Acto
Flink 原始碼解析 —— TaskManager 處理 SubmitJob 的過程
TaskManager 處理 SubmitJob 的過程 https://t.zsxq.com/eu7mQZj 部落格 1、Flink 從0到1學習 —— Apache Flink 介紹 2、Flink 從0到1學習 —— Mac 上搭建 Flink 1.6.0 環境並構建執行簡單程式入門 3、Flink
Flink原始碼系列——獲取StreamGraph的過程
接著《Flink原始碼系列——一個簡單的資料處理功能的實現過程》一文的結尾繼續分析,在完成對資料流的轉換操作之後,需要執行任務,這時會呼叫如下程式碼: env.execute("Socket Window WordCount"); 在StreamExecutionEnvir
Flink原始碼系列——獲取JobGraph的過程
接《Flink原始碼系列——獲取StreamGraph的過程》獲取到StreamGraph後,繼續分析,如果通過獲取到的StreamGraph來轉化為JobGraph。轉化邏輯在StreamingJobGraphGenerator這個類中,入口是createJobGraph(
Flink原始碼系列——Flink中一個簡單的資料處理功能的實現過程
在Flink中,實現從指定主機名和埠接收字串訊息,對接收到的字串中出現的各個單詞,每隔1秒鐘就輸出最近5秒內出現的各個單詞的統計次數。 程式碼實現如下: public class SocketWindowWordCount { public static void
SpringMVC原始碼--控制器Handler處理請求過程
DispatcherServlet類的doDispatch()方法中,真正去處理請求的關鍵步驟是: HandlerAdapter ha = getHandlerAdapter(mappedHandler.getHandler());
Flink原始碼系列——指標監測
1、Metric簡介 Flink對於指標監測有一套自己的實現,指標的統計方式有四種,這些指標都實現了Metric這個介面,而Metric這個介面只是一個標識,本身並沒有定義如何方法介面,部分子類的繼承關係如下所示。 從圖中可以看出,Metric這個介面有
Flink原始碼解析(standalone)之taskmanager啟動
1、簡單粗暴,flink-daemon.sh指令碼可知taskmanager執行類為:org.apache.flink.runtime.taskmanager.TaskManager 2、main方法裡面,最主要的就是啟動taskmanager try {
Spark原始碼系列(九)Spark SQL初體驗之解析過程詳解
首先宣告一下這個版本的程式碼是1.1的,之前講的都是1.0的。 Spark支援兩種模式,一種是在spark裡面直接寫sql,可以通過sql來查詢物件,類似.net的LINQ一樣,另外一種支援hive的HQL。不管是哪種方式,下面提到的步驟都會有,不同的是具體的執行過程。下面
雲星資料---Apache Flink實戰系列(精品版)】:Flink流處理API詳解與程式設計實戰002-Flink基於流的wordcount示例002
三、基於socket的wordcount 1.傳送資料 1.傳送資料命令 nc -lk 9999 2.傳送資料內容 good good study day day
【雲星資料---Apache Flink實戰系列(精品版)】:Apache Flink實戰基礎002--flink特性:流處理特性介紹
第二部分:flink的特性 一、流處理特性 1.高吞吐,低延時 有圖有真相,有比較有差距。且看下圖: 1.flink的吞吐量大 2.flink的延時低 3.flink的配置少
zookeeper原始碼 — 五、處理寫請求過程
目錄 處理寫請求總體過程 客戶端發起寫請求 follower和leader互動過程 follower傳送請求給客戶端 處理寫請求總體過程 zk為了保證分散式資料一致性,使用ZAB協議,在客戶端發起一次寫請求的時候時候,假設該請求請求到的是follower,follower不會直接處理這個請求,而是轉發給l
Flink 原始碼解析 —— Flink TaskManager 有什麼作用?
TaskManager 有什麼作用 <!--more--> https://t.zsxq.com/RZbu7yN 部落
【Yii系列】處理請求
入口 實現 官方 cookie this sender att 只需要 ota 緣起 這一章是Yii系列的第三章,前兩章給大夥講解了Yii2.0的安裝與Yii2.0的基本框架及基礎概念,傳送門: 【Yii2.0的安裝與調試】:http://www.cnblogs.com/r
SQL系列學習 存儲過程&事物語法
bsp ima unique reat 學習 tab soft 很多 存儲 /*學習事物基本語法*/ /*增加課室名的唯一索引*/ALTER table class add constraint uni_ClassName unique(name) /*創建存儲過程,其
Linux發行版CentOS系列系統的安裝過程
CentOS系列系統安裝步驟Linux系統CentOS發行版的安裝流程: 內核空間的引導啟動過程:POST(加電自檢) --> BootSequence(BIOS) 【MBR引導,順序啟動階段BootSequence】--> BootLoader(GRUB(stage1--stage1_5--st
Http請求處理整個過程
admin 轉發 速度 客戶端 OS 有效 施工 功能實現 。net 一,服務器接受http請求的實際處理過程 二,當客戶端將請求通過網絡傳送到服務器時,HTTP.SYS會在內核模式下實時監聽當前的http請求。Http.sys功能如下描述:
一次服務器被挖礦的處理解決過程
amp 命令 刪除 root密碼 pos 服務器 exc 感染 oot 內網一臺服務器cpu爆滿,第6感猜測中了挖礦病毒,以下為cpu爆滿監控圖表趕緊ssh進系統,top了下,一個./x3e536747 進程占用了大量的cpu,cpu load average超過了cpu內
Spark原始碼系列:RDD repartition、coalesce 對比
在上一篇文章中 Spark原始碼系列:DataFrame repartition、coalesce 對比 對DataFrame的repartition、coalesce進行了對比,在這篇文章中,將會對RDD的repartition、coalesce進行對比。 RDD重新分割槽的手段與Da