OpenTSDB原始碼分析之TSDB-UID表操作(新增)
package net.opentsdb.tools;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import net.opentsdb.core.TSDB;
import net.opentsdb.meta.UIDMeta;
import net.opentsdb.uid.NoSuchUniqueName;
import net.opentsdb.uid.UniqueId.UniqueIdType;
import net.opentsdb.utils.Config;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.Bytes;
import org.hbase.async.GetRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.HBaseException;
import org.hbase.async.KeyValue;
import org.hbase.async.PutRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
public class TestTsdbuidAdd {
private static final Logger LOG = LoggerFactory.getLogger(TestTsdbuidAdd.class);
private static final Charset CHARSET = Charset.forName("ISO-8859-1");
private static final byte[] ID_FAMILY = toBytes("id");
private static final byte[] NAME_FAMILY = toBytes("name");
private static final byte[] MAXID_ROW = { 0 };
private static final short MAX_ATTEMPTS_ASSIGN_ID = 3;
private final HBaseClient client;
private final byte[] table;
private final byte[] kind;//列修飾符
private final UniqueIdType type;
private final short id_width;
private final ConcurrentHashMap<String, byte[]> name_cache = new ConcurrentHashMap<String, byte[]>();
private final ConcurrentHashMap<String, String> id_cache = new ConcurrentHashMap<String, String>();
//pending_assignments pending即將發生的 assignments工作、任務
private final ConcurrentHashMap<String, Deferred<byte[]>> pending_assignments = new ConcurrentHashMap<String, Deferred<byte[]>>();
private volatile int cache_hits;
private volatile int cache_misses;
private TSDB tsdb;
public TestTsdbuidAdd(final HBaseClient client, final byte[] table, final String kind, final int width) {
this.client = client;
this.table = table;
if (kind.isEmpty()) {
throw new IllegalArgumentException("Empty string as 'kind' argument!");
}
this.kind = toBytes(kind);
type = stringToUniqueIdType(kind);
//LOG.debug("---type=" + type + "---");
if (width < 1 || width > 8) {
throw new IllegalArgumentException("Invalid width: " + width);
}
this.id_width = (short) width;
}
public static UniqueIdType stringToUniqueIdType(final String type) {
if (type.toLowerCase().equals("metric") || type.toLowerCase().equals("metrics")) {
return UniqueIdType.METRIC;
} else if (type.toLowerCase().equals("tagk")) {
return UniqueIdType.TAGK;
} else if (type.toLowerCase().equals("tagv")) {
return UniqueIdType.TAGV;
} else {
throw new IllegalArgumentException("Invalid type requested: " + type);
}
}
static void usage(final ArgP argp, final String errmsg) {
System.err.println(errmsg);
System.err.println("Usage: uid <subcommand> args\n" + "Sub commands:\n" + " grep [kind] <RE>: Finds matching IDs.\n"
+ " assign <kind> <name> [names]:" + " Assign an ID for the given name(s).\n"
+ " rename <kind> <name> <newname>: Renames this UID.\n" + " fsck: Checks the consistency of UIDs.\n"
+ " [kind] <name>: Lookup the ID of this name.\n" + " [kind] <ID>: Lookup the name of this ID.\n"
+ " metasync: Generates missing TSUID and UID meta entries, updates\n"
+ " created timestamps\n"
+ " metapurge: Removes meta data entries from the UID table\n"
+ " treesync: Process all timeseries meta objects through tree rules\n"
+ " treepurge <id> [definition]: Purge a tree and/or the branches\n"
+ " from storage. Provide an integer Tree ID and optionally\n"
+ " add \"true\" to delete the tree definition\n\n"
+ "Example values for [kind]:"
+ " metric, tagk (tag name), tagv (tag value).");
if (argp != null) {
System.err.print(argp.usage());
}
}
public static void main(String[] args) throws Exception {
args = new String[] { "assign", "metrics", "m15" };
ArgP argp = new ArgP();
CliOptions.addCommon(argp);
CliOptions.addVerbose(argp);
argp.addOption("--idwidth", "N", "Number of bytes on which the UniqueId is encoded.");
argp.addOption("--ignore-case", "Ignore case distinctions when matching a regexp.");
argp.addOption("-i", "Short for --ignore-case.");
args = CliOptions.parse(argp, args);
if (args == null) {
usage(argp, "Invalid usage");
System.exit(2);
} else if (args.length < 1) {
usage(argp, "Not enough arguments");
System.exit(2);
}
final byte[] table = argp.get("--uidtable", "tsdb-uid").getBytes();
final short idwidth = (argp.has("--idwidth") ? Short.parseShort(argp.get("--idwidth")) : 3);
if (idwidth <= 0) {
usage(argp, "Negative or 0 --idwidth");
System.exit(3);
}
final boolean ignorecase = argp.has("--ignore-case") || argp.has("-i");
Config config = CliOptions.getConfig(argp);
final TSDB tsdb = new TSDB(config);
tsdb.getClient().ensureTableExists(config.getString("tsd.storage.hbase.uid_table")).joinUninterruptibly();
argp = null;
int rc;
try {
rc = runCommand(tsdb, table, idwidth, ignorecase, args);
} finally {
try {
tsdb.getClient().shutdown().joinUninterruptibly();
LOG.info("Gracefully shutdown the TSD");
} catch (Exception e) {
LOG.error("Unexpected exception while shutting down", e);
rc = 42;
}
}
System.exit(rc);
}
private static int runCommand(final TSDB tsdb, final byte[] table, final short idwidth, final boolean ignorecase, final String[] args) {
/**
* 新增 assign metrics t1
*/
if (args[0].equals("assign")) {
assign(tsdb.getClient(), table, idwidth, args);
}
/**
* 查詢 grep t grep metrics t11
*/
if (args[0].equals("grep")) {
}
/**
* 修改
*/
if (args[0].equals("rename")) {
}
/**
* 刪除
*/
if (args[0].equals("delete")) {
}
return 0;
}
private static int assign(final HBaseClient client, final byte[] table, final short idwidth, final String[] args) {
final TestTsdbuidAdd uid = new TestTsdbuidAdd(client, table, args[1], (int) idwidth);
for (int i = 2; i < args.length; i++) {
try {
uid.getOrCreateId(args[i]);
extactLookupName(uid, args[i]);
} catch (HBaseException e) {
LOG.error("error while processing " + args[i], e);
return 3;
}
}
return 0;
}
/*
private static int extactLookupName(final HBaseClient client, final byte[] table, final short idwidth, final String kind, final String name) {
final TestTsdbuidAdd uid = new TestTsdbuidAdd(client, table, kind, (int) idwidth);
try {
final byte[] id = uid.getId(name);
System.out.println(kind + ' ' + name + ": " + Arrays.toString(id));
return 0;
} catch (NoSuchUniqueName e) {
LOG.error(e.getMessage());
return 1;
}
}
*/
private static int extactLookupName(TestTsdbuidAdd uid, String name) {
try {
final byte[] id = uid.getId(name);
System.out.println(uid.kind() + ' ' + name + ": " + Arrays.toString(id));
return 0;
} catch (NoSuchUniqueName e) {
LOG.error(e.getMessage());
return 1;
}
}
public byte[] getId(final String name) throws NoSuchUniqueName, HBaseException {
try {
return getIdAsync(name).joinUninterruptibly();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Should never be here", e);
}
}
public byte[] getOrCreateId(final String name) throws HBaseException {
try {
/**
* 建立一個Deferred並等待其返回
*/
return getOrCreateIdAsync(name).joinUninterruptibly();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Should never be here", e);
}
}
public Deferred<byte[]> getOrCreateIdAsync(final String name) {
class HandleNoSuchUniqueNameCB implements Callback<Object, Exception> {
public Object call(final Exception e) {
LOG.debug("---處理1--失敗處理---對從hbase中查詢丟擲異常的處理:---");
if (e instanceof NoSuchUniqueName) {
Deferred<byte[]> assignment = null;
//pending_assignments主要用於一個避免多執行緒衝突,比如兩個人同時新建衝突
synchronized (pending_assignments) {
assignment = pending_assignments.get(name);
if (assignment == null) {
// to prevent(預防) UID leaks(洩漏) that can be caused when multiple time
// series for the same metric or tags arrive, we need to write a
// deferred to the pending map as quickly as possible.
// Then we can start the assignment process after we've stashed(貯藏)
// the deferred and released the lock
assignment = new Deferred<byte[]>();
pending_assignments.put(name, assignment);
} else {
LOG.info("Already waiting for UID assignment: " + name);
return assignment;
}
}
// start the assignment dance after stashing(貯藏) the deferred
LOG.debug("---hbase資料庫中不存在進行新增-----UniqueIdAllocator(name, assignment).tryAllocate()--------");
return new UniqueIdAllocator(name, assignment).tryAllocate();
}
// Other unexpected exception, let it bubble up.
System.out.println("Caught an exception here");
return e;
}
}
// Kick off(開始) the HBase lookup(查詢), and if we don't find it there
// either,start the process to allocate a UID.
LOG.debug("---查詢1--失敗處理---通過getIdAsync(name)查詢標籤是否存在---------");
return getIdAsync(name).addErrback(new HandleNoSuchUniqueNameCB());
}
public Deferred<byte[]> getIdAsync(final String name) {
LOG.debug("--先從緩衝區中找---");
/**
* 先從緩衝區中找 找到就返回
*/
final byte[] id = getIdFromCache(name);
if (id != null) {
cache_hits++;
return Deferred.fromResult(id);
}
cache_misses++;
class GetIdCB implements Callback<byte[], byte[]> {
public byte[] call(final byte[] id) {
LOG.debug("---處理2---對從資料庫HBase中查詢結果進行處理:沒找到丟擲NoSuchUniqueName異常,找到將id,name放入快取");
if (id == null) {
throw new NoSuchUniqueName(kind(), name);
}
if (id.length != id_width) {
throw new IllegalStateException("Found id.length = " + id.length + " which is != " + id_width + " required for '" + kind() + '\'');
}
addIdToCache(name, id);
addNameToCache(id, name);
return id;
}
}
/**
* 從資料庫HBase中查詢
* 沒有找到就丟擲NoSuchUniqueName異常
* 找到的話就把(name,id)放入緩衝區並返回id
*/
LOG.debug("---查詢2---利用name從資料庫HBase中查詢 ");
Deferred<byte[]> d = getIdFromHBase(name).addCallback(new GetIdCB());
return d;
}
private Deferred<byte[]> getIdFromHBase(final String name) {
return hbaseGet(toBytes(name), ID_FAMILY);
}
private Deferred<byte[]> hbaseGet(final byte[] key, final byte[] family) {
final GetRequest get = new GetRequest(table, key);
// kind為列簇修飾符metrics
get.family(family).qualifier(kind);
class GetCB implements Callback<byte[], ArrayList<KeyValue>> {
public byte[] call(final ArrayList<KeyValue> row) {
LOG.debug("---處理3---返回在hbase表中查詢指定name的id的結果");
if (row == null || row.isEmpty()) {
return null;
}
return row.get(0).value();
}
}
LOG.debug("---查詢3---先利用HBaseClient.get(get)來在HBase中的tsdb-uid表中通過key+family+qualifier查詢id");
return client.get(get).addCallback(new GetCB());
}
private void addIdToCache(final String name, final byte[] id) {
byte[] found = name_cache.get(name);
if (found == null) {
found = name_cache.putIfAbsent(name,
// Must make a defensive copy to be immune
// to any changes the caller may do on the
// array later on.
Arrays.copyOf(id, id.length));
}
if (found != null && !Arrays.equals(found, id)) {
throw new IllegalStateException("name=" + name + " => id=" + Arrays.toString(id) + ", already mapped to " + Arrays.toString(found));
}
}
private void addNameToCache(final byte[] id, final String name) {
final String key = fromBytes(id);
String found = id_cache.get(key);
if (found == null) {
found = id_cache.putIfAbsent(key, name);
}
if (found != null && !found.equals(name)) {
throw new IllegalStateException("id=" + Arrays.toString(id) + " => name=" + name + ", already mapped to " + found);
}
}
private byte[] getIdFromCache(final String name) {
for(Entry<String, byte[]> en : name_cache.entrySet()) {
System.out.println(en.getKey() + ":" + en.getValue().toString());
}
return name_cache.get(name);
}
private static byte[] toBytes(final String s) {
return s.getBytes(CHARSET);
}
private static String fromBytes(final byte[] b) {
return new String(b, CHARSET);
}
private final class UniqueIdAllocator implements Callback<Object, Object> {
private final String name; // What we're trying to allocate an ID for.
private final Deferred<byte[]> assignment; // deferred to call back
private short attempt = MAX_ATTEMPTS_ASSIGN_ID; // Give up when zero.
private HBaseException hbe = null; // Last exception caught.
private long id = -1; // The ID we'll grab with an atomic increment.
private byte row[]; // The same ID, as a byte array.
private static final byte ALLOCATE_UID = 0;
private static final byte CREATE_REVERSE_MAPPING = 1;
private static final byte CREATE_FORWARD_MAPPING = 2;
private static final byte DONE = 3;
private byte state = ALLOCATE_UID; // Current state of the process.
UniqueIdAllocator(final String name, final Deferred<byte[]> assignment) {
this.name = name;
this.assignment = assignment;
}
Deferred<byte[]> tryAllocate() {
attempt--;
state = ALLOCATE_UID;
LOG.debug("---在此處呼叫UniqueIdAllocator.call方法---");
call(null);
return assignment;
}
@SuppressWarnings("unchecked")
public Object call(final Object arg) {
LOG.debug("--對UniqueIdAllocator物件的回撥 ----");
if (attempt == 0) {
if (hbe == null) {
throw new IllegalStateException("Should never happen!");
}
LOG.error("Failed to assign an ID for kind='" + kind() + "' name='" + name + "'", hbe);
throw hbe;
}
if (arg instanceof Exception) {
final String msg = ("Failed attempt #" + (MAX_ATTEMPTS_ASSIGN_ID - attempt) + " to assign an UID for " + kind() + ':' + name
+ " at step #" + state);
if (arg instanceof HBaseException) {
LOG.error(msg, (Exception) arg);
hbe = (HBaseException) arg;
return tryAllocate(); // Retry from the beginning.
} else {
LOG.error("WTF? Unexpected exception! " + msg, (Exception) arg);
return arg; // Unexpected exception, let it bubble up.
}
}
class ErrBack implements Callback<Object, Exception> {
public Object call(final Exception e) throws Exception {
assignment.callback(e);
return assignment;
}
}
final Deferred d;
LOG.debug("---UniqueId分配狀態state=" + state);
switch (state) {
case ALLOCATE_UID:
LOG.debug("---計數器自增---");
d = allocateUid();
break;
case CREATE_REVERSE_MAPPING:
LOG.debug("---儲存id/name的行---");
d = createReverseMapping(arg);
break;
case CREATE_FORWARD_MAPPING:
LOG.debug("---儲存name/id的行---");
d = createForwardMapping(arg);
break;
case DONE:
return done(arg);
default:
throw new AssertionError("Should never be here!");
}
/**
* addBoth()方法作用:Registers a callback both as a callback and as an "errback".
*
* 對進行的操作增加對此類的call()回撥(即在執行完call方法後再執行call方法相當於遞迴,只到state=DONE)
*/
LOG.debug("---增加對此類UniqueIdAllocator的回撥 如果回撥返回異常對ErrBack進行回撥");
return d.addBoth(this).addErrback(new ErrBack());
}
private Deferred<Long> allocateUid() {
LOG.info("---計數器自增--- 更新UniqueIdAllocator.state=CREATE_REVERSE_MAPPING");
LOG.info("Creating an ID for kind='" + kind() + "' name='" + name + '\'');
state = CREATE_REVERSE_MAPPING;
return client.atomicIncrement(new AtomicIncrementRequest(table, MAXID_ROW, ID_FAMILY, kind));
}
/**
* Create the reverse mapping. We do this before the forward one so that
* if we die before creating the forward mapping we don't run the risk
* of "publishing" a partially assigned ID. The reverse mapping on its
* own is harmless but the forward mapping without reverse mapping is
* bad as it would point to an ID that cannot be resolved.
* 意思是:先儲存id/name的行,再儲存name/id的行,這樣的好處是在name/id的行儲存失敗時沒有什麼影響
*/
private Deferred<Boolean> createReverseMapping(final Object arg) {
LOG.debug("---先儲存id/name的行,更新UniqueIdAllocator.state=CREATE_FORWARD_MAPPING");
/**
* 此方法在id自增之後呼叫,因此此處arg必為Long型
*/
if (!(arg instanceof Long)) {
throw new IllegalStateException("Expected a Long but got " + arg);
}
id = (Long) arg;
if (id <= 0) {
throw new IllegalStateException("Got a negative ID from HBase: " + id);
}
LOG.info("Got ID=" + id + " for kind='" + kind() + "' name='" + name + "'");
row = Bytes.fromLong(id);
System.out.println("----------show row start-----------");
for(byte b : row) {
System.out.println(b);
}
System.out.println("----------show row end-------------");
// row.length should actually(實際上) be 8.
if (row.length < id_width) {
throw new IllegalStateException("OMG, row.length = " + row.length + " which is less than " + id_width + " for id=" + id + " row="
+ Arrays.toString(row));
}
// Verify that we're going to drop bytes that are 0.
for (int i = 0; i < row.length - id_width; i++) {
if (row[i] != 0) {
final String message = "All Unique IDs for " + kind() + " on " + id_width + " bytes are already assigned!";
LOG.error("OMG " + message);
throw new IllegalStateException(message);
}
}
// Shrink the ID on the requested number of bytes.
row = Arrays.copyOfRange(row, row.length - id_width, row.length);
System.out.println("----------show row start-----------");
for(byte b : row) {
System.out.println(b);
}
System.out.println("----------show row end-------------");
state = CREATE_FORWARD_MAPPING;
// We are CAS'ing(compareAndSet) the KV into existence(存在) -- the second argument is
// how we tell HBase we want to atomically(原子的) create the KV, so that if
// there is already a KV in this cell, we'll fail. Technically we could do
// just a `put' here, as we have a freshly(剛剛) allocated UID, so there
// is not reason why a KV should already exist for this UID, but just
// to err on the safe side and catch really weird corruption cases, we
// do a CAS instead to create the KV.
return client.compareAndSet(reverseMapping(), HBaseClient.EMPTY_ARRAY);
}
private PutRequest reverseMapping() {
return new PutRequest(table, row, NAME_FAMILY, kind, toBytes(name));
}
private Deferred<?> createForwardMapping(final Object arg) {
LOG.debug("---再儲存name/id的行,更新UniqueIdAllocator.state=DONE");
if (!(arg instanceof Boolean)) {
throw new IllegalStateException("Expected a Boolean but got " + arg);
}
if (!((Boolean) arg)) { // Previous CAS failed. Something is really // messed up.
LOG.error("WTF! Failed to CAS reverse mapping: " + reverseMapping() + " -- run an fsck against the UID table!");
return tryAllocate(); // Try again from the beginning.
}
state = DONE;
return client.compareAndSet(forwardMapping(), HBaseClient.EMPTY_ARRAY);
}
private PutRequest forwardMapping() {
return new PutRequest(table, toBytes(name), ID_FAMILY, kind, row);
}
private Deferred<byte[]> done(final Object arg) {
if (!(arg instanceof Boolean)) {
throw new IllegalStateException("Expected a Boolean but got " + arg);
}
if (!((Boolean) arg)) { // Previous CAS failed. We lost a race.
LOG.warn("Race condition: tried to assign ID " + id + " to " + kind() + ":" + name + ", but CAS failed on " + forwardMapping()
+ ", which indicates this UID must have" + " been allocated concurrently by another TSD. So ID " + id + " was leaked.");
// If two TSDs attempted to allocate a UID for the same name at
// the
// same time, they would both have allocated a UID, and created
// a
// reverse mapping, and upon getting here, only one of them
// would
// manage to CAS this KV into existence. The one that loses the
// race will retry and discover the UID assigned by the winner
// TSD,
// and a UID will have been wasted in the process. No big deal.
class GetIdCB implements Callback<Deferred<byte[]>, byte[]> {
public Deferred<byte[]> call(final byte[] row) throws Exception {
assignment.callback(row);
return assignment;
}
}
return getIdAsync(name).addCallbackDeferring(new GetIdCB());
}
cacheMapping(name, row);
if (tsdb != null && tsdb.getConfig().enable_realtime_uid()) {
final UIDMeta meta = new UIDMeta(type, row, name);
meta.storeNew(tsdb);
LOG.info("Wrote UIDMeta for: " + name);
tsdb.indexUIDMeta(meta);
}
pending_assignments.remove(name);
assignment.callback(row);
return assignment;
}
}
/**
* Adds the bidirectional(雙向的) mapping in the cache.
*/
private void cacheMapping(final String name, final byte[] id) {
addIdToCache(name, id);
addNameToCache(id, name);
}
public String kind() {
return fromBytes(kind);
}
}
執行流程日誌:
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd.getOrCreateIdAsync(TestTsdbuidAdd.java:289): ---查詢1--失敗處理---通過getIdAsync(name)查詢標籤是否存在---------
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd.getIdAsync(TestTsdbuidAdd.java:296): --先從緩衝區中找---
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd.getIdAsync(TestTsdbuidAdd.java:327): ---查詢2---利用name從資料庫HBase中查詢
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd.hbaseGet(TestTsdbuidAdd.java:352): ---查詢3---先利用HBaseClient.get(get)來在HBase中的tsdb-uid表中通過key+family+qualifier查詢id
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$1GetCB.call(TestTsdbuidAdd.java:344): ---處理3---返回在hbase表中查詢指定name的id的結果
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$1GetIdCB.call(TestTsdbuidAdd.java:309): ---處理2---對從資料庫HBase中查詢結果進行處理:沒找到丟擲NoSuchUniqueName異常,找到將id,name放入快取
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$1HandleNoSuchUniqueNameCB.call(TestTsdbuidAdd.java:255): ---處理1--失敗處理---對從hbase中查詢丟擲異常的處理:---
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$1HandleNoSuchUniqueNameCB.call(TestTsdbuidAdd.java:277): ---hbase資料庫中不存在進行新增-----UniqueIdAllocator(name, assignment).tryAllocate()--------
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.tryAllocate(TestTsdbuidAdd.java:425): ---在此處呼叫UniqueIdAllocator.call方法---
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:433): --對UniqueIdAllocator物件的回撥 ----
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:464): ---UniqueId分配狀態state=0
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:467): ---計數器自增---
13/09/26 11:40:50 INFO net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.allocateUid(TestTsdbuidAdd.java:497): ---計數器自增--- 更新UniqueIdAllocator.state=CREATE_REVERSE_MAPPING
13/09/26 11:40:50 INFO net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.allocateUid(TestTsdbuidAdd.java:498): Creating an ID for kind='metrics' name='m15'
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:492): ---增加對此類UniqueIdAllocator的回撥 如果回撥返回異常對ErrBack進行回撥
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:433): --對UniqueIdAllocator物件的回撥 ----
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:464): ---UniqueId分配狀態state=1
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:472): ---儲存id/name的行---
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.createReverseMapping(TestTsdbuidAdd.java:513): ---先儲存id/name的行,更新UniqueIdAllocator.state=CREATE_FORWARD_MAPPING
13/09/26 11:40:50 INFO net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.createReverseMapping(TestTsdbuidAdd.java:525): Got ID=15 for kind='metrics' name='m15'
----------show row start-----------
0
0
0
0
0
0
0
15
----------show row end-------------
----------show row start-----------
0
0
15
----------show row end-------------
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:492): ---增加對此類UniqueIdAllocator的回撥 如果回撥返回異常對ErrBack進行回撥
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:433): --對UniqueIdAllocator物件的回撥 ----
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:464): ---UniqueId分配狀態state=2
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:477): ---儲存name/id的行---
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.createForwardMapping(TestTsdbuidAdd.java:570): ---再儲存name/id的行,更新UniqueIdAllocator.state=DONE
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:492): ---增加對此類UniqueIdAllocator的回撥 如果回撥返回異常對ErrBack進行回撥
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:433): --對UniqueIdAllocator物件的回撥 ----
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd$UniqueIdAllocator.call(TestTsdbuidAdd.java:464): ---UniqueId分配狀態state=3
13/09/26 11:40:50 DEBUG net.opentsdb.tools.TestTsdbuidAdd.getIdAsync(TestTsdbuidAdd.java:296): --先從緩衝區中找---
m15:[ [email protected]
metrics m15: [0, 0, 15]
13/09/26 11:40:50 INFO net.opentsdb.tools.TestTsdbuidAdd.main(TestTsdbuidAdd.java:145): Gracefully shutdown the TSD
相關推薦
OpenTSDB原始碼分析之TSDB-UID表操作(新增)
為了方便研究將tsdb-uid表的新增操作抽取出來,程式碼如下: package net.opentsdb.tools; import java.nio.charset.Charset; import java.util.ArrayList; import java.ut
OpenTSDB原始碼分析之TSDB表操作(查詢)
TSDB表的行鍵比tsdb-uid表複雜,查詢也要複雜一些,這種複雜性體現在欄位過濾器,在該過濾器中監控指標、日期範圍和標籤都要考慮到。 所有的過濾器都通過TsdbQuery.run()方法實現,建立帶過濾器的掃描器,遍歷返回的行和收集資料供顯示使用,使用輔助
swoole原始碼分析之Buffer的substr操作
swoole_buffer提供的substr操作用於從緩衝區中取出內容。 string swoole_buffer->substr(int $offset, int $length = -1,
swoole原始碼分析之Buffer的expend操作
swoole_buffer提供的expend操作用於為快取區擴容。 swoole_buffer->expand(int $new_size); $new_size 指定新的緩衝區尺寸,必須大
swoole原始碼分析之Buffer的write操作
swoole_buffer提供的write操作用於向快取區的任意記憶體位置寫資料。read/write函式可以直接讀寫記憶體。所以使用務必要謹慎,否則可能會破壞現有資料。 swoole_buffer->write(int $offset, string $data)
Memcached原始碼分析之增刪改查操作(5)
文章列表: 《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》 前言 在看Memcached的增刪改查操作前,我們先來看一下process_command方法。Memcached解析命令之後,就通過process_comman
Netty原始碼分析之ChannelPipeline(二)—ChannelHandler的新增與刪除
上篇文章中,我們對Netty中ChannelPipeline的構造與初始化進行了分析與總結,本篇文章我們將對ChannelHandler的新增與刪除操作進行具體的的程式碼分析; 一、ChannelHandler的新增 下面是Netty官方的一段demo原始碼,可以看到在服務端初始化時執行了向Channel
openTSDB 原始碼詳解之寫入資料到 tsdb-uid 表
openTSDB 原始碼詳解之寫入資料到tsdb-uid表 1.方法入口messageReceived public void messageReceived(final ChannelHandlerContext ctx,
Memcached原始碼分析之Hash表操作
Memcached的Hash表用來提高資料訪問效能,通過連結法來解決Hash衝突,當Hash表中資料多餘Hash表容量的1.5倍時,Hash表就會擴容,Memcached的Hash表操作沒什麼特別的,我們這裡簡單介紹下Memcached裡面的Hash表操作。 //hash表
openTSDB詳解之底層HBase表分析
openTSDB詳解之底層HBase表分析 【Updating】 檢視hbase的表負載情況 hbase(main):001:0> status 1 active master, 0 backup masters, 3 servers, 0 dead, 2.3
swoole_process原始碼分析之setBlocking操作
swoole_process提供的setBlocking用於設定管道是否為阻塞模式。預設Process的管道為同步阻塞。 function swoole_process->setBlocking
swoole_process原始碼分析之alarm操作
swoole_process提供的alarm是個高精度定時器,是作業系統setitimer系統呼叫的封裝,可以設定微秒級別的定時器。定時器會觸發訊號,需要與swoole_process::signal或pcntl_signal配合使用。 function swoole_pr
swoole_process原始碼分析之close操作
swoole_process提供的close用於關閉建立的好的管道。 bool swoole_process->close(int $which = 0); $which 指定關閉哪一個管道
Open vSwitch(OvS)原始碼分析之工作流程(flow流表查詢)
前面分析了Open vSwitch幾部分原始碼,對於Open vSwitch也有了個大概的理解,今天要分析的程式碼將是整個Open vSwitch的重中之重。整個Open vSwitch的核心程式碼在datapath檔案中;而datapath檔案中的核心程式碼又在ovs_dp_process_re
Python 原始碼分析之位元組碼之基本操作
本文基於 Python 3.6.4 編譯器生成位元組碼,你可通過如下程式碼片段得到 python 原始碼對應的位元組碼 #!/usr/bin/env python # encoding: utf-8 import sys import dis filename=sys.argv
elasticsearch原始碼分析之索引操作(九)
上節介紹了es的node啟動如何建立叢集服務的過程,這節在其基礎之上介紹es索引的基本操作功能(create、exist、delete),用來進一步細化es叢集是如果工作的。 客戶端部分的操作就不予介紹了,詳細可以參照elasticsearch原始碼分析之客戶
Android Wi-Fi原始碼分析之WifiService操作Wi-Fi(一):分析Wifi.c中的wifi_load_driver()函式
Wi-Fi原始碼分析之WifiService操作Wi-Fi(一) 分析Wifi.c中的wifi_load_driver()函式 int wifi_load_driver() { AL
MySQL系列:innodb原始碼分析之表空間管理
innodb在實現表空間(table space)基於檔案IO之上構建的一層邏輯儲存空間管理,table space採用邏輯分層的結構:space、segment inode、extent和page.在實現層的邏輯使用了磁碟連結串列這種結構來管理邏輯關係。我們先來介紹磁碟連
STL原始碼分析之hash表(gnu-c++ 2.9)
1、基本概念 關於hash表的概念這裡就不再多說,hash表的變化一般都在雜湊函式和退避方法上。STL採用的是開鏈法,即每個hash桶裡面維持一個連結串列,hash函式計算出位置後,就將節點插入該位置的連結串列上,因此,底層實現為hash表的容器,迭代器的實現
雲客Drupal8原始碼分析之表單Form API
在閱讀本主題前建議你先閱讀本系列前面的《表單定義示例》主題,看一看在drupal8中是如何運用表單的。表單處理流程:一般情況下表單流程是先顯示一個表單,使用者填寫,然後提交,系統處理,如果有錯則重新顯示並給出錯誤提示,反之沒有錯誤那麼完成後給出一個響應或者一個重定向響應,這是