1. 程式人生 > >OpenTSDB原始碼分析之TSDB-UID表操作(新增)

OpenTSDB原始碼分析之TSDB-UID表操作(新增)

為了方便研究將tsdb-uid表的新增操作抽取出來,程式碼如下:
package net.opentsdb.tools;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import net.opentsdb.core.TSDB;
import net.opentsdb.meta.UIDMeta;
import net.opentsdb.uid.NoSuchUniqueName;
import net.opentsdb.uid.UniqueId.UniqueIdType;
import net.opentsdb.utils.Config;

import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.Bytes;
import org.hbase.async.GetRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.HBaseException;
import org.hbase.async.KeyValue;
import org.hbase.async.PutRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

public class TestTsdbuidAdd {

	private static final Logger LOG = LoggerFactory.getLogger(TestTsdbuidAdd.class);

	private static final Charset CHARSET = Charset.forName("ISO-8859-1");
	private static final byte[] ID_FAMILY = toBytes("id");
	private static final byte[] NAME_FAMILY = toBytes("name");
	private static final byte[] MAXID_ROW = { 0 };
	private static final short MAX_ATTEMPTS_ASSIGN_ID = 3;

	private final HBaseClient client;
	private final byte[] table;
	private final byte[] kind;//列修飾符
	private final UniqueIdType type;
	private final short id_width;

	private final ConcurrentHashMap<String, byte[]> name_cache = new ConcurrentHashMap<String, byte[]>();
	private final ConcurrentHashMap<String, String> id_cache = new ConcurrentHashMap<String, String>();
	//pending_assignments pending即將發生的 assignments工作、任務
	private final ConcurrentHashMap<String, Deferred<byte[]>> pending_assignments = new ConcurrentHashMap<String, Deferred<byte[]>>();

	private volatile int cache_hits;
	private volatile int cache_misses;
	private TSDB tsdb;

	public TestTsdbuidAdd(final HBaseClient client, final byte[] table, final String kind, final int width) {
		this.client = client;
		this.table = table;
		if (kind.isEmpty()) {
			throw new IllegalArgumentException("Empty string as 'kind' argument!");
		}
		
		this.kind = toBytes(kind);
		type = stringToUniqueIdType(kind);
		//LOG.debug("---type=" + type + "---");
		
		if (width < 1 || width > 8) {
			throw new IllegalArgumentException("Invalid width: " + width);
		}
		this.id_width = (short) width;
	}

	public static UniqueIdType stringToUniqueIdType(final String type) {
		if (type.toLowerCase().equals("metric") || type.toLowerCase().equals("metrics")) {
			return UniqueIdType.METRIC;
		} else if (type.toLowerCase().equals("tagk")) {
			return UniqueIdType.TAGK;
		} else if (type.toLowerCase().equals("tagv")) {
			return UniqueIdType.TAGV;
		} else {
			throw new IllegalArgumentException("Invalid type requested: " + type);
		}
	}

	static void usage(final ArgP argp, final String errmsg) {
		System.err.println(errmsg);
		System.err.println("Usage: uid <subcommand> args\n" + "Sub commands:\n" + "  grep [kind] <RE>: Finds matching IDs.\n"
				+ "  assign <kind> <name> [names]:" + " Assign an ID for the given name(s).\n"
				+ "  rename <kind> <name> <newname>: Renames this UID.\n" + "  fsck: Checks the consistency of UIDs.\n"
				+ "  [kind] <name>: Lookup the ID of this name.\n" + "  [kind] <ID>: Lookup the name of this ID.\n"
				+ "  metasync: Generates missing TSUID and UID meta entries, updates\n" 
				+ "            created timestamps\n"
				+ "  metapurge: Removes meta data entries from the UID table\n"
				+ "  treesync: Process all timeseries meta objects through tree rules\n"
				+ "  treepurge <id> [definition]: Purge a tree and/or the branches\n"
				+ "            from storage. Provide an integer Tree ID and optionally\n"
				+ "            add \"true\" to delete the tree definition\n\n" 
				+ "Example values for [kind]:"
				+ " metric, tagk (tag name), tagv (tag value).");
		if (argp != null) {
			System.err.print(argp.usage());
		}
	}
	
