spark任務之Task失敗監控
需求
spark應用程式中,只要task失敗就傳送郵件,並攜帶錯誤原因。 我的GitHub,猛戳我
背景
在spark程式中,task有失敗重試機制(根據 spark.task.maxFailures
配置,預設是4次),當task執行失敗時,並不會直接導致整個應用程式down掉,只有在重試了 spark.task.maxFailures
次後任然失敗的情況下才會使程式down掉。另外,spark on yarn模式還會受yarn的重試機制去重啟這個spark程式,根據 yarn.resourcemanager.am.max-attempts
配置(預設是2次)。
即使spark程式task失敗4次後,受yarn控制重啟後在第4次執行成功了,一切都好像沒有發生,我們只有通過spark的監控UI去看是否有失敗的task,若有還得去查詢看是哪個task由於什麼原因失敗了。基於以上原因,我們需要做個task失敗的監控,只要失敗就帶上錯誤原因通知我們,及時發現問題,促使我們的程式更加健壯。
捕獲Task失敗事件
順藤摸瓜,task在Executor中執行,跟蹤原始碼看task在失敗後都幹了啥?
1.在executor中task執行完不管成功與否都會向execBackend報告task的狀態;
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
2.在CoarseGrainedExecutorBackend中會向driver傳送StatusUpdate狀態變更資訊;
override def statusUpdate(taskId: Long, state: TaskState , data: ByteBuffer) {
val msg = StatusUpdate(executorId, taskId, state, data)
driver match {
case Some(driverRef) => driverRef.send(msg)
case None => logWarning(s"Drop $msg because has not yet connected to driver")
}
}
3.CoarseGrainedSchedulerBackend收到訊息後有呼叫了scheduler的方法;
override def receive: PartialFunction[Any, Unit] = {
case StatusUpdate(executorId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, data.value)
......
4.由於程式碼繁瑣,列出了關鍵的幾行程式碼,巢狀呼叫關係,這裡最後向eventProcessLoop傳送了CompletionEvent事件;
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
taskSetManager.handleFailedTask(tid, taskState, reason)
sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
eventProcessLoop.post(CompletionEvent(task, reason, result, accumUpdates, taskInfo))
5.在DAGSchedulerEventProcessLoop
處理方法中 handleTaskCompletion(event: CompletionEvent)
有著最為關鍵的一行程式碼,這裡listenerBus把task的狀態發了出去,凡是監聽了SparkListenerTaskEnd
的listener都可以獲取到對應的訊息,而且這個是帶了失敗的原因(event.reason)。其實第一遍走原始碼並沒有注意到前面提到的sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
方法,後面根據SparkUI的page頁面往回追溯才發現。
listenerBus.post(SparkListenerTaskEnd(
stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
自定義監聽器
需要獲取到SparkListenerTaskEnd
事件,得繼承SparkListener
類並重寫onTaskEnd
方法,
在方法中獲取task失敗的reason,傳送郵件給對應的負責人。這樣我們就可以第一時間知道哪個task是以什麼原因失敗了。
import cn.i4.utils.MailUtil
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
class I4SparkAppListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val info = taskEnd.taskInfo
// If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task
// completion event is for. Let's just drop it here. This means we might have some speculation
// tasks on the web ui that's never marked as complete.
if (info != null && taskEnd.stageAttemptId != -1) {
val errorMessage: Option[String] =
taskEnd.reason match {
case kill: TaskKilled =>
Some(kill.toErrorString)
case e: ExceptionFailure =>
Some(e.toErrorString)
case e: TaskFailedReason =>
Some(e.toErrorString)
case _ => None
}
if (errorMessage.nonEmpty) {
if (conf.getBoolean("enableSendEmailOnTaskFail", false)) {
val args = Array("********@qq.com", "spark任務監控", errorMessage.get)
try {
MailUtil.sendMail(args)
} catch {
case e: Exception =>
}
}
}
}
}
}
注意這裡還需要在我們的spark程式中註冊好這個listener:
.config("enableSendEmailOnTaskFail", "true")
.config("spark.extraListeners", "cn.i4.monitor.streaming.I4SparkAppListener")
總結
這裡只是實現了一個小demo,可以做的更完善使之更通用,比如加上應用程式的名字、host、stageid、taskid等,單獨達成jia包放到classPath,並把該listener的註冊放到預設配置檔案中永久有效,只需控制enableSendEmailOnTaskFail
控制是否啟用。
我的GitHub,猛戳我