1. 程式人生 > >spark任務之Task失敗監控

spark任務之Task失敗監控

需求

spark應用程式中,只要task失敗就傳送郵件,並攜帶錯誤原因。 我的GitHub,猛戳我

背景

在spark程式中,task有失敗重試機制(根據 spark.task.maxFailures 配置,預設是4次),當task執行失敗時,並不會直接導致整個應用程式down掉,只有在重試了 spark.task.maxFailures 次後任然失敗的情況下才會使程式down掉。另外,spark on yarn模式還會受yarn的重試機制去重啟這個spark程式,根據 yarn.resourcemanager.am.max-attempts 配置(預設是2次)。

即使spark程式task失敗4次後,受yarn控制重啟後在第4次執行成功了,一切都好像沒有發生,我們只有通過spark的監控UI去看是否有失敗的task,若有還得去查詢看是哪個task由於什麼原因失敗了。基於以上原因,我們需要做個task失敗的監控,只要失敗就帶上錯誤原因通知我們,及時發現問題,促使我們的程式更加健壯。

捕獲Task失敗事件

順藤摸瓜,task在Executor中執行,跟蹤原始碼看task在失敗後都幹了啥?

1.在executor中task執行完不管成功與否都會向execBackend報告task的狀態;

 execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

2.在CoarseGrainedExecutorBackend中會向driver傳送StatusUpdate狀態變更資訊;

override def statusUpdate(taskId: Long, state: TaskState
, data: ByteBuffer) { val msg = StatusUpdate(executorId, taskId, state, data) driver match { case Some(driverRef) => driverRef.send(msg) case None => logWarning(s"Drop $msg because has not yet connected to driver") } }

3.CoarseGrainedSchedulerBackend收到訊息後有呼叫了scheduler的方法;

override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        ......

4.由於程式碼繁瑣,列出了關鍵的幾行程式碼,巢狀呼叫關係,這裡最後向eventProcessLoop傳送了CompletionEvent事件;

taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
taskSetManager.handleFailedTask(tid, taskState, reason)
sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
eventProcessLoop.post(CompletionEvent(task, reason, result, accumUpdates, taskInfo)) 

5.在DAGSchedulerEventProcessLoop處理方法中 handleTaskCompletion(event: CompletionEvent)有著最為關鍵的一行程式碼,這裡listenerBus把task的狀態發了出去,凡是監聽了SparkListenerTaskEnd的listener都可以獲取到對應的訊息,而且這個是帶了失敗的原因(event.reason)。其實第一遍走原始碼並沒有注意到前面提到的sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)方法,後面根據SparkUI的page頁面往回追溯才發現。

 listenerBus.post(SparkListenerTaskEnd(
       stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))

自定義監聽器

需要獲取到SparkListenerTaskEnd事件,得繼承SparkListener類並重寫onTaskEnd方法,
在方法中獲取task失敗的reason,傳送郵件給對應的負責人。這樣我們就可以第一時間知道哪個task是以什麼原因失敗了。

import cn.i4.utils.MailUtil
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class I4SparkAppListener(conf: SparkConf) extends SparkListener with Logging {

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    val info = taskEnd.taskInfo
    // If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task
    // completion event is for. Let's just drop it here. This means we might have some speculation
    // tasks on the web ui that's never marked as complete.
    if (info != null && taskEnd.stageAttemptId != -1) {
      val errorMessage: Option[String] =
        taskEnd.reason match {
          case kill: TaskKilled =>
            Some(kill.toErrorString)
          case e: ExceptionFailure =>
            Some(e.toErrorString)
          case e: TaskFailedReason =>
            Some(e.toErrorString)
          case _ => None
        }
      if (errorMessage.nonEmpty) {
        if (conf.getBoolean("enableSendEmailOnTaskFail", false)) {
          val args = Array("********@qq.com", "spark任務監控", errorMessage.get)
          try {
            MailUtil.sendMail(args)
          } catch {
            case e: Exception =>
          }
        }
      }
    }
  }
}

注意這裡還需要在我們的spark程式中註冊好這個listener:

.config("enableSendEmailOnTaskFail", "true")
.config("spark.extraListeners", "cn.i4.monitor.streaming.I4SparkAppListener")

總結

這裡只是實現了一個小demo,可以做的更完善使之更通用,比如加上應用程式的名字、host、stageid、taskid等,單獨達成jia包放到classPath,並把該listener的註冊放到預設配置檔案中永久有效,只需控制enableSendEmailOnTaskFail控制是否啟用。

我的GitHub,猛戳我