	public static void main(String[] args) throws Exception {
		args = new String[] { "assign", "metrics", "m15" };

		ArgP argp = new ArgP();
		CliOptions.addCommon(argp);
		CliOptions.addVerbose(argp);
		argp.addOption("--idwidth", "N", "Number of bytes on which the UniqueId is encoded.");
		argp.addOption("--ignore-case", "Ignore case distinctions when matching a regexp.");
		argp.addOption("-i", "Short for --ignore-case.");
		args = CliOptions.parse(argp, args);
		
		
		if (args == null) {
			usage(argp, "Invalid usage");
			System.exit(2);
		} else if (args.length < 1) {
			usage(argp, "Not enough arguments");
			System.exit(2);
		}

		
		final byte[] table = argp.get("--uidtable", "tsdb-uid").getBytes();
		final short idwidth = (argp.has("--idwidth") ? Short.parseShort(argp.get("--idwidth")) : 3);
		if (idwidth <= 0) {
			usage(argp, "Negative or 0 --idwidth");
			System.exit(3);
		}
		final boolean ignorecase = argp.has("--ignore-case") || argp.has("-i");
		

		Config config = CliOptions.getConfig(argp);

		
		final TSDB tsdb = new TSDB(config);

		tsdb.getClient().ensureTableExists(config.getString("tsd.storage.hbase.uid_table")).joinUninterruptibly();
		argp = null;
		int rc;
		try {
			rc = runCommand(tsdb, table, idwidth, ignorecase, args);
		} finally {
			try {
				tsdb.getClient().shutdown().joinUninterruptibly();
				LOG.info("Gracefully shutdown the TSD");
			} catch (Exception e) {
				LOG.error("Unexpected exception while shutting down", e);
				rc = 42;
			}
		}
		System.exit(rc);
	}
	
	

	private static int runCommand(final TSDB tsdb, final byte[] table, final short idwidth, final boolean ignorecase, final String[] args) {
		/**
		 * 新增 assign metrics t1
		 */
		if (args[0].equals("assign")) {
			assign(tsdb.getClient(), table, idwidth, args);
		}

		/**
		 * 查詢 grep t grep metrics t11
		 */
		if (args[0].equals("grep")) {
		}

		/**
		 * 修改
		 */
		if (args[0].equals("rename")) {

		}

		/**
		 * 刪除
		 */
		if (args[0].equals("delete")) {

		}
		return 0;
	}

	private static int assign(final HBaseClient client, final byte[] table, final short idwidth, final String[] args) {
		final TestTsdbuidAdd uid = new TestTsdbuidAdd(client, table, args[1], (int) idwidth);

		for (int i = 2; i < args.length; i++) {
			try {
				uid.getOrCreateId(args[i]);
				
				extactLookupName(uid, args[i]);
			} catch (HBaseException e) {
				LOG.error("error while processing " + args[i], e);
				return 3;
			}
		}
		return 0;
	}
	
	/*
	private static int extactLookupName(final HBaseClient client, final byte[] table, final short idwidth, final String kind, final String name) {
		final TestTsdbuidAdd uid = new TestTsdbuidAdd(client, table, kind, (int) idwidth);
		try {
			final byte[] id = uid.getId(name);
			System.out.println(kind + ' ' + name + ": " + Arrays.toString(id));
			return 0;
		} catch (NoSuchUniqueName e) {
			LOG.error(e.getMessage());
			return 1;
		}
	}
	*/
	
	private static int extactLookupName(TestTsdbuidAdd uid, String name) {
		try {
			final byte[] id = uid.getId(name);
			System.out.println(uid.kind() + ' ' + name + ": " + Arrays.toString(id));
			return 0;
		} catch (NoSuchUniqueName e) {
			LOG.error(e.getMessage());
			return 1;
		}
	}
	
	public byte[] getId(final String name) throws NoSuchUniqueName, HBaseException {
		try {
			return getIdAsync(name).joinUninterruptibly();
		} catch (RuntimeException e) {
			throw e;
		} catch (Exception e) {
			throw new RuntimeException("Should never be here", e);
		}
	}

