1. 程式人生 > 其它 >hbase原始碼系列(十五)終結篇&Scan續集-->如何查詢出來下一個KeyValue

hbase原始碼系列(十五)終結篇&Scan續集-->如何查詢出來下一個KeyValue

這是這個系列的最後一篇了,實在沒精力寫了,本來還想寫一下hbck的,這個東西很常用,當hbase的Meta表出現錯誤的時候,它能夠幫助我們進行修復,無奈看到3000多行的程式碼時,退卻了,原諒我這點自私的想法吧。

在講《Get、Scan在服務端是如何處理?》當中的nextInternal流程,它的第一步從storeHeap當中取出當前kv,這塊其實有點兒小複雜的,因為它存在異構的Scanner(一個MemStoreScanner和多個StoreFileScanner),那怎麼保證從storeHeap裡面拿出來的總是離上一個kv最接近的kv呢?

這裡我們知道,在開啟這些Scanner之後,就對他們進行了一下seek操作,它們就已經調整到最佳位置了。

我們看看KeyValueHeap的建構函式裡面去看看吧。

public KeyValueHeap(List<? extends KeyValueScanner> scanners, KVComparator comparator) throws IOException {
    this.comparator = new KVScannerComparator(comparator);
    if (!scanners.isEmpty()) {
      this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
          this.comparator);
      //...
     this.current = pollRealKV();
    }
}

它內部有一個叫heap的PriorityQueue<KeyValueScanner>佇列,它會對所有的Scanner進行排序,排序的比較器是KVScannerComparator, 然後current又呼叫了pollRealKV通過比較獲得當前的Scanner,後面會講。

那好,我們直接進去KVScannerComparator看看它的compare方法就能知道怎麼回事了。

public int compare(KeyValueScanner left, KeyValueScanner right) {
      // 先各取出來一個KeyValue進行比較
      int comparison = compare(left.peek(), right.peek());
      if (comparison != 0) {
        return comparison;
      } else {
        // key相同,選擇最新的那個
        long leftSequenceID = left.getSequenceID();
        long rightSequenceID = right.getSequenceID();
        if (leftSequenceID > rightSequenceID) {
          return -1;
        } else if (leftSequenceID < rightSequenceID) {
          return 1;
        } else {
          return 0;
        }
      }
}

額,從上面程式碼看得出來,把left和right各取出一個kv來進行比較,如果一樣就比較SequenceID,SequenceID越大說明這個檔案越新,返回-1,在升序的情況下,這個Scanner就跑到前面去了。 這樣就實現了heap裡面拿出來的第一個就是最小的kv的最新版。

在繼續將之前,我們看一下在KeyValue是怎麼被呼叫的,這樣我們好理清思路。

//從storeHeap裡面取出一個來
KeyValue current = this.storeHeap.peek();
//後面是一頓比較,比較通過,把結果儲存到results當中
KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset, length);

接著看populateResult方法。

private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
        byte[] currentRow, int offset, short length) throws IOException {
      KeyValue nextKv;
      do {
        //從heap當中取出剩下的結果儲存在results當中
        heap.next(results, limit - results.size());
        //如果夠數了,就返回了
        if (limit > 0 && results.size() == limit) {
          return KV_LIMIT;
        }
        nextKv = heap.peek();
      } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
      return nextKv;
}

我們對KeyValueHeap的使用,就是先peek,然後再next,我們接下來就按這個順序看吧。

先從peek取出來一個,peek就是從heap佇列取出來的current的scanner取出來的當前的KeyValue。

if (this.current == null) {
      return null;
}
return this.current.peek();

然後我們看next方法。

public boolean next(List<Cell> result, int limit) throws IOException {
    if (this.current == null) {
      return false;
    }
    InternalScanner currentAsInternal = (InternalScanner)this.current;
    boolean mayContainMoreRows = currentAsInternal.next(result, limit);
    KeyValue pee = this.current.peek();
    if (pee == null || !mayContainMoreRows) {
      this.current.close();
    } else {
      this.heap.add(this.current);
    }
    this.current = pollRealKV();
    return (this.current != null);
}

1、通過currentAsInternal.next繼續獲取kv,它是隻針對通過通過檢查的當前行的剩下的KeyValue,這個過程在之前那篇文章講過了。

2、如果後面沒有值了,就關閉這個Scanner。

3、然後還有,就把這個Scanner放回heap上,等待下一次呼叫。

4、使用pollRealKV再去一個新的Scanner出來。

private KeyValueScanner pollRealKV() throws IOException {
    KeyValueScanner kvScanner = heap.poll();
    if (kvScanner == null) {
      return null;
    }

    while (kvScanner != null && !kvScanner.realSeekDone()) {
      if (kvScanner.peek() != null) {
        //查詢之前沒有查的
        kvScanner.enforceSeek();
        //把之前的查到位置的kv拿出來
        KeyValue curKV = kvScanner.peek();
        if (curKV != null) {
          //再選出來下一個的scanner
          KeyValueScanner nextEarliestScanner = heap.peek();
          if (nextEarliestScanner == null) {
            // 後面沒了,只能是它了
            return kvScanner;
          }
          
          // 那下一個Scanner的kv也出來比較比較
          KeyValue nextKV = nextEarliestScanner.peek();
          if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
            // 它確實小,那麼就把它放出去吧
            return kvScanner;
          }

          // 把它放回去,和別的kv進行競爭
          heap.add(kvScanner);
        } else {
          // 它沒東西了,關閉完事
          kvScanner.close();
        }
      } else {
        // 它沒東西了,關閉完事
        kvScanner.close();
      }
      kvScanner = heap.poll();
    }

    return kvScanner;
}

鑑於它每次都要比較的情況,如果一個列族下的HFile比較多的話,它的比較次數也會增大,會影響查詢效率,查詢時間和HFile的數量成線性關係。

另外補充點內容,是前面寫Scan的時候拉下的:

由於寫入同一個rowkey相關的KeyValue的時候時間戳在前的先寫入,查詢的時候又需要總是讀該rowkey最新的KeyValue,所以在查詢的時候會先seek到該rowkey的時間戳最大的位置,具體查的時候,不斷的向前seekBefore,直到這個rowkey的KeyValue全部查完位置,然後再向前定位到一個rowkey的位置。

簡而言之:

不同rowkey的向前查,從rowkey小的查到rowkey大的;查相同rowkey的向後查,從最新的時間戳到查到最久的時間戳。

總結:

這就把如何查詢出來下一個KeyValue的過程講完了,它的peek方法、next方法、比較的方法,希望對大家有幫助,這個系列的文章到此也就結束了,下個目標是跟隨超哥學習Spark原始碼,感謝廣大讀者的支援,覺得我寫得好的,可以關注一下我的部落格,謝謝!