1. 程式人生 > >yarn client中的一個BUG的修復

yarn client中的一個BUG的修復

org.apache.spark.deploy.yarn.Client.scala中的monitorApplication方法:

/**

   * Report the state of an application until it has exited, either successfully or

   * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,

   * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED,

   * or KILLED).

   *

   * @param appId ID of the application to monitor.

   * @param returnOnRunning Whether to also return the application state when it is RUNNING.

   * @param logApplicationReport Whether to log details of the application report every iteration.

   * @return A pair of the yarn application state and the final application state.

   
*/ def monitorApplication( appId: ApplicationId, returnOnRunning: Boolean = false, logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = { val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) var lastState: YarnApplicationState = null
while (true) { Thread.sleep(interval) val report: ApplicationReport = try { getApplicationReport(appId) } catch { case e: ApplicationNotFoundException => logError(s"Application $appId not found.") return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
case NonFatal(e) => logError(s"Failed to contact YARN for application $appId.", e) return (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED) } val state = report.getYarnApplicationState if (logApplicationReport) { logInfo(s"Application report for $appId (state: $state)") // If DEBUG is enabled, log report details every iteration // Otherwise, log them every time the application changes state if (log.isDebugEnabled) { logDebug(formatReportDetails(report)) } else if (lastState != state) { logInfo(formatReportDetails(report)) } } if (lastState != state) { state match { case YarnApplicationState.RUNNING => reportLauncherState(SparkAppHandle.State.RUNNING) case YarnApplicationState.FINISHED => // reportLauncherState(SparkAppHandle.State.FINISHED) report.getFinalApplicationStatus match { case FinalApplicationStatus.FAILED => reportLauncherState(SparkAppHandle.State.FAILED) case FinalApplicationStatus.KILLED => reportLauncherState(SparkAppHandle.State.KILLED) case _ => reportLauncherState(SparkAppHandle.State.FINISHED) } case YarnApplicationState.FAILED => reportLauncherState(SparkAppHandle.State.FAILED) case YarnApplicationState.KILLED => reportLauncherState(SparkAppHandle.State.KILLED) case _ => } } if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) { cleanupStagingDir(appId) return (state, report.getFinalApplicationStatus) } if (returnOnRunning && state == YarnApplicationState.RUNNING) { return (state, report.getFinalApplicationStatus) } lastState = state } // Never reached, but keeps compiler happy throw new SparkException("While loop is depleted! This should never happen...") }


其中:

      if (lastState != state) {

        state match {

          case YarnApplicationState.RUNNING =>

            reportLauncherState(SparkAppHandle.State.RUNNING)

          case YarnApplicationState.FINISHED =>

//            reportLauncherState(SparkAppHandle.State.FINISHED)

            report.getFinalApplicationStatus match {

              case FinalApplicationStatus.FAILED =>

                reportLauncherState(SparkAppHandle.State.FAILED)

              case FinalApplicationStatus.KILLED =>

                reportLauncherState(SparkAppHandle.State.KILLED)

              case _ =>

                reportLauncherState(SparkAppHandle.State.FINISHED)

            }

          case YarnApplicationState.FAILED =>

            reportLauncherState(SparkAppHandle.State.FAILED)

          case YarnApplicationState.KILLED =>

            reportLauncherState(SparkAppHandle.State.KILLED)

          case _ =>

        }

      }

yarn state為finished的時候的狀態細分不夠明確,將原來的 reportLauncherState(SparkAppHandle.State.FAILED)註釋掉,改成:

report.getFinalApplicationStatus match {

              case FinalApplicationStatus.FAILED =>

                reportLauncherState(SparkAppHandle.State.FAILED)

              case FinalApplicationStatus.KILLED =>

                reportLauncherState(SparkAppHandle.State.KILLED)

              case _ =>

                reportLauncherState(SparkAppHandle.State.FINISHED)

            }

因為完成狀態的final state可能很多種狀態,KILLED、FAILED、SUCCESS都可能是final state。
如果只返回一個finished狀態給SparkLauncher的SparkAppHandle的話,其實我們在自己的程式碼中是無法知道這個spark 任務到底是成功了還是失敗了,只知道它完成了。
所以要細分一下完成狀態,自己用SparkLauncher提交JOB的時候可以監控JOB在失敗的時候報警。
此BUG在spark1.6.0中存在對應CDH5.7到CDH5.9的spark都有這個問題,新的版本中已經修復此BUG。
如果在使用CDH版本的spark,那麼就自己改一下程式碼重新編譯打包一下,部署一個自己的spark on yarn服務吧。