Spark source code: Spark start-all, Master and Worker startup flow
阿新 • Published: 2018-11-19
Spark start-all >>

"""Master startup flow"""

The Master class:

```scala
class Master(
    host: String,
    port: Int,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends Actor with ActorLogReceive with Logging with LeaderElectable
```

Master side:

```scala
def main() {
  val (actorSystem, _, _, _) =
    startSystemAndActor(args.host, args.port, args.webUiPort, conf)
  actorSystem.awaitTermination()
}
```

Master side:

```scala
def startSystemAndActor(...) = {
  // Call AkkaUtils to create the ActorSystem
  val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
    conf = conf, securityManager = securityMgr)
  // Create the actor belonging to the Master; creating the actor also
  // instantiates the Master via classOf[Master]
  val actor = actorSystem.actorOf(
    Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
}
```

Master side:

"""Because Master extends the trait Actor and overrides the preStart method, preStart runs when the actor is initialized, so we look at Master's override def preStart(). preStart is a lifecycle method: it runs after the constructor and before receive."""

```scala
override def preStart() {
  // Start a timer that periodically checks for timed-out Workers.
  // WORKER_TIMEOUT: check every 60 seconds.
  // self: the Master sends the check message to itself first.
  context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self,
    CheckForWorkerTimeOut)
}

// The message handler calls timeOutDeadWorkers()
override def receiveWithLogging = {
  case CheckForWorkerTimeOut => {
    timeOutDeadWorkers()
  }
}

// Checks for and removes all timed-out workers
def timeOutDeadWorkers() {
  // In effect this removes objects from a HashSet[WorkerInfo]
  val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
  for (worker <- toRemove) {
    if (worker.state != WorkerState.DEAD) {
      removeWorker(worker)
    }
  }
}

def removeWorker(worker: WorkerInfo) {
  // Remove the WorkerInfo held in memory
  idToWorker -= worker.id
  addressToWorker -= worker.endpoint.address
}
```

"""After that, the receive method runs (version 1.3); in version 1.6 it is called def receive: PartialFunction[Any, Unit]."""

Master side:

```scala
override def receiveWithLogging = { ... }
```

It continually receives the requests that other actors send over.

"""Worker startup flow"""

The Worker class:

```scala
class Worker(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterAkkaUrls: Array[String],
    actorSystemName: String,
    actorName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends Actor

override def preStart() {
  registerWithMaster()
}

// Method for registering with the Master
def registerWithMaster() {
  // Register this Worker with all Masters
  tryRegisterAllMasters()
}

def tryRegisterAllMasters() = {
  // Look up the Master's actor via the Master's URL
  val actor = context.actorSelection(masterAkkaUrl)
  // Send the registration message to the Master
  actor ! RegisterWorker(workerId, host, port, cores, memory,
    webUi.boundPort, publicAddress)
}
```

Master side:

```scala
// Receive the registration message sent by the Worker
override def receiveWithLogging = {
  case RegisterWorker(id, workerHost, workerPort, cores, memory,
      workerUiPort, publicAddress) => {
    // If this Master is in STANDBY state: do nothing.
    // If idToWorker.contains(id), the Worker is already registered: do nothing.
    // Normal case (ACTIVE state and not yet registered):
    // wrap the incoming data in a WorkerInfo and add it to the Master's state
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      sender, workerUiPort, publicAddress)
    // If storing the WorkerInfo in memory succeeds, also write it to disk via
    // the persistence engine. This guards against data loss: if the Master goes
    // down, the in-memory data is lost, and after a failover (Standby -> Active)
    // the newly promoted Master would have no WorkerInfo, so every Worker would
    // have to re-register, which is very expensive. With the data on disk, the
    // new Master can read it back directly instead of requiring re-registration.
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      // Respond to the Worker that registration succeeded
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
      // Start scheduling resources. Resources are scheduled not only at cluster
      // startup but also when Jobs run. There are two strategies: spread tasks
      // out as much as possible, or consolidate them as much as possible.
      schedule()
    }
  }
}
```

Worker side:

```scala
// Receive the registration-success message. It actually carries back the
// active Master's URL and web UI URL, which the Worker stores; after that the
// Worker sends heartbeats to that Master.
def receiveWithLogging = {
  case RegisteredWorker(masterUrl, masterWebUiUrl) => {
    // Update the Master URL
    changeMaster(masterUrl, masterWebUiUrl)
    // Send heartbeats to the Master. HEARTBEAT_MILLIS = 15 seconds, so a
    // heartbeat fires every 15 seconds; the send logic is SendHeartbeat.
    context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self,
      SendHeartbeat)
  }

  // Sending a heartbeat to the Master really just sends this Worker's workerId
  case SendHeartbeat =>
    if (connected) { master ! Heartbeat(workerId) }
}
```

Master side:

```scala
def receiveWithLogging = {
  case Heartbeat(workerId) => {
    // In the normal case, update the time of the last heartbeat
    workerInfo.lastHeartbeat = System.currentTimeMillis()
    // Startup is complete
  }
}
```
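The RegisterWorker handling above reduces to three cases: a standby Master ignores the message, a duplicate id is ignored, and a new worker is stored and acknowledged. The decision logic can be sketched in plain Scala (no Akka; `Master`, `WorkerInfo`, and the reply objects here are simplified stand-ins for illustration, not Spark's actual classes):

```scala
// Minimal model of the Master's RegisterWorker decision (simplified sketch).
object Registration {
  final case class WorkerInfo(id: String, host: String, cores: Int, memory: Int)

  sealed trait Reply
  case object RegisteredWorker extends Reply // success: Worker will start heartbeating
  case object DuplicateWorker extends Reply  // id already in idToWorker: do nothing
  case object StandbyMaster extends Reply    // a standby Master does nothing

  final class Master(val isStandby: Boolean) {
    // In-memory registry, like the real Master's idToWorker map
    var idToWorker: Map[String, WorkerInfo] = Map.empty

    def registerWorker(w: WorkerInfo): Reply =
      if (isStandby) StandbyMaster
      else if (idToWorker.contains(w.id)) DuplicateWorker
      else {
        // Store in memory; the real Master additionally persists to disk here
        idToWorker += (w.id -> w)
        RegisteredWorker
      }
  }
}
```

A Master constructed with `isStandby = true` answers `StandbyMaster` to every attempt, matching the "do nothing" branch in the walkthrough; only the first registration of a given id succeeds.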
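The heartbeat exchange is just a periodic timestamp refresh: every 15 seconds the Worker sends its workerId, and the Master overwrites that worker's lastHeartbeat. A minimal sketch of the Master-side bookkeeping (names and structure are simplified assumptions, not Spark's implementation):

```scala
// Minimal model of the Master's Heartbeat(workerId) handling (simplified sketch).
object Heartbeats {
  final case class WorkerInfo(id: String, var lastHeartbeat: Long)

  // Look the worker up by id and refresh its last-seen timestamp;
  // heartbeats from unknown ids are simply ignored.
  def onHeartbeat(idToWorker: Map[String, WorkerInfo], workerId: String, now: Long): Unit =
    idToWorker.get(workerId).foreach(_.lastHeartbeat = now)

  def main(args: Array[String]): Unit = {
    val registry = Map("w1" -> WorkerInfo("w1", lastHeartbeat = 0L))
    onHeartbeat(registry, "w1", now = 15000L)
    println(registry("w1").lastHeartbeat) // prints 15000
  }
}
```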
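CheckForWorkerTimeOut then closes the loop: any worker whose lastHeartbeat is older than the 60-second timeout is dropped from the in-memory maps. The filter inside timeOutDeadWorkers can be sketched as follows (a simplified immutable-map version; the constant and names are assumptions for illustration):

```scala
// Minimal model of the Master's periodic worker-timeout check (simplified sketch).
object WorkerTimeout {
  final case class WorkerInfo(id: String, var lastHeartbeat: Long)

  // Stand-in for WORKER_TIMEOUT: 60 seconds between checks
  val WorkerTimeoutMs: Long = 60 * 1000

  // Remove every worker whose last heartbeat is older than the timeout,
  // mirroring timeOutDeadWorkers/removeWorker; returns the surviving registry.
  def pruneDead(idToWorker: Map[String, WorkerInfo], now: Long): Map[String, WorkerInfo] = {
    val toRemove = idToWorker.values.filter(_.lastHeartbeat < now - WorkerTimeoutMs)
    toRemove.foldLeft(idToWorker)((registry, w) => registry - w.id)
  }

  def main(args: Array[String]): Unit = {
    val now = 1000000L
    val workers = Map(
      "w1" -> WorkerInfo("w1", now - 5000),   // heartbeat 5 s ago: alive
      "w2" -> WorkerInfo("w2", now - 120000)) // silent for 2 min: pruned
    println(pruneDead(workers, now).keySet) // prints Set(w1)
  }
}
```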