	public byte[] getOrCreateId(final String name) throws HBaseException {
		try {
			/**
			 * 建立一個Deferred並等待其返回
			 */
			return getOrCreateIdAsync(name).joinUninterruptibly();
		} catch (RuntimeException e) {
			throw e;
		} catch (Exception e) {
			throw new RuntimeException("Should never be here", e);
		}
	}

	public Deferred<byte[]> getOrCreateIdAsync(final String name) {
		
		class HandleNoSuchUniqueNameCB implements Callback<Object, Exception> {

			public Object call(final Exception e) {
				LOG.debug("---處理1--失敗處理---對從hbase中查詢丟擲異常的處理:---");
				if (e instanceof NoSuchUniqueName) {

					Deferred<byte[]> assignment = null;
					//pending_assignments主要用於一個避免多執行緒衝突,比如兩個人同時新建衝突
					synchronized (pending_assignments) {
						assignment = pending_assignments.get(name);
						if (assignment == null) {
							// to prevent(預防) UID leaks(洩漏) that can be caused when multiple time
							// series for the same metric or tags arrive, we need to write a
							// deferred to the pending map as quickly as possible. 
							// Then we can start the assignment process after we've stashed(貯藏)
							// the deferred and released the lock
							assignment = new Deferred<byte[]>();
							pending_assignments.put(name, assignment);
						} else {
							LOG.info("Already waiting for UID assignment: " + name);
							return assignment;
						}
					}

					// start the assignment dance after stashing(貯藏) the deferred
					LOG.debug("---hbase資料庫中不存在進行新增-----UniqueIdAllocator(name, assignment).tryAllocate()--------");
					return new UniqueIdAllocator(name, assignment).tryAllocate();
				}

				// Other unexpected exception, let it bubble up.
				System.out.println("Caught an exception here");
				return e;
			}
		}

		// Kick off(開始) the HBase lookup(查詢), and if we don't find it there
		// either,start the process to allocate a UID.
		LOG.debug("---查詢1--失敗處理---通過getIdAsync(name)查詢標籤是否存在---------");
		return getIdAsync(name).addErrback(new HandleNoSuchUniqueNameCB());
	}

	
	public Deferred<byte[]> getIdAsync(final String name) {

		LOG.debug("--先從緩衝區中找---");
		/**
		 * 先從緩衝區中找 找到就返回
		 */
		final byte[] id = getIdFromCache(name);
		if (id != null) {
			cache_hits++;
			return Deferred.fromResult(id);
		}
		cache_misses++;

		class GetIdCB implements Callback<byte[], byte[]> {
			public byte[] call(final byte[] id) {
				LOG.debug("---處理2---對從資料庫HBase中查詢結果進行處理:沒找到丟擲NoSuchUniqueName異常,找到將id,name放入快取");
				if (id == null) {
					throw new NoSuchUniqueName(kind(), name);
				}
				if (id.length != id_width) {
					throw new IllegalStateException("Found id.length = " + id.length + " which is != " + id_width + " required for '" + kind() + '\'');
				}
				addIdToCache(name, id);
				addNameToCache(id, name);
				return id;
			}
		}

		/**
		 * 從資料庫HBase中查詢 
		 * 沒有找到就丟擲NoSuchUniqueName異常 
		 * 找到的話就把(name,id)放入緩衝區並返回id
		 */
		LOG.debug("---查詢2---利用name從資料庫HBase中查詢 ");
		Deferred<byte[]> d = getIdFromHBase(name).addCallback(new GetIdCB());
		return d;
	}

	private Deferred<byte[]> getIdFromHBase(final String name) {
		return hbaseGet(toBytes(name), ID_FAMILY);
	}
	
	private Deferred<byte[]> hbaseGet(final byte[] key, final byte[] family) {

		final GetRequest get = new GetRequest(table, key);
		// kind為列簇修飾符metrics
		get.family(family).qualifier(kind);

		class GetCB implements Callback<byte[], ArrayList<KeyValue>> {
			public byte[] call(final ArrayList<KeyValue> row) {
				LOG.debug("---處理3---返回在hbase表中查詢指定name的id的結果");
				if (row == null || row.isEmpty()) {
					return null;
				}
				return row.get(0).value();
			}
		}

		LOG.debug("---查詢3---先利用HBaseClient.get(get)來在HBase中的tsdb-uid表中通過key+family+qualifier查詢id");
		return client.get(get).addCallback(new GetCB());
	}

