我們都知道,在大資料領域,Hive的出現幫我們降低了許多使用Hadoop書寫方式的學習成本.使用者可以使用類似SQL的語法規則寫出查詢語句,從Hive表資料中查詢目標資料.最為重要的是這些SQL語句最終會轉化為MapReduce作業進行處理.這也是Hive最強大的地方.可以簡單地理解為Hive就是依託在Hadoop上的一個殼.但是這裡有一點點小小的不同,不是每段Hive查詢SQL語句都與最後生成的job一一對應,如果你的這段SQL是一個大SQL,它在轉化之後,會衍生出許多小job,這些小job是獨立存在執行的,以不同的job名稱進行區別,但是也會保留公共的job名稱.所以一個問題來了,對於超級長的Hive SQL語句,我想檢視到底是哪段子SQL花費了我大量的執行時間,但在JobHistory上只有每個子Job的執行時間,沒有子Job對應的SQL語句.一旦這個功能有了之後,就會幫助我們迅速地定位到問題所在.



我們發現裡面包含了之前分析過的.jhist檔案,還有帶conf字元的.xml格式檔案,從檔名上來看就是job提交時的一些配置資訊.然後我們用vim命令查閱以conf.xml為字尾的檔案,看看裡面是不是有我們想要的 hive.query.string 這樣的屬性.


程式工具分析Hive Sql Job


1.hive sql中的中文導致解析出現亂碼


