Modifying Spark ThriftServer to Return Query Log Information
阿新 · Published 2019-01-04
When SQL is submitted to Spark through the Spark ThriftServer, getQueryLog returns an empty result, because the thriftserver by default reads a local operation log file. To return job execution information when running on Spark, the getQueryLog path is modified as described below.
First, add a new method to the org.apache.hive.service.cli.operation.Operation class:
public String getStatementId() {return StringUtils.EMPTY;}
Then override it in org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:
override def getStatementId: String = statementId
This method returns the group ID of the Spark jobs submitted for the statement.
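For context, SparkExecuteStatementOperation already tags every job it launches with statementId as the Spark job group, which is why getStatementId can double as the group ID. A rough sketch of the relevant lines (abridged; exact placement varies across Spark 2.x versions):

// Inside SparkExecuteStatementOperation (abridged sketch, not the full method)
statementId = UUID.randomUUID().toString
// Every job spawned for this statement inherits statementId as spark.jobGroup.id,
// which is the key SparkHiveLog uses below to look up progress.
sqlContext.sparkContext.setJobGroup(statementId, statement)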
Next, create a new SparkHiveLog class:
package org.apache.spark.sql.hive.thriftserver // assumed package; the class is built into the spark-hive-thriftserver module alongside the other modified classes

import java.util

import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._

import scala.collection.mutable.{Map, Set}

class SparkHiveLog extends SparkListener with Logging {

  private class SparkStageInfo(val stageId: Int, val totalTask: Int) {
    var completedTask = 0
    var status = "Running"
  }

  /** Mapping from job group ID to the set of job IDs in that group. */
  private val jobGroupMap: Map[String, Set[Int]] = Map[String, Set[Int]]()

  /** Mapping from job ID to the list of its stage IDs. */
  private val jobListMap: Map[Int, Seq[Int]] = Map[Int, Seq[Int]]()

  /** Mapping from stage ID to its SparkStageInfo. */
  private val stageMap: Map[Int, SparkStageInfo] = Map[Int, SparkStageInfo]()

  private var taskCPUs = 1

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val groupId = jobStart.properties.getProperty("spark.jobGroup.id")
    taskCPUs = jobStart.properties.getProperty("spark.task.cpus", "1").toInt
    jobListMap += (jobStart.jobId -> jobStart.stageIds)
    if (jobGroupMap.contains(groupId)) {
      jobGroupMap.get(groupId).get.add(jobStart.jobId)
    } else {
      jobGroupMap += (groupId -> Set(jobStart.jobId))
    }
  }

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    val stageId = stageSubmitted.stageInfo.stageId
    val numTasks = stageSubmitted.stageInfo.numTasks
    val stageInfo = new SparkStageInfo(stageId, numTasks)
    stageMap += (stageId -> stageInfo)
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val stageId = stageCompleted.stageInfo.stageId
    if (stageMap.contains(stageId)) {
      val stageInfo = stageMap.get(stageId).get
      stageInfo.status = "Completed"
    }
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val stageId = taskEnd.stageId
    if (stageMap.contains(stageId)) {
      val stageInfo = stageMap.get(stageId).get
      if (taskEnd.taskInfo.successful) {
        stageInfo.completedTask = stageInfo.completedTask + 1
      }
    }
  }

  /**
   * Build the progress string for one stage, e.g. "stage5(120/200)".
   */
  private def getStageInfo(stageId: Int): String = {
    if (!stageMap.contains(stageId)) {
      return ""
    }
    val stageInfo = stageMap.get(stageId).get
    val sb = new StringBuffer("stage")
    sb.append(stageId).append("(").append(stageInfo.completedTask).append("/")
    sb.append(stageInfo.totalTask).append(")")
    sb.toString
  }

  /**
   * Build the progress string for every stage of one job.
   */
  private def getJobInfo(jobId: Int): String = {
    val sb = new StringBuffer("Job")
    val map = new util.TreeMap[Int, String]()
    if (jobListMap.contains(jobId)) {
      for (stageId <- jobListMap.get(jobId).get) {
        val stageInfo = getStageInfo(stageId)
        if (StringUtils.isNotEmpty(stageInfo)) {
          map.put(stageId, stageInfo)
        }
      }
    }
    sb.append(jobId).append("/").append(formatString(map, "/"))
    sb.toString
  }

  /**
   * Build the overall progress string for every job in a job group.
   */
  def getLogInfo(groupId: String): String = {
    val sb = new StringBuffer()
    if (jobGroupMap.contains(groupId)) {
      val map = new util.TreeMap[Int, String]()
      val iter = jobGroupMap.get(groupId).get.iterator
      while (iter.hasNext) {
        val jobId = iter.next()
        map.put(jobId, getJobInfo(jobId))
      }
      sb.append(formatString(map, "; "))
    }
    sb.toString
  }

  /** Join the values of a TreeMap (ordered by key) with the given separator. */
  private def formatString(map: util.TreeMap[Int, String], split: String): String = {
    val list = new util.ArrayList[String]()
    val iter = map.keySet().iterator()
    while (iter.hasNext) {
      list.add(map.get(iter.next()))
    }
    StringUtils.join(list, split)
  }

  /** Drop all cached job/stage information for a finished job group. */
  def clearCaches(groupId: String): Unit = {
    if (jobGroupMap.contains(groupId)) {
      val iter = jobGroupMap.get(groupId).get.iterator
      while (iter.hasNext) {
        val jobId = iter.next()
        val stageIter = jobListMap.get(jobId).get.iterator
        while (stageIter.hasNext) {
          stageMap.remove(stageIter.next())
        }
        jobListMap.remove(jobId)
      }
      jobGroupMap.remove(groupId)
    }
  }
}

object SparkHiveLog {

  private var sparkHiveLog: SparkHiveLog = null

  def getSparkHiveLog(): SparkHiveLog = {
    if (sparkHiveLog == null) {
      sparkHiveLog = new SparkHiveLog()
    }
    sparkHiveLog
  }
}
This class records job and stage information in the onJobStart, onStageSubmitted, onStageCompleted, and onTaskEnd callbacks.
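As a rough illustration of the format these methods produce, polling getLogInfo for the group of a running statement returns a string shaped as in the sketch below (the group ID, job/stage IDs, and task counts are hypothetical):

// Hypothetical call; "statement-uuid-1234" stands in for a real statementId/groupId.
val progress = SparkHiveLog.getSparkHiveLog().getLogInfo("statement-uuid-1234")
// progress looks like:
//   Job3/stage5(120/200)/stage6(0/200); Job4/stage7(50/50)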
Then add a new method to the org.apache.hive.service.cli.operation.OperationManager class:
public RowSet getQueryLog(OperationHandle opHandle){return null;}
Add an implementation of this method in the org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager class:

override def getQueryLog(opHandle: OperationHandle): RowSet = {
  val operation = handleToOperation.get(opHandle)
  val groupId = operation.getStatementId
  val schema = new Schema
  val fieldSchema = new FieldSchema
  fieldSchema.setName("operation_log")
  fieldSchema.setType("string")
  schema.addToFieldSchemas(fieldSchema)
  val tableSchema = new TableSchema(schema)
  val rowSet = RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion)
  rowSet.addRow(Array[AnyRef](SparkHiveLog.getSparkHiveLog().getLogInfo(groupId)))
  rowSet
}

To make getQueryLog take effect, the fetchResults method of the org.apache.hive.service.cli.session.HiveSessionImpl class also needs to be modified:
@Override
public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
    long maxRows, FetchType fetchType) throws HiveSQLException {
  acquire(true);
  try {
    if (fetchType == FetchType.QUERY_OUTPUT) {
      return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
    }
    return operationManager.getQueryLog(opHandle);
  } finally {
    release(true);
  }
}
Also override the closeOperation method in the org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager class; when a statement is closed it clears the execution information cached in SparkHiveLog:

override def closeOperation(opHandle: OperationHandle): Unit = {
  val operation = handleToOperation.get(opHandle)
  val groupId = operation.getStatementId
  SparkHiveLog.getSparkHiveLog().clearCaches(groupId)
  operation.close()
  handleToOperation.remove(opHandle)
}
Finally, register SparkHiveLog as a listener in the org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 class:
SparkSQLEnv.sparkContext.addSparkListener(SparkHiveLog.getSparkHiveLog)
Recompile the Spark source, replace the spark-hive-thriftserver_2.11-2.x jar, and restart the thriftserver. When SQL is executed through HiveStatement, getQueryLog can then be called to retrieve the execution progress log.
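For reference, below is a minimal client-side sketch. It assumes a thriftserver listening on localhost:10000 and a hypothetical table some_table; the JDBC URL, credentials, and polling interval are illustrative. Because HiveStatement.executeQuery blocks until the query finishes, the progress log is polled from a background thread via hasMoreLogs/getQueryLog:

import java.sql.DriverManager

import org.apache.hive.jdbc.HiveStatement

import scala.collection.JavaConverters._

object QueryLogDemo {
  def main(args: Array[String]): Unit = {
    // Placeholder connection details; point them at your thriftserver.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    val stmt = conn.createStatement().asInstanceOf[HiveStatement]

    // Background thread: poll the query log while the statement is running.
    val logPoller = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          while (stmt.hasMoreLogs) {
            // With the server-side changes above, each call returns the
            // job/stage progress string built by SparkHiveLog.
            stmt.getQueryLog().asScala.foreach(println)
            Thread.sleep(1000)
          }
        } catch {
          case _: Exception => () // statement closed or cancelled; stop polling
        }
      }
    })
    logPoller.start()

    // Blocks until the query finishes; hasMoreLogs then returns false.
    val rs = stmt.executeQuery("SELECT count(*) FROM some_table")
    while (rs.next()) {
      println(rs.getLong(1))
    }

    stmt.close()
    conn.close()
  }
}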