	private void addIdToCache(final String name, final byte[] id) {
		byte[] found = name_cache.get(name);
		if (found == null) {
			found = name_cache.putIfAbsent(name,
					// Must make a defensive copy to be immune
					// to any changes the caller may do on the
					// array later on.
					Arrays.copyOf(id, id.length));
		}
		if (found != null && !Arrays.equals(found, id)) {
			throw new IllegalStateException("name=" + name + " => id=" + Arrays.toString(id) + ", already mapped to " + Arrays.toString(found));
		}
	}

	private void addNameToCache(final byte[] id, final String name) {
		final String key = fromBytes(id);
		String found = id_cache.get(key);
		if (found == null) {
			found = id_cache.putIfAbsent(key, name);
		}
		if (found != null && !found.equals(name)) {
			throw new IllegalStateException("id=" + Arrays.toString(id) + " => name=" + name + ", already mapped to " + found);
		}
	}


	private byte[] getIdFromCache(final String name) {
		
		for(Entry<String, byte[]> en : name_cache.entrySet()) {
			System.out.println(en.getKey() + ":" + en.getValue().toString());
		}
		
		
		return name_cache.get(name);
	}

	private static byte[] toBytes(final String s) {
		return s.getBytes(CHARSET);
	}

	private static String fromBytes(final byte[] b) {
		return new String(b, CHARSET);
	}

	private final class UniqueIdAllocator implements Callback<Object, Object> {
		private final String name; // What we're trying to allocate an ID for.
		private final Deferred<byte[]> assignment; // deferred to call back
		private short attempt = MAX_ATTEMPTS_ASSIGN_ID; // Give up when zero.

		private HBaseException hbe = null; // Last exception caught.

		private long id = -1; // The ID we'll grab with an atomic increment.
		private byte row[]; // The same ID, as a byte array.

		private static final byte ALLOCATE_UID = 0;
		private static final byte CREATE_REVERSE_MAPPING = 1;
		private static final byte CREATE_FORWARD_MAPPING = 2;
		private static final byte DONE = 3;
		private byte state = ALLOCATE_UID; // Current state of the process.

		UniqueIdAllocator(final String name, final Deferred<byte[]> assignment) {
			this.name = name;
			this.assignment = assignment;
		}

		Deferred<byte[]> tryAllocate() {
			attempt--;
			state = ALLOCATE_UID;

			LOG.debug("---在此處呼叫UniqueIdAllocator.call方法---");
			call(null);
			return assignment;
		}

		@SuppressWarnings("unchecked")
		public Object call(final Object arg) {

			LOG.debug("--對UniqueIdAllocator物件的回撥 ----");
			if (attempt == 0) {
				if (hbe == null) {
					throw new IllegalStateException("Should never happen!");
				}
				LOG.error("Failed to assign an ID for kind='" + kind() + "' name='" + name + "'", hbe);
				throw hbe;
			}

			if (arg instanceof Exception) {
				final String msg = ("Failed attempt #" + (MAX_ATTEMPTS_ASSIGN_ID - attempt) + " to assign an UID for " + kind() + ':' + name
						+ " at step #" + state);
				if (arg instanceof HBaseException) {
					LOG.error(msg, (Exception) arg);
					hbe = (HBaseException) arg;
					return tryAllocate(); // Retry from the beginning.
				} else {
					LOG.error("WTF?  Unexpected exception!  " + msg, (Exception) arg);
					return arg; // Unexpected exception, let it bubble up.
				}
			}

			class ErrBack implements Callback<Object, Exception> {
				public Object call(final Exception e) throws Exception {
					assignment.callback(e);
					return assignment;
				}
			}

			
			final Deferred d;
			LOG.debug("---UniqueId分配狀態state=" + state);
			switch (state) {
			case ALLOCATE_UID:
				LOG.debug("---計數器自增---");
				d = allocateUid();
				break;

			case CREATE_REVERSE_MAPPING:
				LOG.debug("---儲存id/name的行---");
				d = createReverseMapping(arg);
				break;

			case CREATE_FORWARD_MAPPING:
				LOG.debug("---儲存name/id的行---");
				d = createForwardMapping(arg);
				break;

			case DONE:
				return done(arg);
			default:
				throw new AssertionError("Should never be here!");
			}

			/**
			 * addBoth()方法作用:Registers a callback both as a callback and as an "errback". 
			 * 
			 * 對進行的操作增加對此類的call()回撥(即在執行完call方法後再執行call方法相當於遞迴,只到state=DONE)
			 */
			LOG.debug("---增加對此類UniqueIdAllocator的回撥    如果回撥返回異常對ErrBack進行回撥");
			return d.addBoth(this).addErrback(new ErrBack());
		}

