1. 程式人生 > 其它 >hbase原始碼系列(二)HTable 探祕

hbase原始碼系列(二)HTable 探祕

  hbase的原始碼終於搞一個段落了,在接下來的一個月,著重於把看過的原始碼提煉一下,對一些有意思的主題進行分享一下。繼上一篇講了負載均衡之後,這一篇我們從client開始講吧,從client到master再到region server,按照這個順序來開展,網友也可以對自己感興趣的部分給我留言或者直接聯絡我的QQ。

  現在我們講一下HTable吧,為什麼講HTable,因為這是我們最常見的一個類,這是我們對hbase中資料的操作的入口。

1.Put操作

  下面是一個很簡單往hbase插入一條記錄的例子。

HBaseConfiguration conf =  (HBaseConfiguration) HBaseConfiguration.create();
byte[] rowkey = Bytes.toBytes("cenyuhai");
byte[] family = Bytes.toBytes("f");
byte[] qualifier = Bytes.toBytes("name");
byte[] value = Bytes.toBytes("岑玉海");
        
HTable table = new HTable(conf, "test");
Put put = new Put(rowkey);
put.add(family,qualifier,value);
        
table.put(put);

  我們平常就是採用這種方式提交的資料,為了提高重用性採用HTablePool,最新的API推薦使用HConnection.getTable("test")來獲得HTable,舊的HTablePool已經被拋棄了。好,我們下面開始看看HTable內部是如何實現的吧,首先我們看看它內部有什麼屬性。

  /** 實際提交資料所用的類 */
  protected HConnection connection;/** 需要提交的資料的列表 */
  protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
  /** flush的size */
  private long writeBufferSize;
  /** 是否自動flush */
  private boolean autoFlush;
  /** 當前的資料的size,達到指定的size就要提交 */
  protected long currentWriteBufferSize;
  protected int scannerCaching;
  private int maxKeyValueSize;
  private ExecutorService pool;  // For Multi

  /** 非同步提交 */
  protected AsyncProcess<Object> ap;
  ** rpc工廠 */
  private RpcRetryingCallerFactory rpcCallerFactory;

  主要是靠上面的這些傢伙來幹活的,這裡面的connection、ap、rpcCallerFactory是用來和後臺通訊的,HTable只是做一個操作,資料進來之後,新增到writeAsyncBuffer,滿足條件就flush。

  下面看看table.put是怎麼執行的:

    doPut(put);
    if (autoFlush) {
      flushCommits();
    }

  執行put操作,如果是autoFush,就提交,先看doPut的過程,如果之前的ap非同步提交到有問題,就先進行後臺提交,不過這次是同步的,如果沒有錯誤,就把put新增到隊列當中,然後檢查一下當前的 buffer的大小,超過我們設定的內容的時候,就flush掉。

if (ap.hasError()){
      backgroundFlushCommits(true);
}
currentWriteBufferSize += put.heapSize();
writeAsyncBuffer.add(put);
while (currentWriteBufferSize > writeBufferSize) {
    backgroundFlushCommits(false);
}

  寫下來,讓我們看看backgroundFlushCommits這個方法吧,它的核心就這麼一句ap.submit(writeAsyncBuffer, true) ,如果出錯了的話,就報錯了。所以網上所有關於客戶端調優的方法裡面無非就這麼幾種:

1)關閉autoFlush

2)關閉wal日誌

3)把writeBufferSize設大一點,一般說是設定成5MB

  經過實踐,就第二條關閉日誌的效果比較明顯,其它的效果都不明顯,因為提交的過程是非同步的,所以提交的時候佔用的時間並不多,提交到server端後,server還有一個寫入的佇列,(⊙o⊙)… 讓人想起小米手機那噁心的排隊了。。。所以大規模寫入資料,別指望著用put來解決。。。mapreduce生成hfile,然後用bulk load的方式比較好。

  不廢話了,我們繼續追蹤ap.submit方法吧,F3進去。

      int posInList = -1;
      Iterator<? extends Row> it = rows.iterator();
      while (it.hasNext()) {
        Row r = it.next();
        //為row定位
        HRegionLocation loc = findDestLocation(r, 1, posInList);

        if (loc != null && canTakeOperation(loc, regionIncluded, serverIncluded)) {
          // loc is null if there is an error such as meta not available.
          Action<Row> action = new Action<Row>(r, ++posInList);
          retainedActions.add(action);
          addAction(loc, action, actionsByServer);
          it.remove();
        }
      }

  迴圈遍歷r,為每個r找到它的位置loc,loc是HRegionLocation,裡面記錄著這行記錄所在的目標region所在的位置,loc怎麼獲得呢,走進findDestLocation方法裡面,看到了這麼一句。

