Modifying the Spark ThriftServer to Return Query Log Information

      When SQL is submitted to Spark through the Spark ThriftServer, getQueryLog returns nothing, because the ThriftServer by default reads a local operation log file. To return task execution information when statements run on Spark, we modify the getQueryLog path as described below.

       First, add a new method to the org.apache.hive.service.cli.operation.Operation class:

public String getStatementId() {
    return StringUtils.EMPTY;
}

       Then add the implementation in the org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation class:

override def getStatementId: String = statementId

      This method returns the groupId of the jobs the statement spawns.
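      This works because SparkExecuteStatementOperation installs statementId as the Spark job group before running the SQL. A simplified sketch of the relevant lines, roughly as they appear in the Spark 2.x sources:

// In SparkExecuteStatementOperation.execute() (Spark 2.x, simplified):
// statementId is a random UUID assigned to this operation; installing it as
// the job group id makes every Spark job spawned by this statement carry it
// in the spark.jobGroup.id property, which our listener reads in onJobStart.
sqlContext.sparkContext.setJobGroup(statementId, statement)
result = sqlContext.sql(statement)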

      Next, create a new SparkHiveLog class:

import java.util

import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._

import scala.collection.mutable

class SparkHiveLog extends SparkListener with Logging {

    private class SparkStageInfo(val stageId: Int, val totalTask: Int) {

        var completedTask = 0

        var status = "Running"
    }

    /**
     * Maps a job group id to the set of job ids in that group.
     */
    private val jobGroupMap: mutable.Map[String, mutable.Set[Int]] = mutable.Map()

    /**
     * Maps a job id to its list of stage ids.
     */
    private val jobListMap: mutable.Map[Int, Seq[Int]] = mutable.Map()


    /**
     * Maps a stage id to its SparkStageInfo.
     */
    private val stageMap: mutable.Map[Int, SparkStageInfo] = mutable.Map()


    private var taskCPUs = 1

    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
      // properties can be null when a job is submitted without a job group.
      if (jobStart.properties == null) return
      val groupId = jobStart.properties.getProperty("spark.jobGroup.id")
      if (groupId == null) return
      taskCPUs = jobStart.properties.getProperty("spark.task.cpus", "1").toInt

      jobListMap += (jobStart.jobId -> jobStart.stageIds)
      if (jobGroupMap.contains(groupId)) {
          jobGroupMap(groupId).add(jobStart.jobId)
      } else {
          jobGroupMap += (groupId -> mutable.Set(jobStart.jobId))
      }
    }

    override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
       val stageId = stageSubmitted.stageInfo.stageId
       val numTasks = stageSubmitted.stageInfo.numTasks

       val stageInfo = new SparkStageInfo(stageId, numTasks)
       stageMap += (stageId -> stageInfo)
    }


    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        val stageId = stageCompleted.stageInfo.stageId
        if (stageMap.contains(stageId)) {
            stageMap(stageId).status = "Completed"
        }
    }


    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val stageId = taskEnd.stageId

        if (stageMap.contains(stageId)) {
            val stageInfo = stageMap(stageId)
            if (taskEnd.taskInfo.successful) {
                stageInfo.completedTask += 1
            }
        }
    }

    /**
     * Builds the progress string for a single stage, e.g. "stage3(5/10)".
     */
    private def getStageInfo(stageId: Int) : String = {
       val sb = new StringBuffer("stage")
       if(!stageMap.contains(stageId)){
          return ""
        }
        val stageInfo = stageMap(stageId)
        sb.append(stageId).append("(").append(stageInfo.completedTask).append("/")
        sb.append(stageInfo.totalTask).append(")")
        sb.toString
    }

    /**
      * Builds the progress string for every stage of a job.
      */
    private def getJobInfo(jobId: Int): String = {
       val sb = new StringBuffer("Job")
       val map = new util.TreeMap[Int, String]()
       if (jobListMap.contains(jobId)) {
          for (stageId <- jobListMap(jobId)) {
              val stageInfo = getStageInfo(stageId)
              if (StringUtils.isNotEmpty(stageInfo)) {
                  map.put(stageId, stageInfo)
              }
          }
       }
       }
       sb.append(jobId).append("/").append(formatString(map, "/"))
       sb.toString
    }

    /**
      * Builds the overall progress string for all jobs in a group.
      */
    def getLogInfo(groupId: String): String = {
        val sb = new StringBuffer()

        if (jobGroupMap.contains(groupId)) {
            val map = new util.TreeMap[Int, String]()
            val iter = jobGroupMap(groupId).iterator
            while (iter.hasNext) {
                val jobId = iter.next()
                map.put(jobId, getJobInfo(jobId))
            }
            sb.append(formatString(map, "; "))
        }
        sb.toString
    }

    private def formatString(map: util.TreeMap[Int, String], split: String): String = {
        // TreeMap.values() iterates in ascending key order.
        StringUtils.join(map.values(), split)
    }


    def clearCaches(groupId: String): Unit = {
        if (jobGroupMap.contains(groupId)) {
            val iter = jobGroupMap(groupId).iterator
            while (iter.hasNext) {
                val jobId = iter.next()
                // Guard against a job id that has no recorded stage list.
                jobListMap.get(jobId).foreach(_.foreach(stageMap.remove))
                jobListMap.remove(jobId)
            }
            jobGroupMap.remove(groupId)
        }
    }

  }

object SparkHiveLog {

    // Eagerly created singleton; the original null-check lazy init is not
    // thread-safe when multiple sessions fetch logs concurrently.
    private val sparkHiveLog: SparkHiveLog = new SparkHiveLog()

    def getSparkHiveLog(): SparkHiveLog = sparkHiveLog
}

     The main job of this class is to record task execution information in the onJobStart, onStageSubmitted, onStageCompleted, and onTaskEnd callbacks.
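     For illustration, the string getLogInfo produces for a group with two jobs looks roughly like this (jobs joined by "; ", stages by "/", completed/total tasks in parentheses):

Job0/stage0(10/10)/stage1(3/20); Job1/stage2(0/8)

     Note that the listener callbacks run on Spark's listener-bus thread while getLogInfo and clearCaches are called from ThriftServer handler threads, so in production the three maps should be concurrent collections or have their access synchronized.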

     Add a new method to the org.apache.hive.service.cli.operation.OperationManager class:

public RowSet getQueryLog(OperationHandle opHandle) {
    return null;
}

     Then implement it in the org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager class:
override def getQueryLog(opHandle: OperationHandle): RowSet = {
    val operation = handleToOperation.get(opHandle)
    val groupId = operation.getStatementId

    // A single string column that carries the progress information.
    val schema = new Schema
    val fieldSchema = new FieldSchema
    fieldSchema.setName("operation_log")
    fieldSchema.setType("string")
    schema.addToFieldSchemas(fieldSchema)
    val tableSchema = new TableSchema(schema)

    val rowSet = RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion)
    rowSet.addRow(Array[AnyRef](SparkHiveLog.getSparkHiveLog().getLogInfo(groupId)))
    rowSet
}
    For getQueryLog to take effect, modify the fetchResults method of the org.apache.hive.service.cli.session.HiveSessionImpl class:
@Override
public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
    long maxRows, FetchType fetchType) throws HiveSQLException {
  acquire(true);
  try {
    if (fetchType == FetchType.QUERY_OUTPUT) {
      return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
    }
    return operationManager.getQueryLog(opHandle);
  } finally {
    release(true);
  }
}
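    For context: FetchType in Hive's Thrift interface has only two values, QUERY_OUTPUT and LOG, and the Hive JDBC client's HiveStatement.getQueryLog sends a FetchResults request with FetchType.LOG, so that request now lands in the new getQueryLog branch above.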
    Also override the closeOperation method in the org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager class; its main job is to clear the task execution information cached in SparkHiveLog when the statement is closed.
override def closeOperation(opHandle: OperationHandle): Unit = {

    val operation = handleToOperation.get(opHandle)
    val groupId = operation.getStatementId

    SparkHiveLog.getSparkHiveLog().clearCaches(groupId)
    operation.close()
    handleToOperation.remove(opHandle)
  }

    Finally, register SparkHiveLog as a listener in the org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 class:

SparkSQLEnv.sparkContext.addSparkListener(SparkHiveLog.getSparkHiveLog)
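      One natural place for this line is in HiveThriftServer2.main, next to where the server already registers its own HiveThriftServer2Listener after SparkSQLEnv.init(); this placement is our suggestion, and any point after the SparkContext exists works.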

      Recompile the Spark sources, replace the spark-hive-thriftserver_2.11-2.x jar, and restart the ThriftServer. When SQL is executed through HiveStatement, getQueryLog now returns the execution log.
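      A minimal client-side sketch of that flow; the JDBC URL, credentials, and table name below are placeholders, and the query runs on a background thread so the main thread can poll the log:

import java.sql.DriverManager

import org.apache.hive.jdbc.HiveStatement

import scala.collection.JavaConverters._

object QueryLogDemo {
  def main(args: Array[String]): Unit = {
    // Placeholder URL and credentials; point these at your thriftserver.
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    val stmt = conn.createStatement().asInstanceOf[HiveStatement]

    // Run the statement on a background thread so that the main thread
    // is free to poll the progress log while the query executes.
    val runner = new Thread(new Runnable {
      override def run(): Unit = stmt.execute("SELECT count(*) FROM some_table")
    })
    runner.start()

    // Each getQueryLog call maps to a FetchResults(LOG) RPC, which the
    // modified thriftserver answers with the SparkHiveLog progress string.
    while (runner.isAlive) {
      stmt.getQueryLog().asScala.foreach(println)
      Thread.sleep(1000)
    }
    runner.join()

    stmt.close()
    conn.close()
  }
}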