Spark Resource Scheduling
I. The difference between task scheduling and resource scheduling
Task scheduling is the scheduling of jobs, carried out by DAGScheduler, TaskScheduler and SchedulerBackend.
Resource scheduling is how an application obtains resources; it is performed by the schedule() method.
II. Resource scheduling demystified
Because the Master is responsible for resource management and scheduling, the resource-scheduling method schedule() lives in the Master class (Master.scala). It is invoked whenever a program registers or the available resources change, for example during registration (this covers the registration of workers, drivers and applications; note that executors register with SparkDeploySchedulerBackend, not with the Master):
case RegisterApplication(description, driver) => {
  // TODO Prevent repeated registrations from some driver
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()
  }
}
/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
Whenever a new application joins or the available resources change (for example an executor or worker is added or removed), this method is invoked in response:
private def schedule(): Unit = {
  // If the Master's state is not ALIVE, scheduling is meaningless, so return immediately
  if (state != RecoveryState.ALIVE) { return }
  // Drivers take strict precedence over executors
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
  // Shuffling the workers helps spread the load evenly across them
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    // Only workers in the ALIVE state take part in resource allocation
    for (driver <- waitingDrivers) {
      // Iterate over the waiting drivers. This only applies to cluster mode;
      // in client mode the driver is started locally and never enters this queue.
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // When the worker's free memory and cores cover what the driver needs,
        // launch the driver on this (randomly ordered) worker
        launchDriver(worker, driver)
        // Remove the launched driver from the waiting queue
        waitingDrivers -= driver
      }
    }
  }
  startExecutorsOnWorkers()
}
A walkthrough of the schedule() code (the simpler points are already covered in the inline comments above):
1 Random.shuffle(workers) shuffles the order of the workers cached in the Master's data structure:
def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]): CC[T] = {
  val buf = new ArrayBuffer[T] ++= xs // copy the elements into a temporary buffer
  def swap(i1: Int, i2: Int) { // swap the two elements at the given indices
    val tmp = buf(i1)
    buf(i1) = buf(i2)
    buf(i2) = tmp
  }
  for (n <- buf.length to 2 by -1) { // draw a random index each round and swap, scrambling the element order
    val k = nextInt(n)
    swap(n - 1, k)
  }
  (bf(xs) ++= buf).result // return a new, shuffled collection of the same type (here, the collection of workers)
}
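As a quick aside, Random.shuffle works on any ordinary Scala collection and returns a new collection of the same type with its elements in random order; the original is left untouched. The names below are purely illustrative:

import scala.util.Random

val workerIds = Vector("worker-1", "worker-2", "worker-3", "worker-4")
// a new Vector holding the same elements in a random order
val shuffled = Random.shuffle(workerIds)
println(shuffled)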
2 waitingDrivers
private val waitingDrivers = new ArrayBuffer[DriverInfo]
As the declaration shows, waitingDrivers is an array whose elements are DriverInfo. A DriverInfo carries the driver's information: startTime (when it was started), id, desc (the driver's description) and submitDate (when it was submitted):
private[deploy] class DriverInfo(
    val startTime: Long,
    val id: String,
    val desc: DriverDescription,
    val submitDate: Date)
  extends Serializable {
The description holds the following fields:
private[deploy] case class DriverDescription(
    jarUrl: String,     // URL of the application jar
    mem: Int,           // memory required by the driver
    cores: Int,         // CPU cores required by the driver
    supervise: Boolean, // if supervise was set for a cluster-mode driver at spark-submit time, the driver is automatically restarted when it dies
    command: Command) { // command and environment information used to launch the driver
  override def toString: String = s"DriverDescription (${command.mainClass})"
}
3 launchDriver: Spark has to launch the driver first before any of the subsequent, concrete scheduling can take place.
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver) // record which worker the driver runs on
  driver.worker = Some(worker) // driver and worker hold references to each other
  // The Master sends a remote RPC instruction to the worker, telling it to launch the driver
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING // once launched, mark the driver as RUNNING
}
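The receiving end of that RPC message is the Worker. The sketch below is only a rough, hypothetical illustration of the pattern: the real Worker.scala creates a DriverRunner for each LaunchDriver message, whereas the message types and the thread-based runner here are simplified stand-ins, not Spark's actual API.

import scala.collection.mutable

// Simplified, hypothetical stand-ins for the real deploy messages (illustrative only)
case class DriverDescriptionLite(jarUrl: String, mem: Int, cores: Int)
case class LaunchDriverMsg(driverId: String, desc: DriverDescriptionLite)

class WorkerSketch {
  private val runningDrivers = mutable.Map.empty[String, Thread]

  // Conceptually what the Worker does on LaunchDriver: record the driver and
  // start it asynchronously so the RPC message loop is never blocked
  def receive(msg: Any): Unit = msg match {
    case LaunchDriverMsg(id, desc) =>
      val runner = new Thread(() => {
        // in the real code: fetch desc.jarUrl, build the launch command,
        // fork the driver JVM and watch its exit status
        println(s"starting driver $id with ${desc.cores} cores / ${desc.mem} MB")
      })
      runningDrivers(id) = runner
      runner.start()
    case _ => // other messages are ignored in this sketch
  }
}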
4 startExecutorsOnWorkers: a simple first-in-first-out (FIFO) queue. By default Spark launches executors in FIFO order: resources are allocated to the next application only after the previous application's demand has been satisfied.
/**
* Schedule and launch executors on workers
*/
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  // Before allocating executors, check whether the current application still needs cores;
  // if it does not, no executors are allocated for it.
  for (app <- waitingApps if app.coresLeft > 0) {
    // the number of cores each executor of this application should get
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out workers that don't have enough resources to launch an executor.
    // A worker qualifies only if it is ALIVE and its free memory and cores are at least
    // what a single executor needs. The qualifying workers are then sorted by free cores
    // in descending order, so the best-provisioned workers are used first.
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // spreadOutApps (the default) spreads the application across as many nodes as possible,
    // which usually also gives better data locality, since the data itself tends to be spread
    // across the machines. scheduleExecutorsOnWorkers returns, for each usable worker, the
    // number of cores to allocate on it, satisfying as much of the remaining demand as possible.
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // Now that we've decided how many cores to allocate on each worker, let's allocate them.
    // This is where executors are actually launched: the Master sends a remote instruction to
    // each chosen Worker to start an ExecutorBackend process, and notifies the driver with an
    // ExecutorAdded message.
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
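To make the spread-out behaviour concrete, here is a simplified sketch of the core-assignment idea. It is not Spark's actual scheduleExecutorsOnWorkers (the real method also tracks memory, executor-count limits and the non-spread-out mode), and spreadOutAssign is a hypothetical helper: cores are handed out one executor's worth at a time, round-robin across the usable workers, until the application's remaining demand or the workers' free cores run out.

// Hypothetical helper illustrating spread-out core assignment,
// not the actual Spark implementation
def spreadOutAssign(
    coresWanted: Int,            // roughly app.coresLeft
    coresPerExecutor: Int,       // roughly app.desc.coresPerExecutor.getOrElse(1)
    workerFreeCores: Array[Int]  // free cores of the usable workers
  ): Array[Int] = {
  val assigned = Array.fill(workerFreeCores.length)(0)
  var remaining = coresWanted
  var progress = true
  while (remaining >= coresPerExecutor && progress) {
    progress = false
    for (i <- workerFreeCores.indices
         if remaining >= coresPerExecutor &&
            workerFreeCores(i) - assigned(i) >= coresPerExecutor) {
      assigned(i) += coresPerExecutor // one more executor's worth on worker i
      remaining -= coresPerExecutor
      progress = true
    }
  }
  assigned
}

// Example: 4 workers with 8 free cores each, the app still wants 10 cores,
// 2 cores per executor: spreadOutAssign(10, 2, Array(8, 8, 8, 8))
// returns Array(4, 2, 2, 2), i.e. the demand is spread across all four workers.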