// Open the job file and decode it explicitly as UTF-8 before reading it line by line.
fileSystem = path.getFileSystem(new Configuration());
			in = fileSystem.open(path);
			InputStreamReader isr;
			BufferedReader br;

			// The charset is passed explicitly instead of relying on the platform default.
			isr = new InputStreamReader(in, "UTF-8");
			br = new BufferedReader(isr);

			while ((str = br.readLine()) != null) {




後來處理速度是上去了,但是寫入SQL的速度過慢.比如說,我有一次測試,開10個執行緒去解析,花了8分鐘就解析好了幾萬個檔案資料,但是插入資料庫花了20分鐘左右,而且量也就幾萬條語句.後來改成了批處理的方式,效果並沒有什麼大的改變,這個慢的問題具體並沒有被解決掉,懷疑可能是因為有些語句中存在超長的Hive SQL語句導致的.


package org.apache.hadoop.mapreduce.v2.hs.tool.sqlanalyse;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.IOUtils;

/**
 * Tool that scans a job-history directory, hands the files found to
 * {@link ParseThread} workers and collects the parsed per-job values in a map
 * keyed by job id.
 */
public class HiveSqlAnalyseTool {
	// Number of ParseThread workers created by readJobInfoFiles().
	private int threadNum;
	// Directory-type tag; printStatDatas() compares it with "dateTimeDir".
	private String dirType;
	// Job-history path as given by the caller, before qualification.
	private String jobHistoryPath;

	// FileContext obtained for the URI of doneDirPrefixPath.
	private FileContext doneDirFc;
	// Fully qualified form of jobHistoryPath.
	private Path doneDirPrefixPath;

	// Pending files; consumed through getOneFile().
	private LinkedList<FileStatus> fileStatusList;
	// Job id -> array of column values (indexed by BaseValues.DB_COLUMN_HIVE_SQL_*).
	private HashMap<String, String[]> dataInfos;
	// Database client built from the BaseValues.DB_* settings.
	private DbClient dbClient;

	/**
	 * Creates the tool: records the settings, allocates the shared containers
	 * and the database client, and qualifies the job-history path.
	 *
	 * @param dirType directory-type tag; printStatDatas() compares it with "dateTimeDir"
	 * @param jobHistoryPath path of the job-history directory to scan
	 * @param threadNum number of ParseThread workers readJobInfoFiles() creates
	 */
	public HiveSqlAnalyseTool(String dirType, String jobHistoryPath,
			int threadNum) {
		this.threadNum = threadNum;
		this.dirType = dirType;
		this.jobHistoryPath = jobHistoryPath;

		this.dataInfos = new HashMap<String, String[]>();
		this.fileStatusList = new LinkedList<FileStatus>();
		// NOTE(review): the remaining DbClient constructor arguments are cut off in this listing.
		this.dbClient = new DbClient(BaseValues.DB_URL,
				BaseValues.DB_USER_NAME, BaseValues.DB_PASSWORD,

		try {
			// Qualify the configured path with the default FileContext, then
			// obtain the FileContext for that qualified path's URI.
			doneDirPrefixPath = FileContext.getFileContext(new Configuration())
					.makeQualified(new Path(this.jobHistoryPath));
			doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri());
		} catch (UnsupportedFileSystemException e) {
			// TODO Auto-generated catch block
			// NOTE(review): no handler statement is visible here; confirm the exception is not silently dropped.
		} catch (IllegalArgumentException e) {
			// TODO Auto-generated catch block
			// NOTE(review): no handler statement is visible here; confirm the exception is not silently dropped.

	/**
	 * Scans the job-history directory and processes the files found with
	 * {@code threadNum} ParseThread workers, then waits for the workers.
	 *
	 * <p>NOTE(review): several lines of this method are missing from this
	 * listing (for example the statements that fill fileStatusList, start the
	 * threads and join them); confirm the details against the full source.
	 */
	public void readJobInfoFiles() {
		List<FileStatus> files;

		files = new ArrayList<FileStatus>();
		try {
			// Recursive listing starting at the qualified job-history path.
			files = scanDirectory(doneDirPrefixPath, doneDirFc, files);
		} catch (IOException e) {
			// TODO Auto-generated catch block

		if (files != null) {
			for (FileStatus fs : files) {
				// parseFileInfo(fs);
			System.out.println("files num is " + files.size());
					.println("fileStatusList size is" + fileStatusList.size());

			// Every worker receives this tool plus the shared file list and result map.
			ParseThread[] threads;
			threads = new ParseThread[threadNum];
			for (int i = 0; i < threadNum; i++) {
				System.out.println("thread " + i + "start run");
				threads[i] = new ParseThread(this, fileStatusList, dataInfos);

			// Second pass over the workers; the join call itself is not visible in this listing.
			for (int i = 0; i < threadNum; i++) {
				System.out.println("thread " + i + "join run");
				try {
					if (threads[i] != null) {
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
		} else {
			System.out.println("files is null");


	/**
	 * Walks {@code path} recursively and returns {@code jhStatusList}.
	 *
	 * <p>NOTE(review): the body of the {@code isFile()} branch is missing from
	 * this listing; presumably it adds the entry to {@code jhStatusList} — confirm.
	 *
	 * @param path directory to list; qualified with {@code fc} before use
	 * @param fc file context used for listing
	 * @param jhStatusList accumulator list, also used as the return value
	 * @return the same {@code jhStatusList} instance that was passed in
	 * @throws IOException propagated from the listing calls, except
	 *         FileNotFoundException, which is caught and reported on stdout
	 */
	protected List<FileStatus> scanDirectory(Path path, FileContext fc,
			List<FileStatus> jhStatusList) throws IOException {
		path = fc.makeQualified(path);
		System.out.println("dir path is " + path.getName());
		try {
			RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
			while (fileStatusIter.hasNext()) {
				FileStatus fileStatus = fileStatusIter.next();
				Path filePath = fileStatus.getPath();

				if (fileStatus.isFile()) {
				} else if (fileStatus.isDirectory()) {
					// Descend into sub-directories with the same accumulator.
					scanDirectory(filePath, fc, jhStatusList);
		} catch (FileNotFoundException fe) {
			System.out.println("Error while scanning directory " + path);

		return jhStatusList;

	/**
	 * Parses one job-history file and records the extracted fields in dataInfos.
	 *
	 * <p>The file name decides the kind: names ending in ".xml" are handled as
	 * "config" files, names ending in ".jhist" as "info" files. The file is
	 * read line by line as UTF-8 and the collected values are passed to
	 * insertConfParseData or insertJobInfoParseData.
	 *
	 * <p>NOTE(review): several lines (closing braces and statement
	 * continuations) are missing from this listing; confirm details against
	 * the full source.
	 *
	 * @param fs status entry whose path identifies the file to parse
	 */
	private void parseFileInfo(FileStatus fs) {
		String resultStr;
		String str;
		String username;
		String fileType;
		String jobId;
		String jobName;
		String hiveSql;

		int startPos;
		int endPos;
		int hiveSqlFlag;
		long launchTime;
		long finishTime;
		int mapTaskNum;
		int reduceTaskNum;
		String xmlNameFlag;
		String launchTimeFlag;
		String finishTimeFlag;
		String launchMapFlag;
		String launchReduceFlag;

		Path path;
		FileSystem fileSystem;
		InputStream in;

		resultStr = "";
		fileType = "";
		hiveSql = "";
		jobId = "";
		jobName = "";
		username = "";
		hiveSqlFlag = 0;
		launchTime = 0;
		finishTime = 0;
		mapTaskNum = 0;
		reduceTaskNum = 0;
		// Markers searched for in the file text.
		xmlNameFlag = "<value>";
		launchTimeFlag = "\"launchTime\":";
		finishTimeFlag = "\"finishTime\":";
		launchMapFlag = "\"Launched map tasks\"";
		launchReduceFlag = "\"Launched reduce tasks\"";

		path = fs.getPath();
		str = path.getName();
		if (str.endsWith(".xml")) {
			fileType = "config";

			// Config file: the job id is the file name up to the last '_'.
			endPos = str.lastIndexOf("_");
			jobId = str.substring(0, endPos);
		} else if (str.endsWith(".jhist")) {
			fileType = "info";

			// History file: the job id is the file name up to the first '-'.
			endPos = str.indexOf("-");
			jobId = str.substring(0, endPos);
		} else {
			// NOTE(review): the body of this branch is not visible in this listing.

		try {
			fileSystem = path.getFileSystem(new Configuration());
			in = fileSystem.open(path);
			InputStreamReader isr;
			BufferedReader br;

			// Decode explicitly as UTF-8.
			isr = new InputStreamReader(in, "UTF-8");
			br = new BufferedReader(isr);

			while ((str = br.readLine()) != null) {
				// Property lines: the value sits between <value> and </value>.
				if (str.contains("mapreduce.job.user.name")) {
					startPos = str.indexOf(xmlNameFlag);
					endPos = str.indexOf("</value>");
					username = str.substring(startPos + xmlNameFlag.length(),
				} else if (str.contains("mapreduce.job.name")) {
					startPos = str.indexOf(xmlNameFlag);
					endPos = str.indexOf("</value>");
					jobName = str.substring(startPos + xmlNameFlag.length(),
				} else if (str.contains("hive.query.string")) {
					// Start of the query property; following lines are appended below.
					hiveSqlFlag = 1;
					hiveSql = str;
				} else if (hiveSqlFlag == 1) {
					// Lines are concatenated without any separator between them.
					hiveSql += str;

					// Once a closing tag has been read, keep only the text between
					// the first <value> and the first </value> of the accumulated string.
					if (str.contains("</value>")) {
						startPos = hiveSql.indexOf(xmlNameFlag);
						endPos = hiveSql.indexOf("</value>");
						hiveSql = hiveSql.substring(
								startPos + xmlNameFlag.length(), endPos);

						hiveSqlFlag = 0;
				} else if (str.startsWith("{\"type\":\"JOB_INITED\"")) {
					// Take the number that follows "launchTime": up to the next comma.
					startPos = str.indexOf(launchTimeFlag);
					str = str.substring(startPos + launchTimeFlag.length());
					endPos = str.indexOf(",");
					launchTime = Long.parseLong(str.substring(0, endPos));
				} else if (str.startsWith("{\"type\":\"JOB_FINISHED\"")) {
					// Counters are read first because str is reassigned below.
					mapTaskNum = parseTaskNum(launchMapFlag, str);
					reduceTaskNum = parseTaskNum(launchReduceFlag, str);

					startPos = str.indexOf(finishTimeFlag);
					str = str.substring(startPos + finishTimeFlag.length());
					endPos = str.indexOf(",");
					finishTime = Long.parseLong(str.substring(0, endPos));

			// Debug output of everything extracted from this file.
			System.out.println("jobId is " + jobId);
			System.out.println("jobName is " + jobName);
			System.out.println("username is " + username);
			System.out.println("map task num is " + mapTaskNum);
			System.out.println("reduce task num is " + reduceTaskNum);
			System.out.println("launchTime is " + launchTime);
			System.out.println("finishTime is " + finishTime);
			System.out.println("hive query sql is " + hiveSql);
		} catch (IOException e) {
			// TODO Auto-generated catch block

		// Store the values in the columns that belong to this file kind.
		if (fileType.equals("config")) {
			insertConfParseData(jobId, jobName, username, hiveSql);
		} else if (fileType.equals("info")) {
			insertJobInfoParseData(jobId, launchTime, finishTime, mapTaskNum,

	/**
	 * Writes the values taken from a config file into the job's row in dataInfos.
	 *
	 * <p>An existing row for {@code jobId} is reused so that values stored
	 * earlier in other columns are kept; otherwise a new row of length
	 * BaseValues.DB_COLUMN_HIVE_SQL_LEN is created.
	 *
	 * @param jobId key of the row
	 * @param jobName value for the job-name column
	 * @param username value for the user-name column
	 * @param sql value for the Hive SQL column
	 */
	private void insertConfParseData(String jobId, String jobName,
			String username, String sql) {
		String[] array;

		if (dataInfos.containsKey(jobId)) {
			array = dataInfos.get(jobId);
		} else {
			array = new String[BaseValues.DB_COLUMN_HIVE_SQL_LEN];

		array[BaseValues.DB_COLUMN_HIVE_SQL_JOBID] = jobId;
		array[BaseValues.DB_COLUMN_HIVE_SQL_JOBNAME] = jobName;
		array[BaseValues.DB_COLUMN_HIVE_SQL_USERNAME] = username;
		array[BaseValues.DB_COLUMN_HIVE_SQL_HIVE_SQL] = sql;

		dataInfos.put(jobId, array);

	/**
	 * Writes the values taken from a .jhist file into the job's row in dataInfos.
	 *
	 * <p>An existing row for {@code jobId} is reused; otherwise a new row of
	 * length BaseValues.DB_COLUMN_HIVE_SQL_LEN is created.
	 *
	 * <p>NOTE(review): the right-hand sides of the four assignments below are
	 * cut off after {@code String} in this listing; presumably each converts
	 * the matching parameter to text — confirm against the full source.
	 *
	 * @param jobId key of the row
	 * @param launchTime launch time parsed from the history file
	 * @param finishedTime finish time parsed from the history file
	 * @param mapTaskNum launched map task count
	 * @param reduceTaskNum launched reduce task count
	 */
	private void insertJobInfoParseData(String jobId, long launchTime,
			long finishedTime, int mapTaskNum, int reduceTaskNum) {
		String[] array;

		if (dataInfos.containsKey(jobId)) {
			array = dataInfos.get(jobId);
		} else {
			array = new String[BaseValues.DB_COLUMN_HIVE_SQL_LEN];

		array[BaseValues.DB_COLUMN_HIVE_SQL_JOBID] = jobId;
		array[BaseValues.DB_COLUMN_HIVE_SQL_START_TIME] = String
		array[BaseValues.DB_COLUMN_HIVE_SQL_FINISH_TIME] = String
		array[BaseValues.DB_COLUMN_HIVE_SQL_MAP_TASK_NUM] = String
		array[BaseValues.DB_COLUMN_HIVE_SQL_REDUCE_TASK_NUM] = String

		dataInfos.put(jobId, array);

	private int parseTaskNum(String flag, String jobStr) {
		int taskNum;
		int startPos;
		int endPos;

		String tmpStr;

		taskNum = 0;
		tmpStr = jobStr;
		startPos = tmpStr.indexOf(flag);

		if (startPos == -1) {
			return 0;

		tmpStr = tmpStr.substring(startPos + flag.length());
		endPos = tmpStr.indexOf("}");
		tmpStr = tmpStr.substring(0, endPos);
		taskNum = Integer.parseInt(tmpStr.split(":")[1]);

		return taskNum;

	/**
	 * Reports the collected results: prints the size of dataInfos; the
	 * database-related branches are guarded by {@code dbClient != null} and
	 * by {@code dirType} being equal to "dateTimeDir".
	 *
	 * <p>NOTE(review): the bodies of the guarded branches and the end of the
	 * block comment that starts below are missing from this listing; confirm
	 * what is executed against the full source.
	 */
	private void printStatDatas() {
		String jobId;
		String jobInfo;
		String[] infos;

		if (dbClient != null) {

		if (dataInfos != null) {
			System.out.println("map data size is" + dataInfos.size());
			if (dbClient != null && dirType.equals("dateTimeDir")) {

		/*for (Entry<String, String[]> entry : this.dataInfos.entrySet()) {
			jobId = entry.getKey();
			infos = entry.getValue();

			jobInfo = String
					.format("jobId is %s, jobName:%s, usrname:%s, launchTime:%s, finishTime:%s, mapTaskNum:%s, reduceTaskNum:%s, querySql:%s",
							jobId, infos[1], infos[2], infos[3], infos[4],
							infos[5], infos[6], infos[7]);
			// System.out.println("job detail info " + jobInfo);

			if (dbClient != null && dirType.equals("dateTimeDir")) {

		if (dbClient != null) {

	public synchronized FileStatus getOneFile() {
		FileStatus fs;

		fs = null;
		if (fileStatusList != null & fileStatusList.size() > 0) {
			fs = fileStatusList.poll();

		return fs;

	public synchronized void addDataToMap(String jobId, String[] values) {
		if (dataInfos != null) {
			dataInfos.put(jobId, values);

package org.apache.hadoop.mapreduce.v2.hs.tool.sqlanalyse;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.LinkedList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Worker thread that takes files from a {@link HiveSqlAnalyseTool} through
 * getOneFile(); the class also contains the per-file parser whose results are
 * handed back through addDataToMap().
 */
public class ParseThread extends Thread{
	// Owning tool; supplies files via getOneFile() and receives results via addDataToMap().
	private HiveSqlAnalyseTool tool;
	// Pending-file list passed in by the creator; only inspected in run() to decide whether to keep looping.
	private LinkedList<FileStatus> fileStatus;
	// Job id -> column values; read directly here, written through tool.addDataToMap().
	private HashMap<String, String[]> dataInfos;
	/**
	 * Stores the references this worker operates on.
	 *
	 * @param tool tool used to fetch files and to store results
	 * @param fileStatus pending-file list checked by run()
	 * @param dataInfos result map consulted when merging values for a job
	 */
	public ParseThread(HiveSqlAnalyseTool tool, LinkedList<FileStatus> fileStatus, HashMap<String, String[]> dataInfos){
		this.tool = tool;
		this.fileStatus = fileStatus;
		this.dataInfos = dataInfos;
	/**
	 * Loops while the pending-file list is non-empty, fetching one file per
	 * iteration from the tool.
	 *
	 * <p>NOTE(review): the rest of the loop body is missing from this listing.
	 * The emptiness check below is made without synchronization while
	 * getOneFile() is synchronized, so getOneFile() can return null after the
	 * check passed — confirm that the full source handles a null result.
	 */
	public void run() {
		FileStatus fs;
		while(fileStatus != null && !fileStatus.isEmpty()){
			fs = tool.getOneFile();

	/**
	 * Parses one job-history file and hands the extracted fields to the tool.
	 *
	 * <p>The file name decides the kind: names ending in ".xml" are handled as
	 * "config" files, names ending in ".jhist" as "info" files. The file is
	 * read line by line as UTF-8 and the collected values are passed to
	 * insertConfParseData or insertJobInfoParseData.
	 *
	 * <p>NOTE(review): several lines (closing braces and statement
	 * continuations) are missing from this listing; confirm details against
	 * the full source.
	 *
	 * @param fs status entry whose path identifies the file to parse
	 */
	private void parseFileInfo(FileStatus fs) {
		String str;
		String username;
		String fileType;
		String jobId;
		String jobName;
		String hiveSql;

		int startPos;
		int endPos;
		int hiveSqlFlag;
		long launchTime;
		long finishTime;
		int mapTaskNum;
		int reduceTaskNum;
		String xmlNameFlag;
		String launchTimeFlag;
		String finishTimeFlag;
		String launchMapFlag;
		String launchReduceFlag;

		Path path;
		FileSystem fileSystem;
		InputStream in;

		fileType = "";
		hiveSql = "";
		jobId = "";
		jobName = "";
		username = "";
		hiveSqlFlag = 0;
		launchTime = 0;
		finishTime = 0;
		mapTaskNum = 0;
		reduceTaskNum = 0;
		// Markers searched for in the file text.
		xmlNameFlag = "<value>";
		launchTimeFlag = "\"launchTime\":";
		finishTimeFlag = "\"finishTime\":";
		launchMapFlag = "\"Launched map tasks\"";
		launchReduceFlag = "\"Launched reduce tasks\"";

		path = fs.getPath();
		str = path.getName();
		if (str.endsWith(".xml")) {
			fileType = "config";

			// Config file: the job id is the file name up to the last '_'.
			endPos = str.lastIndexOf("_");
			jobId = str.substring(0, endPos);
		} else if (str.endsWith(".jhist")) {
			fileType = "info";

			// History file: the job id is the file name up to the first '-'.
			endPos = str.indexOf("-");
			jobId = str.substring(0, endPos);

		try {
			fileSystem = path.getFileSystem(new Configuration());
			in = fileSystem.open(path);
			InputStreamReader isr;
			BufferedReader br;

			// Decode explicitly as UTF-8.
			isr = new InputStreamReader(in, "UTF-8");
			br = new BufferedReader(isr);

			while ((str = br.readLine()) != null) {
				// Property lines: the value sits between <value> and </value>.
				if (str.contains("mapreduce.job.user.name")) {
					startPos = str.indexOf(xmlNameFlag);
					endPos = str.indexOf("</value>");
					username = str.substring(startPos + xmlNameFlag.length(),
				} else if (str.contains("mapreduce.job.name")) {
					startPos = str.indexOf(xmlNameFlag);
					endPos = str.indexOf("</value>");
					jobName = str.substring(startPos + xmlNameFlag.length(),
				} else if (str.contains("hive.query.string")) {
					// Start of the query property; following lines are appended below.
					hiveSqlFlag = 1;

					hiveSql = str;
				} else if (hiveSqlFlag == 1) {
					// Lines are concatenated without any separator between them.
					hiveSql += str;

					// Once a closing tag has been read, keep only the text between
					// the first <value> and the first </value> of the accumulated string.
					if (str.contains("</value>")) {
						startPos = hiveSql.indexOf(xmlNameFlag);
						endPos = hiveSql.indexOf("</value>");
						hiveSql = hiveSql.substring(
								startPos + xmlNameFlag.length(), endPos);

						hiveSqlFlag = 0;
				} else if (str.startsWith("{\"type\":\"JOB_INITED\"")) {
					// Take the number that follows "launchTime": up to the next comma.
					startPos = str.indexOf(launchTimeFlag);
					str = str.substring(startPos + launchTimeFlag.length());
					endPos = str.indexOf(",");
					launchTime = Long.parseLong(str.substring(0, endPos));
				} else if (str.startsWith("{\"type\":\"JOB_FINISHED\"")) {
					// Counters are read first because str is reassigned below.
					mapTaskNum = parseTaskNum(launchMapFlag, str);
					reduceTaskNum = parseTaskNum(launchReduceFlag, str);

					startPos = str.indexOf(finishTimeFlag);
					str = str.substring(startPos + finishTimeFlag.length());
					endPos = str.indexOf(",");
					finishTime = Long.parseLong(str.substring(0, endPos));

			/*System.out.println("jobId is " + jobId);
			System.out.println("jobName is " + jobName);
			System.out.println("username is " + username);
			System.out.println("map task num is " + mapTaskNum);
			System.out.println("reduce task num is " + reduceTaskNum);
			System.out.println("launchTime is " + launchTime);
			System.out.println("finishTime is " + finishTime);
			System.out.println("hive query sql is " + hiveSql);*/
		} catch (IOException e) {
			// TODO Auto-generated catch block

		// Store the values in the columns that belong to this file kind.
		if (fileType.equals("config")) {
			insertConfParseData(jobId, jobName, username, hiveSql);
		} else if (fileType.equals("info")) {
			insertJobInfoParseData(jobId, launchTime, finishTime, mapTaskNum,

	/**
	 * Merges the values taken from a config file into the job's row and hands
	 * the row to the tool.
	 *
	 * <p>An existing row for {@code jobId} is reused; otherwise a new row of
	 * length BaseValues.DB_COLUMN_HIVE_SQL_LEN is created.
	 *
	 * <p>NOTE(review): dataInfos is read here without holding the tool's lock,
	 * while the write goes through the synchronized addDataToMap(). If two
	 * threads handle files of the same job id at the same time, each may
	 * create its own row and one row may replace the other — confirm whether
	 * this can happen and whether the lookup should be done under the same lock.
	 *
	 * @param jobId key of the row
	 * @param jobName value for the job-name column
	 * @param username value for the user-name column
	 * @param sql value for the Hive SQL column
	 */
	private void insertConfParseData(String jobId, String jobName,
			String username, String sql) {
		String[] array;

		if (dataInfos.containsKey(jobId)) {
			array = dataInfos.get(jobId);
		} else {
			array = new String[BaseValues.DB_COLUMN_HIVE_SQL_LEN];

		array[BaseValues.DB_COLUMN_HIVE_SQL_JOBID] = jobId;
		array[BaseValues.DB_COLUMN_HIVE_SQL_JOBNAME] = jobName;
		array[BaseValues.DB_COLUMN_HIVE_SQL_USERNAME] = username;
		array[BaseValues.DB_COLUMN_HIVE_SQL_HIVE_SQL] = sql;

		tool.addDataToMap(jobId, array);

	/**
	 * Merges the values taken from a .jhist file into the job's row and hands
	 * the row to the tool.
	 *
	 * <p>An existing row for {@code jobId} is reused; otherwise a new row of
	 * length BaseValues.DB_COLUMN_HIVE_SQL_LEN is created.
	 *
	 * <p>NOTE(review): the right-hand sides of the four assignments below are
	 * cut off after {@code String} in this listing; presumably each converts
	 * the matching parameter to text — confirm against the full source. The
	 * lookup in dataInfos is also done without holding the tool's lock, while
	 * the write goes through the synchronized addDataToMap() — confirm that
	 * concurrent handling of the same job id cannot lose a row.
	 *
	 * @param jobId key of the row
	 * @param launchTime launch time parsed from the history file
	 * @param finishedTime finish time parsed from the history file
	 * @param mapTaskNum launched map task count
	 * @param reduceTaskNum launched reduce task count
	 */
	private void insertJobInfoParseData(String jobId, long launchTime,
			long finishedTime, int mapTaskNum, int reduceTaskNum) {
		String[] array;

		if (dataInfos.containsKey(jobId)) {
			array = dataInfos.get(jobId);
		} else {
			array = new String[BaseValues.DB_COLUMN_HIVE_SQL_LEN];

		array[BaseValues.DB_COLUMN_HIVE_SQL_JOBID] = jobId;
		array[BaseValues.DB_COLUMN_HIVE_SQL_START_TIME] = String
		array[BaseValues.DB_COLUMN_HIVE_SQL_FINISH_TIME] = String
		array[BaseValues.DB_COLUMN_HIVE_SQL_MAP_TASK_NUM] = String
		array[BaseValues.DB_COLUMN_HIVE_SQL_REDUCE_TASK_NUM] = String

		tool.addDataToMap(jobId, array);

	private int parseTaskNum(String flag, String jobStr) {
		int taskNum;
		int startPos;
		int endPos;

		String tmpStr;

		taskNum = 0;
		tmpStr = jobStr;
		startPos = tmpStr.indexOf(flag);
		if(startPos == -1){
			return 0;
		tmpStr = tmpStr.substring(startPos + flag.length());
		endPos = tmpStr.indexOf("}");
		tmpStr = tmpStr.substring(0, endPos);
		taskNum = Integer.parseInt(tmpStr.split(":")[1]);

		return taskNum;


一 新建JAVA專案 並新增 hive-exec-2.1.0.jar 和hadoop-common-2.7.3.jar hive-exec-2.1.0.jar 在HIVE安裝目錄的lib目錄下 hadoop-common-2.7.3.jar在hadoop