		private Deferred<Long> allocateUid() {
			LOG.info("---計數器自增--- 更新UniqueIdAllocator.state=CREATE_REVERSE_MAPPING");
			LOG.info("Creating an ID for kind='" + kind() + "' name='" + name + '\'');
			state = CREATE_REVERSE_MAPPING;
			return client.atomicIncrement(new AtomicIncrementRequest(table, MAXID_ROW, ID_FAMILY, kind));
		}

		/**
		 * Create the reverse mapping. We do this before the forward one so that
		 * if we die before creating the forward mapping we don't run the risk
		 * of "publishing" a partially assigned ID. The reverse mapping on its
		 * own is harmless but the forward mapping without reverse mapping is
		 * bad as it would point to an ID that cannot be resolved.
		 * 意思是:先儲存id/name的行,再儲存name/id的行,這樣的好處是在name/id的行儲存失敗時沒有什麼影響
		 */
		private Deferred<Boolean> createReverseMapping(final Object arg) {

			LOG.debug("---先儲存id/name的行,更新UniqueIdAllocator.state=CREATE_FORWARD_MAPPING");
			/**
			 * 此方法在id自增之後呼叫,因此此處arg必為Long型
			 */
			if (!(arg instanceof Long)) {
				throw new IllegalStateException("Expected a Long but got " + arg);
			}
			
			id = (Long) arg;
			if (id <= 0) {
				throw new IllegalStateException("Got a negative ID from HBase: " + id);
			}
			LOG.info("Got ID=" + id + " for kind='" + kind() + "' name='" + name + "'");
			row = Bytes.fromLong(id);
			System.out.println("----------show row start-----------");
			for(byte b : row) {
				System.out.println(b);
			}
			System.out.println("----------show row end-------------");
			// row.length should actually(實際上) be 8.
			if (row.length < id_width) {
				throw new IllegalStateException("OMG, row.length = " + row.length + " which is less than " + id_width + " for id=" + id + " row="
						+ Arrays.toString(row));
			}
			
			// Verify that we're going to drop bytes that are 0.
			for (int i = 0; i < row.length - id_width; i++) {
				if (row[i] != 0) {
					final String message = "All Unique IDs for " + kind() + " on " + id_width + " bytes are already assigned!";
					LOG.error("OMG " + message);
					throw new IllegalStateException(message);
				}
			}
			// Shrink the ID on the requested number of bytes.
			row = Arrays.copyOfRange(row, row.length - id_width, row.length);
			System.out.println("----------show row start-----------");
			for(byte b : row) {
				System.out.println(b);
			}
			System.out.println("----------show row end-------------");
			
			state = CREATE_FORWARD_MAPPING;
			// We are CAS'ing(compareAndSet) the KV into existence(存在) -- the second argument is
			// how we tell HBase we want to atomically(原子的) create the KV, so that if
			// there is already a KV in this cell, we'll fail. Technically we could do
			// just a `put' here, as we have a freshly(剛剛) allocated UID, so there
			// is not reason why a KV should already exist for this UID, but just
			// to err on the safe side and catch really weird corruption cases, we
			// do a CAS instead to create the KV.
			return client.compareAndSet(reverseMapping(), HBaseClient.EMPTY_ARRAY);
		}

		private PutRequest reverseMapping() {
			return new PutRequest(table, row, NAME_FAMILY, kind, toBytes(name));
		}

