1. 程式人生 > >Spark-原始碼-Spark-StartAll Master Worker啟動流程

Spark-原始碼-Spark-StartAll Master Worker啟動流程

Spark start-all>>



"""Master啟動流程"""


Master類
/**
 * Standalone-cluster Master actor (class signature only; the body is not part of this excerpt).
 *
 * Instances are created through
 * `Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf)`,
 * so the constructor arguments are supplied in exactly that order.
 *
 * @param host        host name taken from the launcher arguments
 * @param port        port the actor system actually bound to (`boundPort` at the creation site)
 * @param webUiPort   web UI port taken from the launcher arguments
 * @param securityMgr the same SecurityManager that was handed to the actor system
 * @param conf        Spark configuration
 */
class Master(
    host: String,
    port: Int,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf) extends Actor with ActorLogReceive with Logging with LeaderElectable







Master端
/**
 * Entry point of the Master process.
 *
 * Parses the command-line arguments, starts the actor system together with the
 * Master actor, then blocks until the actor system terminates.
 *
 * @param argStrings raw command-line arguments
 */
def main(argStrings: Array[String]) {
	val conf = new SparkConf
	// MasterArguments exposes the parsed host / port / webUiPort used below.
	val args = new MasterArguments(argStrings, conf)
	// Only the actor system is needed here; the remaining tuple elements are ignored.
	val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
	actorSystem.awaitTermination()
}

Master端
/**
 * Creates the Master's ActorSystem and the Master actor living inside it.
 *
 * @param host      host to bind the actor system to
 * @param port      requested port for the actor system
 * @param webUiPort port forwarded to the Master for its web UI
 * @param conf      Spark configuration
 * @return (actor system, bound port, web UI port, REST port); `main` only uses the first element
 */