loc = hConnection.locateRegion(this.tableName, row.getRow());

  通過表名和rowkey,使用HConnection就可以定位到它的位置,這裡就先不講定位了,稍後放一節出來講,請看這一篇《Client如何找到正確的Region Server》,否則篇幅太長了,這裡我們只需要記住,提交操作,是要知道它對應的region在哪裡的。

  定位到它的位置之後,它把loc新增到了actionsByServer,一個region server對應一組操作。(插句題外話為什麼這裡叫action呢,其實我們熟知的Put、Delete,以及不常用的Append、Increment都是繼承自Row的,在介面傳遞時候,其實都是視為一種操作,到了後臺之後,才做區分)。

  接下來,就是多執行緒的rpc提交了。

MultiServerCallable<Row> callable = createCallable(loc, multiAction);
......
res = createCaller(callable).callWithoutRetries(callable);

  再深挖一點,把它們的實現都扒出來吧。

  protected MultiServerCallable<Row> createCallable(final HRegionLocation location,
      final MultiAction<Row> multi) {
    return new MultiServerCallable<Row>(hConnection, tableName, location, multi);
  }

  protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
    return rpcCallerFactory.<MultiResponse> newCaller();
  }

  ok,看到了,先構造一個MultiServerCallable,然後再通過rpcCallerFactory做最後的call操作。

  好了,到這裡再總結一下put操作吧,前面寫得有點兒凌亂了。

  (1)把put操作新增到writeAsyncBuffer佇列裡面,符合條件(自動flush或者超過了閥值writeBufferSize)就通過AsyncProcess非同步批量提交。

  (2)在提交之前,我們要根據每個rowkey找到它們歸屬的region server,這個定位的過程是通過HConnection的locateRegion方法獲得的,然後再把這些rowkey按照HRegionLocation分組。

  (3)通過多執行緒,一個HRegionLocation構造MultiServerCallable<Row>,然後通過rpcCallerFactory.<MultiResponse> newCaller()執行呼叫,忽略掉失敗重新提交和錯誤處理,客戶端的提交操作到此結束。

2.Delete操作

  對於Delete,我們也可以通過以下程式碼執行一個delete操作

Delete del = new Delete(rowkey);
table.delete(del);

  這個操作比較乾脆,new一個RegionServerCallable<Boolean>,直接走rpc了,爽快啊。

RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
        tableName, delete.getRow()) {
      public Boolean call() throws IOException {
        try {
          MutateRequest request = RequestConverter.buildMutateRequest(
            getLocation().getRegionInfo().getRegionName(), delete);
          MutateResponse response = getStub().mutate(null, request);
          return Boolean.valueOf(response.getProcessed());
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        }
      }
    };
rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);

  這裡面注意一下這行MutateResponse response = getStub().mutate(null, request);

  getStub()返回的是一個ClientService.BlockingInterface介面,實現這個介面的類是HRegionServer,這樣子我們就知道它在服務端執行了HRegionServer裡面的mutate方法。

3.Get操作

  get操作也和delete一樣簡單

Get get = new Get(rowkey);
Result row = table.get(get);

  get操作也沒幾行程式碼,還是直接走的rpc

public Result get(final Get get) throws IOException {
    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
        getName(), get.getRow()) {
      public Result call() throws IOException {
        return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
      }
    };
    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
}

  注意裡面的ProtobufUtil.get操作,它其實是構建了一個GetRequest,需要的引數是regionName和get,然後走HRegionServer的get方法,返回一個GetResponse

public static Result get(final ClientService.BlockingInterface client,
      final byte[] regionName, final Get get) throws IOException {
    GetRequest request =
      RequestConverter.buildGetRequest(regionName, get);
    try {
      GetResponse response = client.get(null, request);
      if (response == null) return null;
      return toResult(response.getResult());
    } catch (ServiceException se) {
      throw getRemoteException(se);
    }
}

 4.批量操作

  針對put、delete、get都有相應的操作的方式:

  1.Put(list)操作,很多童鞋以為這個可以提高寫入速度,其實無效。。。為啥?因為你構造了一個list進去,它再遍歷一下list,執行doPut操作。。。。反而還慢點。

  2.delete和get的批量操作走的都是connection.processBatchCallback(actions, tableName, pool, results, callback),具體的實現在HConnectionManager的靜態類HConnectionImplementation裡面,結果我們驚人的發現:

AsyncProcess<?> asyncProcess = createAsyncProcess(tableName, pool, cb, conf);
asyncProcess.submitAll(list);
asyncProcess.waitUntilDone();

  它走的還是put一樣的操作,既然是一樣的,何苦程式碼寫得那麼繞呢?