		private Deferred<?> createForwardMapping(final Object arg) {
			LOG.debug("---再儲存name/id的行,更新UniqueIdAllocator.state=DONE");
			if (!(arg instanceof Boolean)) {
				throw new IllegalStateException("Expected a Boolean but got " + arg);
			}
			if (!((Boolean) arg)) { // Previous CAS failed. Something is really // messed up.
				LOG.error("WTF!  Failed to CAS reverse mapping: " + reverseMapping() + " -- run an fsck against the UID table!");
				return tryAllocate(); // Try again from the beginning.
			}

			state = DONE;
			return client.compareAndSet(forwardMapping(), HBaseClient.EMPTY_ARRAY);
		}

		private PutRequest forwardMapping() {
			return new PutRequest(table, toBytes(name), ID_FAMILY, kind, row);
		}

		private Deferred<byte[]> done(final Object arg) {
			if (!(arg instanceof Boolean)) {
				throw new IllegalStateException("Expected a Boolean but got " + arg);
			}
			if (!((Boolean) arg)) { // Previous CAS failed. We lost a race.
				LOG.warn("Race condition: tried to assign ID " + id + " to " + kind() + ":" + name + ", but CAS failed on " + forwardMapping()
						+ ", which indicates this UID must have" + " been allocated concurrently by another TSD. So ID " + id + " was leaked.");
				// If two TSDs attempted to allocate a UID for the same name at
				// the
				// same time, they would both have allocated a UID, and created
				// a
				// reverse mapping, and upon getting here, only one of them
				// would
				// manage to CAS this KV into existence. The one that loses the
				// race will retry and discover the UID assigned by the winner
				// TSD,
				// and a UID will have been wasted in the process. No big deal.

				class GetIdCB implements Callback<Deferred<byte[]>, byte[]> {
					public Deferred<byte[]> call(final byte[] row) throws Exception {
						assignment.callback(row);
						return assignment;
					}
				}
				return getIdAsync(name).addCallbackDeferring(new GetIdCB());
			}

			cacheMapping(name, row);

			if (tsdb != null && tsdb.getConfig().enable_realtime_uid()) {
				final UIDMeta meta = new UIDMeta(type, row, name);
				meta.storeNew(tsdb);
				LOG.info("Wrote UIDMeta for: " + name);
				tsdb.indexUIDMeta(meta);
			}

			pending_assignments.remove(name);
			assignment.callback(row);
			return assignment;
		}

	}

	/**
	 * Adds the bidirectional(雙向的) mapping in the cache.
	 */
	private void cacheMapping(final String name, final byte[] id) {
		addIdToCache(name, id);
		addNameToCache(id, name);
	}
	
	public String kind() {
		return fromBytes(kind);
	}
}

