1. 程式人生 > >OpenTSDB原始碼分析之TSDB表操作(查詢)

OpenTSDB原始碼分析之TSDB表操作(查詢)

TSDB表的行鍵比tsdb-uid表的行鍵複雜,因此查詢也要複雜一些。這種複雜性體現在欄位過濾器上:該過濾器必須同時考慮監控指標、日期範圍和標籤。

       所有的過濾器都通過TsdbQuery.run()方法實現:該方法建立帶過濾器的掃描器,遍歷返回的行並收集資料供顯示使用,過程中會用到輔助方法TsdbQuery.getScanner()和TsdbQuery.findSpans()。

       TSDB表查詢的具體用法可以參考CliQuery.main()方法。

public static void main(String[] args) throws Exception {
		/**
		 * 將查詢結果列印輸出
		 */
		// args = new String[] { "2013/09/08-12:57", "2013/10/20-16:57", "sum",
		// "t22", "host=foo"};

		/**
		 * 查詢輸出寫入到gnuplot載入檔案中 w l lt 3 lw 2(with line linetype 3 linewidth 2)為plot繪圖引數不可缺少
		 */
		args = new String[] { "--graph=e:/a", "2013/09/08-12:57", "2013/10/20-16:57", /*"+format x=\"%a %d %H:%M\"",*/ "sum", "t22", "host=foo", "w l lt 3 lw 2" };

		ArgP argp = new ArgP();
		CliOptions.addCommon(argp);
		CliOptions.addVerbose(argp);
		argp.addOption("--graph", "BASEPATH", "Output data points to a set of files for gnuplot." + "  The path of the output files will start with"
				+ " BASEPATH.");
		args = CliOptions.parse(argp, args);
		if (args == null) {
			usage(argp, "Invalid usage.", 1);
		} else if (args.length < 3) {
			usage(argp, "Not enough arguments.", 2);
		}

		// get a config object
		Config config = CliOptions.getConfig(argp);

		final TSDB tsdb = new TSDB(config);
		tsdb.checkNecessaryTablesExist().joinUninterruptibly();
		final String basepath = argp.get("--graph");
		argp = null;

		Plot plot = null;
		try {
			plot = doQuery(tsdb, args, basepath != null);
		} finally {
			try {
				tsdb.shutdown().joinUninterruptibly();
			} catch (Exception e) {
				LOG.error("Unexpected exception", e);
				System.exit(1);
			}
		}

		if (plot != null) {
			try {
				/**
				 * 將Plot寫gnuplot載入檔案 修改圖片樣式可以在Plot中實現
				 */
				final int npoints = plot.dumpToFiles(basepath);
				LOG.info("Wrote " + npoints + " for Gnuplot");
			} catch (IOException e) {
				LOG.error("Failed to write the Gnuplot file under " + basepath, e);
				System.exit(1);
			}
		}
	}

	/**
	 * Runs every query described by the command-line arguments.
	 *
	 * When want_plot is false each data point is printed to standard output
	 * as "metric timestamp value tags". When want_plot is true the results
	 * are collected into a Plot instead.
	 *
	 * @param tsdb      the TSDB the queries are created on
	 * @param args      the remaining command-line arguments, parsed by
	 *                  parseCommandLineQuery()
	 * @param want_plot whether to build a Plot rather than print
	 * @return the populated Plot when want_plot is true, otherwise null
	 */
	private static Plot doQuery(final TSDB tsdb, final String args[], final boolean want_plot) {
		final ArrayList<String> plotparams = new ArrayList<String>();
		final ArrayList<Query> queries = new ArrayList<Query>();
		final ArrayList<String> plotoptions = new ArrayList<String>();

		/*
		 * Fills queries (TsdbQuery instances) and initialises plotparams and
		 * plotoptions.
		 *
		 * Example of a generated Gnuplot script:
		 *
		 *   set term png small size 1024,768
		 *   set xdata time
		 *   set timefmt "%s"
		 *   if (GPVAL_VERSION < 4.6) set xtics rotate; else set xtics rotate right
		 *   set output "e:/a.png"
		 *   set xrange ["1378645020":"1382288220"]
		 *   set grid
		 *   set style data linespoints
		 *   set key right box
		 *   set format x "%a %d %H:%M"
		 *   plot  "e:/a_0.dat" using 1:2 title "t22{host=foo}" w l lt 3 lw 2
		 *
		 * plotparams carries "set" lines such as: set format x "%a %d %H:%M"
		 * plotoptions carries per-series plot arguments such as: w l lt 3 lw 2
		 */
		parseCommandLineQuery(args, tsdb, queries, plotparams, plotoptions);

		if (queries.isEmpty()) {
			usage(null, "Not enough arguments, need at least one query.", 2);
		}

		// The plot's time range is taken from the first query.
		final Plot plot = (want_plot ? new Plot(queries.get(0).getStartTime(), queries.get(0).getEndTime()) : null);
		if (want_plot) {
			plot.setParams(parsePlotParams(plotparams));
		}

		final int nqueries = queries.size();
		for (int i = 0; i < nqueries; i++) {
			// TODO(tsuna): Optimization: run each query in parallel.

			// parseCommandLineQuery() only records plot options for queries
			// that actually carry them, so the list can be shorter than the
			// query list. Fall back to "no options" instead of failing with
			// an IndexOutOfBoundsException.
			// NOTE(review): assumes Plot.add() accepts an empty option
			// string - confirm against Plot.
			final String options = i < plotoptions.size() ? plotoptions.get(i) : "";
			final StringBuilder buf = want_plot ? null : new StringBuilder();

			// All of the actual querying happens inside run().
			for (final DataPoints datapoints : queries.get(i).run()) {
				if (want_plot) {
					// Collect the series for the Gnuplot output.
					plot.add(datapoints, options);
				} else {
					// Print one line per data point.
					final String metric = datapoints.metricName();
					final String tagz = datapoints.getTags().toString();
					for (final DataPoint datapoint : datapoints) {
						buf.append(metric).append(' ').append(datapoint.timestamp()).append(' ');
						if (datapoint.isInteger()) {
							buf.append(datapoint.longValue());
						} else {
							buf.append(String.format("%f", datapoint.doubleValue()));
						}
						buf.append(' ').append(tagz).append('\n');
						System.out.print(buf);
						// Reuse the buffer for the next data point.
						buf.delete(0, buf.length());
					}
				}
			}
		}
		return plot;
	}