5.查詢操作

  現在講一下scan吧,這個操作相對複雜點。還是老規矩,先上一下程式碼吧。

        Scan scan = new Scan();
        //scan.setTimeRange(new Date("20140101").getTime(), new Date("20140429").getTime());
        scan.setBatch(10);
        scan.setCaching(10);
        scan.setStartRow(Bytes.toBytes("cenyuhai-00000-20140101"));
        scan.setStopRow(Bytes.toBytes("cenyuhai-zzzzz-201400429"));
        //如果設定為READ_COMMITTED,它會取當前的時間作為讀的檢查點,在這個時間點之後的就排除掉了
        scan.setIsolationLevel(IsolationLevel.READ_COMMITTED);
        RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("pattern"));
        ResultScanner resultScanner = table.getScanner(scan);
        Result result = null;
        while ((result = resultScanner.next()) != null) {
            //自己處理去吧...
        }

  這個是帶正則表示式的模糊查詢的scan查詢,Scan這個類是包括我們查詢所有需要的引數,batch和caching的設定,在我的另外一篇文章裡面有寫《hbase客戶端設定快取優化查詢》

Scan查詢的時候,設定StartRow和StopRow可是重頭戲,假設我這裡要查我01月01日到04月29日總共發了多少業務,中間是業務型別,但是我可能是所有的都查,或者只查一部分,在所有都查的情況下,我就不能設定了,那但是StartRow和StopRow我不能空著啊,所以這裡可以填00000-zzzzz,只要保證它在這個區間就可以了,然後我們加了一個RowFilter,然後引入了正則表示式,之前好多人一直在問啊問的,不過我這個例子,其實不要也可以,因為是查所有業務的,在StartRow和StopRow之間的都可以要。

  好的,我們接著看,F3進入getScanner方法

if (scan.isSmall()) {
      return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection);
}
return new ClientScanner(getConfiguration(), scan, getName(), this.connection);

  這個scan還分大小, 沒關係,我們進入ClientScanner看一下吧, 在ClientScanner的構造方法裡面發現它會去呼叫nextScanner去初始化一個ScannerCallable。好的,我們接著來到ScannerCallable裡面,這裡需要注意的是它的兩個方法,prepare和call方法。在prepare裡面它主要乾了兩個事情,獲得region的HRegionLocation和ClientService.BlockingInterface介面的例項,之前說過這個繼承這個介面的只有Region Server的實現類。

  public void prepare(final boolean reload) throws IOException {
    this.location = connection.getRegionLocation(tableName, row, reload);    //HConnection.getClient()這個方法簡直就是神器啊
    setStub(getConnection().getClient(getLocation().getServerName()));
  }

  ok,我們下面看看call方法吧

  public Result [] call() throws IOException {
     // 第一次走的地方,開啟scanner
      if (scannerId == -1L) {
        this.scannerId = openScanner();
      } else {
        Result [] rrs = null;
        ScanRequest request = null;
        try {
          request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
          ScanResponse response = null;       
      // 準備用controller去攜帶返回的資料,這樣的話就不用進行protobuf的序列化了           
      PayloadCarryingRpcController controller = new PayloadCarryingRpcController();          
      controller.setPriority(getTableName());
          response = getStub().scan(controller, request);
          nextCallSeq++;
          long timestamp = System.currentTimeMillis();
          // Results are returned via controller
          CellScanner cellScanner = controller.cellScanner();
          rrs = ResponseConverter.getResults(cellScanner, response);
      } catch (IOException e) {              
        }     
    }     return rrs;
     
    }
    return null;
  }

   在call方法裡面,我們可以看得出來,例項化ScanRequest,然後呼叫scan方法的時候把PayloadCarryingRpcController傳過去,這裡跟蹤了一下,如果設定了codec的就從PayloadCarryingRpcController裡面返回結果,否則從response裡面返回。

  好的,下面看next方法吧。

    @Override
    public Result next() throws IOException { if (cache.size() == 0) {
        Result [] values = null;
        long remainingResultSize = maxScannerResultSize;
        int countdown = this.caching;      
     // 設定獲取資料的條數         
     callable.setCaching(this.caching);
        boolean skipFirst = false;
        boolean retryAfterOutOfOrderException  = true;
        do {
        if (skipFirst) {
         // 上次讀的最後一個,這次就不讀了,直接跳過就是了
              callable.setCaching(1);
              values = this.caller.callWithRetries(callable);
              callable.setCaching(this.caching);
              skipFirst = false;
            }
       values = this.caller.callWithRetries(callable);
          if (values != null && values.length > 0) {
            for (Result rs : values) {          //快取起來               cache.add(rs);
              for (Cell kv : rs.rawCells()) {//計算出keyvalue的大小,然後減去
                remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize();
              }
              countdown--;
              this.lastResult = rs;
            }
           }
          // Values == null means server-side filter has determined we must STOP
        } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
       
     //快取裡面有就從快取裡面取       
     if (cache.size() > 0) {
          return cache.poll();
        }
     return null;
    }

  從next方法裡面可以看出來,它是一次取caching條資料,然後下一次獲取的時候,先把上次獲取的最後一個給排除掉,再獲取下來儲存在cache當中,只要快取不空,就一直在快取裡面取。

  好了,至此Scan到此結束。