Spark Master資源排程--SparkContext向所有master註冊
阿新 • • 發佈:2018-12-05
Spark Master資源排程–SparkContext向所有master註冊
更多資源
- github: https://github.com/opensourceteams/spark-scala-maven
- csdn(彙總視訊線上看): https://blog.csdn.net/thinktothings/article/details/84726769
Youtube視訊分享
- Spark Master資源排程–SparkContext向所有master註冊 : https://youtu.be/AXxCnCc5Mh0
Bilibili視訊分享
- Spark Master資源排程–SparkContext向所有master註冊 :
SparkContext啟動向master傳送訊息
- ClientEndpoint向master傳送訊息: RegisterApplication
/** * Register with all masters asynchronously and returns an array `Future`s for cancellation. */ private def tryRegisterAllMasters(): Array[JFuture[_]] = { for (masterAddress <- masterRpcAddresses) yield { registerMasterThreadPool.submit(new Runnable { override def run(): Unit = try { if (registered.get) { return } logInfo("Connecting to master " + masterAddress.toSparkURL + "...") val masterRef = rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME) masterRef.send(RegisterApplication(appDescription, self)) } catch { case ie: InterruptedException => // Cancelled case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e) } }) } }
master處理訊息RegisterApplication
- 建立 Application 並註冊到master上
- Application 儲存到 master 儲存引擎中
- 向driver傳送已註冊成功訊息: RegisteredApplication
case RegisterApplication(description, driver) => { // TODO Prevent repeated registrations from some driver if (state == RecoveryState.STANDBY) { // ignore, don't send response } else { logInfo("Registering app " + description.name) val app = createApplication(description, driver) registerApplication(app) logInfo("Registered app " + description.name + " with ID " + app.id) persistenceEngine.addApplication(app) driver.send(RegisteredApplication(app.id, self)) schedule() } }
- 過濾所有已註冊的Worker(狀態為ALIVE)
- 遍歷 waitingDrivers,如果有等待中的Drivers,給worker傳送啟動Driver訊息: LaunchDriver
- 呼叫在worker上啟動executor方法
/**
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*/
private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) {
return
}
// Drivers take strict precedence over executors
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
startExecutorsOnWorkers()
}
- 過濾waitingApps,剛才註冊的Application已經在ArrayBuffer中
- 對已註冊的worker進行過濾
- 過濾條件狀態為ALIVE,可用cpu核心數大於等於每個executor的核心數,可用記憶體大於等於Application在每個executor需要的記憶體數
- 對可用worker進行排序(按可用核心數從大到小排序)
- 呼叫方法 scheduleExecutorsOnWorkers,worker給executor分配多少個cpu核心
/**
* Schedule and launch executors on workers
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
for (app <- waitingApps if app.coresLeft > 0) {
val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
// Filter out workers that don't have enough resources to launch an executor
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}
- 進行具體的當前Application在Worker上給executor分配幾個cpu核心
/**
* Schedule executors to be launched on the workers.
* Returns an array containing number of cores assigned to each worker.
*
* There are two modes of launching executors. The first attempts to spread out an application's
* executors on as many workers as possible, while the second does the opposite (i.e. launch them
* on as few workers as possible). The former is usually better for data locality purposes and is
* the default.
*
* The number of cores assigned to each executor is configurable. When this is explicitly set,
* multiple executors from the same application may be launched on the same worker if the worker
* has enough cores and memory. Otherwise, each executor grabs all the cores available on the
* worker by default, in which case only one executor may be launched on each worker.
*
* It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
* at a time). Consider the following example: cluster has 4 workers with 16 cores each.
* User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
* allocated at a time, 12 cores from each worker would be assigned to each executor.
* Since 12 < 16, no executors would launch [SPARK-8881].
*/
private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
val coresPerExecutor = app.desc.coresPerExecutor
val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
val oneExecutorPerWorker = coresPerExecutor.isEmpty
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
/** Return whether the specified worker can launch an executor for this app. */
def canLaunchExecutor(pos: Int): Boolean = {
val keepScheduling = coresToAssign >= minCoresPerExecutor
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
// If we allow multiple executors per worker, then we can always launch new executors.
// Otherwise, if there is already an executor on this worker, just give it more cores.
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
if (launchingNewExecutor) {
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
keepScheduling && enoughCores && enoughMemory && underLimit
} else {
// We're adding cores to an existing executor, so no need
// to check memory and executor limits
keepScheduling && enoughCores
}
}
// Keep launching executors until no more workers can accommodate any
// more executors, or if we have reached this application's limits
var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
while (freeWorkers.nonEmpty) {
freeWorkers.foreach { pos =>
var keepScheduling = true
while (keepScheduling && canLaunchExecutor(pos)) {
coresToAssign -= minCoresPerExecutor
assignedCores(pos) += minCoresPerExecutor
// If we are launching one executor per worker, then every iteration assigns 1 core
// to the executor. Otherwise, every iteration assigns cores to a new executor.
if (oneExecutorPerWorker) {
assignedExecutors(pos) = 1
} else {
assignedExecutors(pos) += 1
}
// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
// scheduling executors on this worker until we use all of its resources.
// Otherwise, just move on to the next worker.
if (spreadOutApps) {
keepScheduling = false
}
}
}
freeWorkers = freeWorkers.filter(canLaunchExecutor)
}
assignedCores
}
- 分配worker資源給executor
- 給worker傳送啟動executor訊息: LaunchExecutor
- 給driver傳送Executor已增加訊息:ExecutorAdded
/**
* Allocate a worker's resources to one or more executors.
* @param app the info of the application which the executors belong to
* @param assignedCores number of cores on this worker for this application
* @param coresPerExecutor number of cores per executor
* @param worker the worker info
*/
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
// If the number of cores per executor is specified, we divide the cores assigned
// to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
val exec = app.addExecutor(worker, coresToAssign)
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}