/**
 * Parses the command-line form of one or more queries.
 *
 * Expected argument layout, as read by the code below:
 *
 *   START [END] [+plotparam ...]
 *   ( AGG [rate [counter[,max[,reset]]]] [downsample INTERVAL AGG]
 *     METRIC [tag=value ...] ["plot options"] ) ...
 *
 * An argument is treated as plot options when it contains a space after its
 * first character, and as a tag when it does not and contains '=' after its
 * first character.
 *
 * @param args        the arguments to parse; args[0] is the start time
 * @param tsdb        used to create the Query objects
 * @param queries     receives one Query per parsed query
 * @param plotparams  receives the arguments that start with '+'; may be
 *                    null, in which case they are skipped
 * @param plotoptions receives the per-query plot option strings; may be
 *                    null, in which case they are skipped
 * @throws ArrayIndexOutOfBoundsException if a query is cut short, because
 *         the arguments after the aggregator are read without bounds checks
 */
static void parseCommandLineQuery(final String[] args, final TSDB tsdb, final ArrayList<Query> queries, final ArrayList<String> plotparams,
			final ArrayList<String> plotoptions) {

		// The first argument is the start time. A non-negative result is
		// scaled down by 1000 (ms to seconds, see the note further below).
		long start_ts = DateTime.parseDateTimeString(args[0], null);
		if (start_ts >= 0)
			start_ts /= 1000;
		long end_ts = -1;

		// The second argument is used as the end time only when it looks
		// like one: it does not start with '+', and it either contains one
		// of ':', '/', '-' or parses as a positive number.
		if (args.length > 3) {
			try {
				final String candidate = args[1];
				// startsWith() rather than charAt(0) so that an empty
				// argument cannot raise StringIndexOutOfBoundsException.
				if (!candidate.startsWith("+")
						&& (candidate.indexOf(':') >= 0 || candidate.indexOf('/') >= 0 || candidate.indexOf('-') >= 0 || Long.parseLong(candidate) > 0)) {
					end_ts = DateTime.parseDateTimeString(candidate, null);
				}
			} catch (NumberFormatException nfe) {
				// Ignored on purpose: the second argument is not a time, so
				// it is most likely the aggregator of the first query.
			}
		}

		// temp fixup to seconds from ms until the rest of TSDB supports ms.
		// This cannot be folded into the parseDateTimeString() call above
		// because dividing would clobber the -1 "no end time" marker.
		if (end_ts >= 0)
			end_ts /= 1000;

		// Collect the plot parameters: every leading argument that starts
		// with '+'.
		int i = end_ts < 0 ? 1 : 2;
		while (i < args.length && args[i].startsWith("+")) {
			if (plotparams != null) {
				plotparams.add(args[i]);
			}
			i++;
		}

		// Everything that is left is a sequence of queries.
		while (i < args.length) {
			final Aggregator agg = Aggregators.get(args[i++]);
			final boolean rate = args[i].equals("rate");
			RateOptions rate_options = new RateOptions(false, Long.MAX_VALUE, RateOptions.DEFAULT_RESET_VALUE);
			if (rate) {
				i++;

				// Optional "counter[,max[,reset]]" argument after "rate".
				long counterMax = Long.MAX_VALUE;
				long resetValue = RateOptions.DEFAULT_RESET_VALUE;
				if (args[i].startsWith("counter")) {
					String[] parts = Tags.splitString(args[i], ',');
					if (parts.length >= 2 && parts[1].length() > 0) {
						counterMax = Long.parseLong(parts[1]);
					}
					if (parts.length >= 3 && parts[2].length() > 0) {
						resetValue = Long.parseLong(parts[2]);
					}
					rate_options = new RateOptions(true, counterMax, resetValue);
					i++;
				}
			}

			// Optional "downsample INTERVAL AGG".
			final boolean downsample = args[i].equals("downsample");
			if (downsample) {
				i++;
			}
			final int interval = downsample ? Integer.parseInt(args[i++]) : 0;
			final Aggregator sampler = downsample ? Aggregators.get(args[i++]) : null;

			final String metric = args[i++];
			final HashMap<String, String> tags = new HashMap<String, String>();
			while (i < args.length && args[i].indexOf(' ', 1) < 0 && args[i].indexOf('=', 1) > 0) {
				Tags.parse(tags, args[i++]);
			}

			// Optional plot options for this query. The argument is always
			// consumed, but only stored when the caller supplied a list;
			// this mirrors the null handling of plotparams above.
			if (i < args.length && args[i].indexOf(' ', 1) > 0) {
				final String options = args[i++];
				if (plotoptions != null) {
					plotoptions.add(options);
				}
			}

			final Query query = tsdb.newQuery();
			query.setStartTime(start_ts);
			if (end_ts > 0) {
				query.setEndTime(end_ts);
			}
			query.setTimeSeries(metric, tags, agg, rate, rate_options);
			if (downsample) {
				query.downsample(interval, sampler);
			}
			queries.add(query);
		}
	}