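Flink Source Code Analysis (Standalone): TaskManager Startup
1. Put simply, the flink-daemon.sh script shows that the TaskManager entry class is org.apache.flink.runtime.taskmanager.TaskManager
2. Inside the main method, the most important step is starting the TaskManager: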
try {
  SecurityUtils.getInstalledContext.runSecured(new Callable[Unit] {
    override def call(): Unit = {
      // Run the TaskManager. Note classOf[TaskManager]: this is the class the
      // TaskManager actor is started from, and its lifecycle methods live in it.
      selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, classOf[TaskManager])
    }
  })
}
3. selectNetworkInterfaceAndRunTaskManager does three main things:
a. Creates the high-availability services
b. Selects a hostname and a port range for the TaskManager
c. Starts the TaskManager
def selectNetworkInterfaceAndRunTaskManager(
    configuration: Configuration,
    resourceID: ResourceID,
    taskManagerClass: Class[_ <: TaskManager])
  : Unit = {
  val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
    configuration,
    Executors.directExecutor(),
    AddressResolution.TRY_ADDRESS_RESOLUTION)
  // Select the network interface and the port range
  val (taskManagerHostname, actorSystemPortRange) = selectNetworkInterfaceAndPortRange(
    configuration,
    highAvailabilityServices)
  try {
    // Start the TaskManager
    runTaskManager(
      taskManagerHostname,
      resourceID,
      actorSystemPortRange,
      configuration,
      highAvailabilityServices,
      taskManagerClass)
  } finally {
    try {
      highAvailabilityServices.close()
    } catch {
      case t: Throwable => LOG.warn("Could not properly stop the high availability services.", t)
    }
  }
}
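4. Step into runTaskManager. Using the port range selected above, it finds a free port for the TaskManager's communication and then calls the overloaded runTaskManager to start the TaskManager. If binding fails and ports remain in the range, the whole block is retried.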
def runTaskManager(
    taskManagerHostname: String,
    resourceID: ResourceID,
    actorSystemPortRange: java.util.Iterator[Integer],
    configuration: Configuration,
    highAvailabilityServices: HighAvailabilityServices,
    taskManagerClass: Class[_ <: TaskManager])
  : Unit = {
  // Find a free port by opening a server socket
  val result = AkkaUtils.retryOnBindException({
    // Try all ports in the range until successful
    val socket = NetUtils.createSocketFromPorts(
      actorSystemPortRange,
      new NetUtils.SocketFactory {
        override def createSocket(port: Int): ServerSocket = new ServerSocket(
          // Use the correct listening address, bound ports will only be
          // detected later by Akka.
          port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
      })
    val port =
      if (socket == null) {
        throw new BindException(s"Unable to allocate port for TaskManager.")
      } else {
        try {
          socket.getLocalPort()
        } finally {
          socket.close()
        }
      }
    runTaskManager(
      taskManagerHostname,
      resourceID,
      port,
      configuration,
      highAvailabilityServices,
      taskManagerClass)
  }, { !actorSystemPortRange.hasNext }, 5000)
  result match {
    case scala.util.Failure(f) => throw f
    case _ =>
  }
}
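The port probing in step 4 can be illustrated outside of Flink. The following is only a minimal sketch using plain java.net classes: the class PortProbe and the method findFreePort are hypothetical names, not Flink APIs. It walks an iterator of candidate ports, tries to bind a ServerSocket on each, and returns the first port that binds.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical illustration class; not part of Flink.
class PortProbe {

    /**
     * Returns the first candidate port that can be bound on the given address,
     * or -1 if every candidate is already in use. A candidate of 0 asks the OS
     * for an ephemeral port.
     */
    static int findFreePort(Iterator<Integer> candidates, InetAddress bindAddress) {
        while (candidates.hasNext()) {
            int port = candidates.next();
            // The socket is closed again right away: we only want to learn the port number.
            try (ServerSocket socket = new ServerSocket(port, 0, bindAddress)) {
                return socket.getLocalPort();
            } catch (IOException e) {
                // Port is taken (or not bindable); move on to the next candidate.
            }
        }
        return -1;
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Occupy one port so that the first candidate fails.
        try (ServerSocket busy = new ServerSocket(0, 0, loopback)) {
            int taken = busy.getLocalPort();
            int found = findFreePort(Arrays.asList(taken, 0).iterator(), loopback);
            System.out.println("taken=" + taken + ", found=" + found);
        }
    }
}
```

As in the Flink code, the probe socket is closed before the port is handed on, so another process could still grab the port in between. That gap is presumably why the original wraps the whole block in retryOnBindException.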
5. Step into the overloaded runTaskManager
5.1 Create the TaskManager actor system
val taskManagerSystem = BootstrapTools.startActorSystem(
  configuration,
  taskManagerHostname,
  actorSystemPort,
  LOG.logger)
5.2 Create a MetricRegistry and start its metric query service
val metricRegistry = new MetricRegistryImpl(
  MetricRegistryConfiguration.fromConfiguration(configuration))
metricRegistry.startQueryService(taskManagerSystem, resourceID)
5.3 Start the TaskManager components and the TaskManager actor
val taskManager = startTaskManagerComponentsAndActor(
  configuration,
  resourceID,
  taskManagerSystem,
  highAvailabilityServices,
  metricRegistry,
  taskManagerHostname,
  Some(TaskExecutor.TASK_MANAGER_NAME),
  localTaskManagerCommunication = false,
  taskManagerClass)
5.3.1 Once the TaskManager actor has started, its lifecycle method preStart runs. Its main job is to start the retrieval service that locates the leader JobManager. Because this is standalone mode, the service reports the leader JobManager address to the listener directly.
leaderRetrievalService.start(this)
// See the start method of StandaloneLeaderRetrievalService
public void start(LeaderRetrievalListener listener) {
    checkNotNull(listener, "Listener must not be null.");
    synchronized (startStopLock) {
        checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");
        started = true;
        // Notify the listener directly with the leader JobManager address
        listener.notifyLeaderAddress(leaderAddress, leaderId);
    }
}
5.3.2 Step into the TaskManager's notifyLeaderAddress method, which sends a JobManagerLeaderAddress message to the TaskManager actor itself
override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: UUID): Unit = {
  self ! JobManagerLeaderAddress(leaderAddress, leaderSessionID)
}
5.3.3 Step into the TaskManager actor's handleMessage method and find the JobManagerLeaderAddress case. The handling logic is:
1. If the TaskManager is currently associated with a leader JobManager (that is, it still holds a connection to one), disconnect from the old leader JobManager first
2. Store the new address and session ID and, if a leader session ID is present, trigger registration of the TaskManager at the JobManager
case JobManagerLeaderAddress(address, newLeaderSessionID) =>
  handleJobManagerLeaderAddress(address, newLeaderSessionID)
private def handleJobManagerLeaderAddress(
    newJobManagerAkkaURL: String,
    leaderSessionID: UUID)
  : Unit = {
  currentJobManager match {
    case Some(jm) =>
      Option(newJobManagerAkkaURL) match {
        case Some(newJMAkkaURL) =>
          // Disconnect from the old leader JobManager
          handleJobManagerDisconnect(s"JobManager $newJMAkkaURL was elected as leader.")
        case None =>
          handleJobManagerDisconnect(s"Old JobManager lost its leadership.")
      }
    case None =>
  }
  this.jobManagerAkkaURL = Option(newJobManagerAkkaURL)
  this.leaderSessionID = Option(leaderSessionID)
  if (this.leaderSessionID.isDefined) {
    // Trigger the TaskManager registration
    triggerTaskManagerRegistration()
  }
}
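To make the branching in 5.3.3 easier to follow, here is a small actor-free sketch of the same decision flow. Everything in it is an illustrative stand-in: the class LeaderChangeSketch, its fields, and the recorded action strings are hypothetical, and it assumes (the excerpt above does not show this) that disconnecting clears the current JobManager and that an acknowledged registration sets it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

// Hypothetical, actor-free model of the leader-change handling; not Flink code.
class LeaderChangeSketch {
    Optional<String> currentJobManager = Optional.empty(); // JobManager we are registered at
    Optional<String> jobManagerUrl = Optional.empty();     // last announced leader address
    Optional<UUID> leaderSessionId = Optional.empty();     // last announced session ID
    final List<String> actions = new ArrayList<>();        // what the actor would do, in order

    void onLeaderAddress(String newUrl, UUID newSessionId) {
        if (currentJobManager.isPresent()) {
            // Still attached to an old leader: drop that connection first.
            actions.add(newUrl != null
                    ? "disconnect: " + newUrl + " was elected as leader"
                    : "disconnect: old JobManager lost its leadership");
            currentJobManager = Optional.empty(); // assumption: disconnect clears the association
        }
        jobManagerUrl = Optional.ofNullable(newUrl);
        leaderSessionId = Optional.ofNullable(newSessionId);
        if (leaderSessionId.isPresent()) {
            actions.add("register at " + newUrl);
        }
    }

    // Assumption: an acknowledged registration associates us with the announced leader.
    void onRegistrationAcknowledged() {
        currentJobManager = jobManagerUrl;
    }

    public static void main(String[] args) {
        LeaderChangeSketch tm = new LeaderChangeSketch();
        tm.onLeaderAddress("akka://jm1", UUID.randomUUID()); // first leader: register only
        tm.onRegistrationAcknowledged();
        tm.onLeaderAddress("akka://jm2", UUID.randomUUID()); // new leader: disconnect, then register
        tm.onRegistrationAcknowledged();
        tm.onLeaderAddress(null, null);                      // leadership lost: disconnect only
        tm.actions.forEach(System.out::println);
    }
}
```

In standalone mode only the first case occurs in practice, because the leader address is fixed and announced exactly once.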
5.3.4 triggerTaskManagerRegistration sends a TriggerTaskManagerRegistration message to the TaskManager actor itself, starting with the initial registration pause and attempt number 1
self ! decorateMessage(
  TriggerTaskManagerRegistration(
    jobManagerAkkaURL.get,
    new FiniteDuration(
      config.getInitialRegistrationPause().getSize(),
      config.getInitialRegistrationPause().getUnit()),
    deadline,
    1,
    currentRegistrationRun)
)
5.3.5 Registration logic:
case message: RegistrationMessage => handleRegistrationMessage(message)
5.3.5.1 If the TaskManager is already registered, only log a debug message
if (isConnected) {
  // this may be the case, if we queue another attempt and
  // in the meantime, the registration is acknowledged
  log.debug(
    "TaskManager was triggered to register at JobManager, but is already registered")
}
5.3.5.2 If registration has not succeeded before the deadline, give up and shut the TaskManager actor down
else if (deadline.exists(_.isOverdue())) {
  // we failed to register in time. that means we should quit
  log.error("Failed to register at the JobManager within the defined maximum " +
    "connect time. Shutting down ...")
  // terminate ourselves (hasta la vista)
  self ! decorateMessage(PoisonPill)
}
5.3.5.3 Otherwise, send a registration message to the JobManager actor
val jobManager = context.actorSelection(jobManagerURL)
jobManager ! decorateMessage(
  RegisterTaskManager(
    resourceID,
    location,
    resources,
    numberOfSlots)
)
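The three branches of 5.3.5.1 to 5.3.5.3 boil down to one small decision function. The sketch below is only an illustration: the names RegistrationStep, Action and decide are hypothetical, and plain millisecond timestamps stand in for the Scala Deadline used in the original.

```java
import java.util.OptionalLong;

// Hypothetical helper that mirrors the if / else-if / else chain above; not Flink code.
class RegistrationStep {
    enum Action { ALREADY_REGISTERED, GIVE_UP, SEND_REGISTRATION }

    /**
     * @param connected      whether the TaskManager is already registered
     * @param deadlineMillis optional point in time after which registration is abandoned
     * @param nowMillis      current time, passed in so the function stays deterministic
     */
    static Action decide(boolean connected, OptionalLong deadlineMillis, long nowMillis) {
        if (connected) {
            return Action.ALREADY_REGISTERED; // nothing to do except log
        }
        if (deadlineMillis.isPresent() && nowMillis > deadlineMillis.getAsLong()) {
            return Action.GIVE_UP;            // the original sends itself a PoisonPill here
        }
        return Action.SEND_REGISTRATION;      // send RegisterTaskManager and schedule a retry
    }

    public static void main(String[] args) {
        System.out.println(decide(true, OptionalLong.of(100), 200));
        System.out.println(decide(false, OptionalLong.of(100), 200));
        System.out.println(decide(false, OptionalLong.empty(), 200));
    }
}
```

Note that without a deadline the TaskManager never gives up, which matches deadline.exists(...) returning false for an empty Option.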
5.3.5.3.1 The JobManager actor receives the TaskManager's registration message (in JobManager.handleMessage). If a ResourceManager is already registered at the JobManager, the JobManager notifies it that the resource for this TaskManager has started. This is a request-reply exchange through Akka's ask pattern (?), so the reply arrives as a future. If the ResourceManager is healthy, it replies with an acknowledgement that the TaskManager's resource is registered. If the ask times out or fails, the JobManager sends itself a ReconnectResourceManager message.
currentResourceManager match {
  case Some(rm) =>
    val future = (rm ? decorateMessage(new NotifyResourceStarted(msg.resourceId)))(timeout)
    future.onFailure {
      case t: Throwable =>
        t match {
          case _: TimeoutException =>
            log.info("Attempt to register resource at ResourceManager timed out. Retrying")
          case _ =>
            log.warn("Failure while asking ResourceManager for RegisterResource. Retrying", t)
        }
        self ! decorateMessage(
          new ReconnectResourceManager(
            rm,
            currentResourceManagerConnectionId))
    }(context.dispatcher)
  case None =>
    log.info("Task Manager Registration but not connected to ResourceManager")
}
5.3.5.3.2 If the TaskManager is already registered, reply to the TaskManager actor that it is already registered
if (instanceManager.isRegistered(resourceId)) {
  val instanceID = instanceManager.getRegisteredInstance(resourceId).getId
  taskManager ! decorateMessage(
    AlreadyRegistered(
      instanceID,
      blobServer.getPort))
}
5.3.5.3.3 If it is not registered yet, register it and reply to the TaskManager actor with an acknowledgement
taskManager ! decorateMessage(
  AcknowledgeRegistration(instanceID, blobServer.getPort))
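Because the TaskManager keeps retrying until it hears back, the JobManager side has to treat registration as idempotent, which is what 5.3.5.3.2 and 5.3.5.3.3 achieve together. A minimal sketch of that idea, with a plain map standing in for the InstanceManager (the class RegistrationSketch and the reply strings are hypothetical, not Flink types):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for the JobManager-side bookkeeping; not Flink code.
class RegistrationSketch {
    // resource ID -> instance ID assigned on first registration
    private final Map<String, UUID> instances = new HashMap<>();

    /** Returns a textual form of the reply that would be sent back to the TaskManager. */
    String register(String resourceId) {
        UUID existing = instances.get(resourceId);
        if (existing != null) {
            // Repeated request (for example a retry): answer with the same instance ID.
            return "AlreadyRegistered(" + existing + ")";
        }
        UUID instanceId = UUID.randomUUID();
        instances.put(resourceId, instanceId);
        return "AcknowledgeRegistration(" + instanceId + ")";
    }

    public static void main(String[] args) {
        RegistrationSketch jobManager = new RegistrationSketch();
        System.out.println(jobManager.register("tm-1")); // first attempt is acknowledged
        System.out.println(jobManager.register("tm-1")); // retry is reported as already registered
    }
}
```

Both replies carry the same instance ID, so a retried registration does not create a second instance for the same TaskManager.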
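5.3.5.3.3.1 On receiving the reply, the TaskManager actor mainly does the following:
1. Starts the BLOB cache
2. Watches the JobManager, so that it learns promptly when the JobManager dies
3. Starts the heartbeat mechanism between itself and the JobManager
5.3.5.3.4 The JobManager watches the newly registered TaskManager actor, so that it learns promptly when the TaskManager dies
context.watch(taskManager)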
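5.3.5.4 Schedule another registration attempt after the current timeout, in case the registration was lost because of network problems or similar. The pause doubles on every attempt, capped at the configured maximum registration pause. This works much like recursion: it repeats until registration succeeds or the registration deadline has passed.
val nextTimeout = (timeout * 2).min(new FiniteDuration(
  config.getMaxRegistrationPause().toMilliseconds,
  TimeUnit.MILLISECONDS))
// schedule a check to trigger a new registration attempt if not registered
// by the timeout
scheduledTaskManagerRegistration = Option(context.system.scheduler.scheduleOnce(
  timeout,
  self,
  decorateMessage(TriggerTaskManagerRegistration(
    jobManagerURL,
    nextTimeout,
    deadline,
    attempt + 1,
    registrationRun)
  ))(context.dispatcher))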
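The retry schedule is easier to see with numbers. The sketch below reproduces only the arithmetic: the pause starts at the initial value, doubles after each attempt, and is capped at a maximum, until a deadline would be exceeded. The class RegistrationBackoff, the method pauses, and the concrete values (500 ms, 4000 ms, 20000 ms) are made up for illustration and are not Flink defaults.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that only models the pause arithmetic; not Flink code.
class RegistrationBackoff {

    /**
     * Lists the pauses (in ms) between registration attempts that fit into the
     * deadline. Mirrors nextTimeout = min(timeout * 2, maxPause) from the excerpt above.
     */
    static List<Long> pauses(long initialMs, long maxMs, long deadlineMs) {
        if (initialMs <= 0 || maxMs <= 0) {
            throw new IllegalArgumentException("pauses must be positive");
        }
        List<Long> result = new ArrayList<>();
        long pause = initialMs;
        long elapsed = 0;
        while (elapsed + pause <= deadlineMs) {
            result.add(pause);
            elapsed += pause;
            pause = Math.min(pause * 2, maxMs); // double, but never exceed the cap
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [500, 1000, 2000, 4000, 4000, 4000, 4000]
        System.out.println(pauses(500, 4000, 20000));
    }
}
```

Doubling keeps the first retries quick when the JobManager is merely slow to come up, while the cap keeps the TaskManager from going quiet for too long between attempts.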
5.4 Start a watcher for the TaskManager actor (ProcessReaper) that kills the JVM process once the TaskManager actor dies
taskManagerSystem.actorOf(
  Props(classOf[ProcessReaper], taskManager, LOG.logger, RUNTIME_FAILURE_RETURN_CODE),
  "TaskManager_Process_Reaper")