def startSystemAndActor(
		host: String,
		port: Int,
		webUiPort: Int,
		conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
	val securityMgr = new SecurityManager(conf)
	// Let AkkaUtils create the ActorSystem; boundPort is the port it actually bound to.
	val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
	  securityManager = securityMgr)
	// Create the Master's actor. Props(classOf[Master], ...) instantiates Master with these
	// constructor arguments as part of creating the actor.
	val actor = actorSystem.actorOf(
	  Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
	// Ask the freshly started Master which web UI / REST ports it ended up with.
	val timeout = AkkaUtils.askTimeout(conf)
	val portsRequest = actor.ask(BoundPortsRequest)(timeout)
	val portsResponse =
	  scala.concurrent.Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
	(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
}

Master端
"""初始化Master時由於Master繼承了 trait Actor 重寫了preStart方法, 
Actor的初始化會啟動preStart方法 因此找到Master的 override def preStart()
preStart屬於生命週期方法, 在構造器之後, receive之前"""
/**
 * Actor lifecycle hook of the Master.
 *
 * Starts a recurring timer that sends CheckForWorkerTimeOut to this actor so that
 * timed-out workers get detected.
 */
override def preStart() {
	// First tick fires immediately (0 millis), then once every WORKER_TIMEOUT millis
	// (60 seconds according to the original note). The target is `self`, i.e. the
	// Master sends the check message to itself.
	context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
}

// Message handler (excerpt): the timer message scheduled in preStart arrives here.
override def receiveWithLogging = {
	case CheckForWorkerTimeOut => {
		timeOutDeadWorkers()
	}
}

/** Checks for workers whose heartbeat has timed out and removes them. */
def timeOutDeadWorkers() {
	val currentTime = System.currentTimeMillis()
	// A worker counts as timed out when its last heartbeat is older than WORKER_TIMEOUT.
	val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
	for (worker <- toRemove) {
		// Workers already marked DEAD are skipped in this excerpt.
		if (worker.state != WorkerState.DEAD) {
			removeWorker(worker)
		}
	}
}

/**
 * Drops the in-memory bookkeeping entries for the given worker.
 *
 * @param worker the WorkerInfo to forget
 */
def removeWorker(worker: WorkerInfo) {
	idToWorker -= worker.id
	addressToWorker -= worker.endpoint.address
}
    
"""之後執行receive方法(1.3版本), 在後來的1.6版本中叫 def receive: PartialFunction[Any, Unit]"""
Master端
// Message-handler placeholder: the body is intentionally left empty in this excerpt.
// This is where the Master keeps receiving the requests that other actors send to it.
override def receiveWithLogging () {}
會不斷的接收actor傳送過來的請求




"""Worker啟動流程"""

Worker類
/**
 * Standalone-cluster Worker actor (class signature only; the body is not part of this excerpt).
 *
 * `host`, `port`, `cores` and `memory` are the values the Worker reports to the Master
 * in its RegisterWorker message.
 *
 * @param host            host of this worker
 * @param port            port of this worker
 * @param webUiPort       port for the worker's web UI -- presumably; confirm against the full source
 * @param cores           number of cores reported to the Master
 * @param memory          amount of memory reported to the Master (unit not visible here)
 * @param masterAkkaUrls  Akka URLs of the Master(s) to register with
 * @param actorSystemName name of the actor system (usage not shown in this excerpt)
 * @param actorName       name of the actor (usage not shown in this excerpt)
 * @param workDirPath     working directory path; defaults to null
 * @param conf            Spark configuration
 * @param securityMgr     security manager
 */
class Worker(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterAkkaUrls: Array[String],
    actorSystemName: String,
    actorName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends Actor

/**
 * Actor lifecycle hook of the Worker: kicks off registration with the Master.
 * (Only the registration step is shown in this excerpt.)
 */
override def preStart() {
  registerWithMaster()
}

/** Registers this Worker with the Master(s). */
def registerWithMaster() {
	// Attempt registration against every configured master.
	tryRegisterAllMasters()
}

/** Sends a RegisterWorker message to each master listed in `masterAkkaUrls`. */
def tryRegisterAllMasters() {
	for (masterAkkaUrl <- masterAkkaUrls) {
		// Look up the Master's actor through its URL.
		val actor = context.actorSelection(masterAkkaUrl)
		// Send this worker's registration information to that Master.
		actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
	}
}

Master端
// Master side: handles the registration request sent by a Worker.
override def receiveWithLogging = {
	case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => {
		if (state == RecoveryState.STANDBY) {
			// A standby Master ignores the request and sends no response.
		} else if (idToWorker.contains(id)) {
			// This worker id is already registered: reject the duplicate.
			sender ! RegisterWorkerFailed("Duplicate worker ID")
		} else {
			// Normal case (active Master, id not registered yet): wrap the data sent by the
			// Worker in a WorkerInfo so it can be added to the Master's bookkeeping.
			val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, sender, workerUiPort, publicAddress)
			// Once the WorkerInfo has been stored in memory, hand it to the persistence engine
			// as well. The point is to survive a Master failure: the in-memory data would be
			// lost, and after a standby/active switch-over the new Master could not find the
			// WorkerInfo, forcing every Worker to register again, which is expensive.
			// With persisted data it can be read back instead.
			if (registerWorker(worker)) {
				persistenceEngine.addWorker(worker)
				// Tell the Worker that registration succeeded (sent exactly once).
				sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
				// Start scheduling resources. Per the original note, scheduling also happens
				// when jobs run, not only at cluster start-up, and comes in two flavours:
				// spread out as much as possible, or pack as densely as possible.
				schedule()
			}
		}
	}
}

Worker端
// Worker side: handles the "registration succeeded" reply. The reply carries the active
// Master's URL and web UI URL, which are stored; afterwards heartbeats are sent to it.
override def receiveWithLogging = {
	case RegisteredWorker(masterUrl, masterWebUiUrl) => {
		// Remember which Master we are now talking to.
		changeMaster(masterUrl, masterWebUiUrl)
		// Schedule a SendHeartbeat message to ourselves every HEARTBEAT_MILLIS millis
		// (15 seconds according to the original note), starting immediately.
		context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
	}

	// The heartbeat itself is just this worker's id sent to the Master.
	case SendHeartbeat =>
		if (connected) { master ! Heartbeat(workerId) }
}

Master端 
// Master side: handles heartbeats coming from Workers.
override def receiveWithLogging = {
	case Heartbeat(workerId) => {
		// Normal case: the id belongs to a registered worker, so refresh its last-heartbeat
		// timestamp. Unknown ids are left unhandled in this excerpt.
		idToWorker.get(workerId).foreach { workerInfo =>
			workerInfo.lastHeartbeat = System.currentTimeMillis()
		}
		// Start-up flow complete.
	}
}