spark慢task解決方法--推測式執行原理
概述
1、spark推測執行開啟,設定 spark.speculation=true即可
額外設定
1. spark.speculation.interval 100:檢測週期,單位毫秒;
2. spark.speculation.quantile 0.75:完成task的百分比時啟動推測;
3. spark.speculation.multiplier 1.5:比其他的慢多少倍時啟動推測。
2、spark開啟推測執行的好處
推測執行是指對於一個Stage裡面執行慢的Task,會在其他節點的Executor上再次啟動這個task,如果其中一個Task例項執行成功則將這個最先完成的Task的計算結果作為最終結果,同時會幹掉其他Executor上執行的例項,從而加快執行速度
檢測是否有需要推測式執行的Task
在SparkContext建立了schedulerBackend和taskScheduler後,立即呼叫了taskScheduler 的start方法:
override def start() { backend.start() if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") speculationScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() } }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS) } }
可以看到,TaskScheduler在啟動SchedulerBackend後,在非local模式前提下檢查推測式執行功能是否開啟(預設關閉,可通過spark.speculation開啟),若開啟則會啟動一個執行緒每隔SPECULATION_INTERVAL_MS(預設100ms,可通過spark.speculation.interval屬性設定)通過checkSpeculatableTasks方法檢測是否有需要推測式執行的tasks:
// Check for speculatable tasks in all our active jobs. def checkSpeculatableTasks() { var shouldRevive = false synchronized { shouldRevive = rootPool.checkSpeculatableTasks() } if (shouldRevive) { backend.reviveOffers() } }
然後又通過rootPool的方法判斷是否有需要推測式執行的tasks,若有則會呼叫SchedulerBackend的reviveOffers去嘗試拿資源執行推測任務。繼續看看檢測邏輯是什麼樣的:
override def checkSpeculatableTasks(): Boolean = {
var shouldRevive = false
for (schedulable <- schedulableQueue.asScala) {
shouldRevive |= schedulable.checkSpeculatableTasks()
}
shouldRevive
}
在rootPool裡又呼叫了schedulable的方法,schedulable是ConcurrentLinkedQueue[Schedulable]型別,佇列裡面放的都是TaskSetMagager,再看TaskSetMagager的checkSpeculatableTasks方法,終於找到檢測根源了:
override def checkSpeculatableTasks(): Boolean = {
// 如果task只有一個或者所有task都不需要再執行了就沒有必要再檢測
if (isZombie || numTasks == 1) {
return false
}
var foundTasks = false
// 所有task數 * SPECULATION_QUANTILE(預設0.75,可通過spark.speculation.quantile設定)
val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
// 成功的task數是否超過總數的75%,並且成功的task是否大於0
if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
val time = clock.getTimeMillis()
// 過濾出成功執行的task的執行時間並排序
val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
Arrays.sort(durations)
// 取這多個時間的中位數
val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
// 中位數 * SPECULATION_MULTIPLIER (預設1.5,可通過spark.speculation.multiplier設定)
val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
logDebug("Task length threshold for speculation: " + threshold)
// 遍歷該TaskSet中的task,取未成功執行、正在執行、執行時間已經大於threshold 、
// 推測式執行task列表中未包括的task放進需要推測式執行的列表中speculatableTasks
for ((tid, info) <- taskInfos) {
val index = info.index
if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
!speculatableTasks.contains(index)) {
logInfo(
"Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
.format(index, taskSet.id, info.host, threshold))
speculatableTasks += index
foundTasks = true
}
}
}
foundTasks
}
檢查邏輯程式碼中註釋很明白,當成功的Task數超過總Task數的75%(可通過引數spark.speculation.quantile設定)時,再統計所有成功的Tasks的執行時間,得到一箇中位數,用這個中位數乘以1.5(可通過引數spark.speculation.multiplier控制)得到執行時間門限,如果在執行的Tasks的執行時間超過這個門限,則對它啟用推測。簡單來說就是對那些拖慢整體進度的Tasks啟用推測,以加速整個Stage的執行。
演算法大致流程如圖:
推測式任務什麼時候被排程
在TaskSetMagager在延遲排程策略下為一個executor分配一個task時會呼叫dequeueTask方法:
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value, Boolean)] =
{
for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL, false))
}
}
......
// find a speculative task if all others tasks have been scheduled
dequeueSpeculativeTask(execId, host, maxLocality).map {
case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
}
該方法的最後一段就是在其他任務都被排程後為推測式任務進行排程,看看起實現:
protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
{
// 從推測式執行任務列表中移除已經成功完成的task,因為從檢測到排程之間還有一段時間,
// 某些task已經成功執行
speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set
// 判斷task是否可以在該executor對應的Host上執行,判斷條件是:
// task沒有在該host上執行;
// 該executor沒有在task的黑名單裡面(task在這個executor上失敗過,並還在'黑暗'時間內)
def canRunOnHost(index: Int): Boolean =
!hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)
if (!speculatableTasks.isEmpty) {
// 獲取能在該executor上啟動的taskIndex
for (index <- speculatableTasks if canRunOnHost(index)) {
// 獲取task的優先位置
val prefs = tasks(index).preferredLocations
val executors = prefs.flatMap(_ match {
case e: ExecutorCacheTaskLocation => Some(e.executorId)
case _ => None
});
// 優先位置若為ExecutorCacheTaskLocation並且資料所在executor包含當前executor,
// 則返回其task在taskSet的index和Locality Levels
if (executors.contains(execId)) {
speculatableTasks -= index
return Some((index, TaskLocality.PROCESS_LOCAL))
}
}
// 這裡的判斷是延遲排程的作用,即使是推測式任務也儘量以最好的本地性級別來啟動
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
val locations = tasks(index).preferredLocations.map(_.host)
if (locations.contains(host)) {
speculatableTasks -= index
return Some((index, TaskLocality.NODE_LOCAL))
}
}
}
........
}
None
}
程式碼太長只列了前面一部分,不過都是類似的邏輯,程式碼中註釋也很清晰。先過濾掉已經成功執行的task,另外,推測執行task不在和正在執行的task同一Host執行,不在黑名單executor裡執行,然後在延遲排程策略下根據task的優先位置來決定是否在該executor上以某種本地性級別被排程執行。