執行流程日誌:
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd.getOrCreateIdAsync(TestTsdbuidAdd.java:289): ---查詢1--失敗處理---通過getIdAsync(name)查詢標籤是否存在---------
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd.getIdAsync(TestTsdbuidAdd.java:296): --先從緩衝區中找---
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd.getIdAsync(TestTsdbuidAdd.java:327): ---查詢2---利用name從資料庫HBase中查詢 
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd.hbaseGet(TestTsdbuidAdd.java:352): ---查詢3---先利用HBaseClient.get(get)來在HBase中的tsdb-uid表中通過key+family+qualifier查詢id
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$1GetCB.call(TestTsdbuidAdd.java:344): ---處理3---返回在hbase表中查詢指定name的id的結果
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$1GetIdCB.call(TestTsdbuidAdd.java:309): ---處理2---對從資料庫HBase中查詢結果進行處理:沒找到丟擲NoSuchUniqueName異常,找到將id,name放入快取
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$1HandleNoSuchUniqueNameCB.call(TestTsdbuidAdd.java:255): ---處理1--失敗處理---對從hbase中查詢丟擲異常的處理:---
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$1HandleNoSuchUniqueNameCB.call(TestTsdbuidAdd.java:277): ---hbase資料庫中不存在進行新增-----UniqueIdAllocator(name, assignment).tryAllocate()--------
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.tryAllocate(TestTsdbuidAdd.java:425): ---在此處呼叫UniqueIdAllocator.call方法---
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:433): --對UniqueIdAllocator物件的回撥 ----
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:464): ---UniqueId分配狀態state=0
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:467): ---計數器自增---
13/09/26 11:40:50 INFO net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.allocateUid(TestTsdbuidAdd.java:497): ---計數器自增--- 更新UniqueIdAllocator.state=CREATE_REVERSE_MAPPING
13/09/26 11:40:50 INFO net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.allocateUid(TestTsdbuidAdd.java:498): Creating an ID for kind='metrics' name='m15'
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:492): ---增加對此類UniqueIdAllocator的回撥    如果回撥返回異常對ErrBack進行回撥
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:433): --對UniqueIdAllocator物件的回撥 ----
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:464): ---UniqueId分配狀態state=1
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:472): ---儲存id/name的行---
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.createReverseMapping(TestTsdbuidAdd.java:513): ---先儲存id/name的行,更新UniqueIdAllocator.state=CREATE_FORWARD_MAPPING
13/09/26 11:40:50 INFO net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.createReverseMapping(TestTsdbuidAdd.java:525): Got ID=15 for kind='metrics' name='m15'
----------show row start-----------
0
0
0
0
0
0
0
15
----------show row end-------------
----------show row start-----------
0
0
15
----------show row end-------------
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:492): ---增加對此類UniqueIdAllocator的回撥    如果回撥返回異常對ErrBack進行回撥
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:433): --對UniqueIdAllocator物件的回撥 ----
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:464): ---UniqueId分配狀態state=2
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:477): ---儲存name/id的行---
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.createForwardMapping(TestTsdbuidAdd.java:570): ---再儲存name/id的行,更新UniqueIdAllocator.state=DONE
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:492): ---增加對此類UniqueIdAllocator的回撥    如果回撥返回異常對ErrBack進行回撥
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:433): --對UniqueIdAllocator物件的回撥 ----
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:464): ---UniqueId分配狀態state=3
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd.getIdAsync(TestTsdbuidAdd.java:296): --先從緩衝區中找---
m15:[
[email protected]
metrics m15: [0, 0, 15] 13/09/26 11:40:50 INFO net.opentsdb.tools.TestTsdbuidAdd.main(TestTsdbuidAdd.java:145): Gracefully shutdown the TSD

相關推薦

OpenTSDB原始碼分析TSDB-UID操作(新增)

為了方便研究將tsdb-uid表的新增操作抽取出來,程式碼如下: package net.opentsdb.tools; import java.nio.charset.Charset; import java.util.ArrayList; import java.ut

OpenTSDB原始碼分析TSDB操作(查詢)

TSDB表的行鍵比tsdb-uid表複雜,查詢也要複雜一些,這種複雜性體現在欄位過濾器,在該過濾器中監控指標、日期範圍和標籤都要考慮到。        所有的過濾器都通過TsdbQuery.run()方法實現,建立帶過濾器的掃描器,遍歷返回的行和收集資料供顯示使用,使用輔助

swoole原始碼分析Buffer的substr操作

swoole_buffer提供的substr操作用於從緩衝區中取出內容。 string swoole_buffer->substr(int $offset, int $length = -1,

swoole原始碼分析Buffer的expend操作

swoole_buffer提供的expend操作用於為快取區擴容。 swoole_buffer->expand(int $new_size); $new_size 指定新的緩衝區尺寸,必須大

swoole原始碼分析Buffer的write操作

swoole_buffer提供的write操作用於向快取區的任意記憶體位置寫資料。read/write函式可以直接讀寫記憶體。所以使用務必要謹慎,否則可能會破壞現有資料。 swoole_buffer->write(int $offset, string $data)

Memcached原始碼分析增刪改查操作(5)

文章列表: 《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》 前言 在看Memcached的增刪改查操作前,我們先來看一下process_command方法。Memcached解析命令之後,就通過process_comman

Netty原始碼分析ChannelPipeline(二)—ChannelHandler的新增與刪除

上篇文章中,我們對Netty中ChannelPipeline的構造與初始化進行了分析與總結,本篇文章我們將對ChannelHandler的新增與刪除操作進行具體的的程式碼分析; 一、ChannelHandler的新增 下面是Netty官方的一段demo原始碼,可以看到在服務端初始化時執行了向Channel

openTSDB 原始碼詳解寫入資料到 tsdb-uid

openTSDB 原始碼詳解之寫入資料到tsdb-uid表 1.方法入口messageReceived public void messageReceived(final ChannelHandlerContext ctx,

Memcached原始碼分析Hash操作

Memcached的Hash表用來提高資料訪問效能,通過連結法來解決Hash衝突,當Hash表中資料多餘Hash表容量的1.5倍時,Hash表就會擴容,Memcached的Hash表操作沒什麼特別的,我們這裡簡單介紹下Memcached裡面的Hash表操作。 //hash表

openTSDB詳解底層HBase分析

openTSDB詳解之底層HBase表分析 【Updating】 檢視hbase的表負載情況 hbase(main):001:0> status 1 active master, 0 backup masters, 3 servers, 0 dead, 2.3

swoole_process原始碼分析setBlocking操作

swoole_process提供的setBlocking用於設定管道是否為阻塞模式。預設Process的管道為同步阻塞。 function swoole_process->setBlocking

swoole_process原始碼分析alarm操作

swoole_process提供的alarm是個高精度定時器,是作業系統setitimer系統呼叫的封裝,可以設定微秒級別的定時器。定時器會觸發訊號,需要與swoole_process::signal或pcntl_signal配合使用。 function swoole_pr

swoole_process原始碼分析close操作

swoole_process提供的close用於關閉建立的好的管道。 bool swoole_process->close(int $which = 0); $which 指定關閉哪一個管道

Open vSwitch(OvS)原始碼分析工作流程(flow流查詢)

前面分析了Open vSwitch幾部分原始碼,對於Open vSwitch也有了個大概的理解,今天要分析的程式碼將是整個Open vSwitch的重中之重。整個Open vSwitch的核心程式碼在datapath檔案中;而datapath檔案中的核心程式碼又在ovs_dp_process_re

Python 原始碼分析位元組碼基本操作

本文基於 Python 3.6.4 編譯器生成位元組碼,你可通過如下程式碼片段得到 python 原始碼對應的位元組碼 #!/usr/bin/env python # encoding: utf-8 import sys import dis filename=sys.argv

elasticsearch原始碼分析索引操作(九)

上節介紹了es的node啟動如何建立叢集服務的過程,這節在其基礎之上介紹es索引的基本操作功能(create、exist、delete),用來進一步細化es叢集是如果工作的。 客戶端部分的操作就不予介紹了,詳細可以參照elasticsearch原始碼分析之客戶

Android Wi-Fi原始碼分析WifiService操作Wi-Fi(一):分析Wifi.c中的wifi_load_driver()函式

Wi-Fi原始碼分析之WifiService操作Wi-Fi(一) 分析Wifi.c中的wifi_load_driver()函式 int wifi_load_driver() { AL

MySQL系列:innodb原始碼分析空間管理

innodb在實現表空間(table space)基於檔案IO之上構建的一層邏輯儲存空間管理,table space採用邏輯分層的結構:space、segment inode、extent和page.在實現層的邏輯使用了磁碟連結串列這種結構來管理邏輯關係。我們先來介紹磁碟連

STL原始碼分析hash(gnu-c++ 2.9)

1、基本概念 關於hash表的概念這裡就不再多說,hash表的變化一般都在雜湊函式和退避方法上。STL採用的是開鏈法,即每個hash桶裡面維持一個連結串列,hash函式計算出位置後,就將節點插入該位置的連結串列上,因此,底層實現為hash表的容器,迭代器的實現

雲客Drupal8原始碼分析單Form API

在閱讀本主題前建議你先閱讀本系列前面的《表單定義示例》主題,看一看在drupal8中是如何運用表單的。表單處理流程:一般情況下表單流程是先顯示一個表單,使用者填寫,然後提交,系統處理,如果有錯則重新顯示並給出錯誤提示,反之沒有錯誤那麼完成後給出一個響應或者一個重定向響應,這是