Spark Source Code Study (Part 2) --- Master and Worker Startup and the Actor Communication Flow
阿新 · Published 2019-02-02
In "Spark Source Code Study (Part 1)" we saw from Spark's startup scripts that starting the Master actually launches org.apache.spark.deploy.master.Master (and starting a Worker launches org.apache.spark.deploy.worker.Worker). Below we start from these two classes and read the Spark source to understand the startup flow.
1. First, let's look at org.apache.spark.deploy.master.Master:
(1) Start from Master's main method:
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
(2) The key code in the startSystemAndActor method:
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
  conf = conf, securityManager = securityMgr)
val actor = actorSystem.actorOf(
  Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
(3) The key code in createActorSystem:
val startService: Int => (ActorSystem, Int) = { actualPort =>
  doCreateActorSystem(name, host, actualPort, conf, securityManager)
}
When this function is executed, it returns an ActorSystem together with the port it actually bound to.
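The reason startService is written as a function of the port to try is that the caller can retry it on successive ports until the bind succeeds (in the 1.x source this retry lives in a Utils helper, startServiceOnPort). Below is a standalone sketch of that retry idea, with an invented name, simplified error handling, and the assumption that a failed bind surfaces as a BindException:

import java.net.BindException

object PortRetrySketch {
  // try startService on successive ports until one binds, giving up after maxRetries failures
  def startOnFirstFreePort[T](port: Int, maxRetries: Int,
      startService: Int => (T, Int)): (T, Int) = {
    try {
      startService(port)  // success: returns (service, portActuallyBound)
    } catch {
      case _: BindException if maxRetries > 0 =>
        startOnFirstFreePort(port + 1, maxRetries - 1, startService)
    }
  }
}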
(4) In step (2), the classOf[Master] argument passed to actorSystem.actorOf is the equivalent of Master.class in Java; at this point the Master's constructor and Actor lifecycle methods are invoked. In the preStart() method:
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis,
self, CheckForWorkerTimeOut)
This line starts a timer that, every WORKER_TIMEOUT milliseconds, sends the Master itself a CheckForWorkerTimeOut (check for Worker timeouts) message. Looking at the source, CheckForWorkerTimeOut is defined in MasterMessages as a
case object CheckForWorkerTimeOut
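To see this timer pattern in isolation, here is a minimal, self-contained Akka (classic) actor that schedules a message to itself in the same way the Master schedules CheckForWorkerTimeOut. The names TimerDemoActor and CheckTimeout are made up for illustration:

import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.duration._

case object CheckTimeout  // plays the role of CheckForWorkerTimeOut

class TimerDemoActor extends Actor {
  import context.dispatcher  // ExecutionContext required by the scheduler

  override def preStart(): Unit = {
    // every 5 seconds, starting immediately, send CheckTimeout to ourselves
    context.system.scheduler.schedule(0.millis, 5.seconds, self, CheckTimeout)
  }

  def receive = {
    case CheckTimeout => println("time to check for dead workers")
  }
}

object TimerDemo extends App {
  val system = ActorSystem("timer-demo")
  system.actorOf(Props[TimerDemoActor], "demo")
}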
(5) In Master's receiveWithLogging method:
case CheckForWorkerTimeOut => { timeOutDeadWorkers() }
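Receiving this message triggers timeOutDeadWorkers(). Roughly, that method looks like the simplified sketch below (the real 1.x code also has extra handling for Workers already marked DEAD); workers, WORKER_TIMEOUT, logWarning and removeWorker are members of Master:

def timeOutDeadWorkers(): Unit = {
  val currentTime = System.currentTimeMillis()
  // any Worker whose last heartbeat is older than WORKER_TIMEOUT is considered dead
  val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT)
  for (worker <- toRemove) {
    logWarning(s"Removing ${worker.id} because we got no heartbeat within the timeout")
    removeWorker(worker)  // drop it from the Master's in-memory structures
  }
}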
So when the Master receives the timeout-check message, any Worker that has stopped sending heartbeats is removed from the Master's in-memory state.

(6) Next, let's look at how a Worker registers. In org.apache.spark.deploy.worker.Worker we start from the preStart() method: registerWithMaster() sends the registration message to the Master. The key code:
registrationRetryTimer = Some {
context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
}
This periodically sends the Worker itself a ReregisterWithMaster message, which is handled by:
case ReregisterWithMaster =>
reregisterWithMaster()
--- reregisterWithMaster() eventually sends the Master a RegisterWorker registration message:
master ! RegisterWorker(
workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
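For reference, RegisterWorker is a case class defined with the other deploy messages; its shape, inferred from this call site and the Master's handler below (the parameter types are an assumption), is roughly:

case class RegisterWorker(
    id: String,
    host: String,
    port: Int,
    cores: Int,
    memory: Int,  // in MB
    webUiPort: Int,
    publicAddress: String)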
(7) The Master receives the RegisterWorker message and handles it:
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
...... if this Worker has not registered before, record its information in memory and reply with a registration-success message; otherwise reply with a registration failure
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule()
(8) The Worker receives the RegisteredWorker (registration succeeded) message:
case RegisteredWorker(masterUrl, masterWebUiUrl) =>
  // update the Master's address info and periodically send a heartbeat message to itself:
changeMaster(masterUrl, masterWebUiUrl)
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis,
self, SendHeartbeat)
---- When the Worker itself receives the SendHeartbeat message, it checks whether it is still connected to the Master and, if so, sends the Master a heartbeat message:
case SendHeartbeat =>
if (connected) { master ! Heartbeat(workerId) }
(9) The Master receives the Worker's heartbeat message:
case Heartbeat(workerId) => {
...... if the Worker exists in memory, update its "last successful heartbeat time"; otherwise send the Worker a reconnect message:
idToWorker.get(workerId) match {
case Some(workerInfo) =>
workerInfo.lastHeartbeat = System.currentTimeMillis()
case None =>
if (workers.map(_.id).contains(workerId)) {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" Asking it to re-register.")
sender ! ReconnectWorker(masterUrl)
} else {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" This worker was never registered, so ignoring the heartbeat.")
}
}
(10) When the Worker receives a ReconnectWorker message, it re-registers with the Master:
case ReconnectWorker(masterUrl) =>
logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
registerWithMaster()
That covers the main flow of starting Spark's Master and Worker and their Actor communication!
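To tie the whole exchange together, here is a small, self-contained Akka (classic) toy that imitates the register / acknowledge / heartbeat flow described above. Every name here (ToyMaster, ToyWorker, the message classes) is invented for illustration and the logic is heavily simplified; it is not Spark code:

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scala.concurrent.duration._

// simplified stand-ins for Spark's deploy messages
case class RegisterWorker(workerId: String, cores: Int, memoryMb: Int)
case object RegisteredWorker
case class Heartbeat(workerId: String)
case object SendHeartbeat

class ToyMaster extends Actor {
  // workerId -> last heartbeat timestamp: the Master's "in-memory state"
  private var workers = Map.empty[String, Long]

  def receive = {
    case RegisterWorker(id, cores, mem) =>
      workers += id -> System.currentTimeMillis()
      println(s"master: registered $id ($cores cores, $mem MB)")
      sender() ! RegisteredWorker
    case Heartbeat(id) =>
      workers += id -> System.currentTimeMillis()
      println(s"master: heartbeat from $id")
  }
}

class ToyWorker(master: ActorRef, workerId: String) extends Actor {
  import context.dispatcher

  override def preStart(): Unit =
    master ! RegisterWorker(workerId, cores = 4, memoryMb = 4096)

  def receive = {
    case RegisteredWorker =>
      // once registered, start a timer that sends SendHeartbeat to ourselves,
      // just as the real Worker schedules its heartbeats
      context.system.scheduler.schedule(0.millis, 2.seconds, self, SendHeartbeat)
    case SendHeartbeat =>
      master ! Heartbeat(workerId)
  }
}

object ToyCluster extends App {
  val system = ActorSystem("toy-cluster")
  val master = system.actorOf(Props[ToyMaster], "master")
  system.actorOf(Props(classOf[ToyWorker], master, "worker-1"), "worker-1")
}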