1. 程式人生 > 其它 >hbase原始碼系列(十二)Get、Scan在服務端是如何處理?

hbase原始碼系列(十二)Get、Scan在服務端是如何處理?

繼上一篇講了Put和Delete之後,這一篇我們講Get和Scan, 因為我發現這兩個操作幾乎是一樣的過程,就像之前的Put和Delete一樣,上一篇我本來只打算寫Put的,結果發現Delete也可以走這個過程,所以就一起寫了。

Get

我們開啟HRegionServer找到get方法。Get的方法處理分兩種,設定了ClosestRowBefore和沒有設定的,一般來講,我們都是知道了明確的rowkey,不太會設定這個引數,它預設是false的。

if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
    byte[] row = get.getRow().toByteArray();
    byte[] family = get.getColumn(0).getFamily().toByteArray();
    r = region.getClosestRowBefore(row, family);
} else {
   Get clientGet = ProtobufUtil.toGet(get);
   if (existence == null) {
      r = region.get(clientGet);
   }
}    

所以我們走的是HRegion的get方法,殺過去。

public Result get(final Get get) throws IOException {
    checkRow(get.getRow(), "Get");
    // 檢查列族,以下省略程式碼一百字
    List<Cell> results = get(get, true);
    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null);
}

先檢查get的row是否在這個region裡面,然後檢查列族,如果沒有的話,它會根據表定義給補全的,然後它轉身又進入了另外一個get方法,真是狠心啊!

    List<Cell> results = new ArrayList<Cell>();
    Scan scan = new Scan(get);
    RegionScanner scanner = null;
    try {
      scanner = getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null)
        scanner.close();
    }

從上面可以看得出來,為什麼我要把get和Scanner一起講了吧,因為get也是一種特殊的Scan的方法,它只尋找一個row的資料。

 Scan

下面開始講Scan,在《HTable探祕》裡面有個細節不知道注意到沒,在查詢之前,它要先OpenScanner獲得要給ScannerId,這個OpenScanner其實也呼叫了scan方法,但是它過去不是幹活的,而是先過去註冊一個Scanner,訂個租約,然後再把這個返回的ScannerId再次傳送一個scan請求,這次才開始呼叫開始掃描。

掃描的時候,走的是這一段

      if (!done) {
              long maxResultSize = scanner.getMaxResultSize();
              if (maxResultSize <= 0) {
                maxResultSize = maxScannerResultSize;
              }
              List<Cell> values = new ArrayList<Cell>();
              MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
              region.startRegionOperation(Operation.SCAN);
              try {
                int i = 0;
                synchronized(scanner) {
                  for (; i < rows && currentScanResultSize < maxResultSize; i++) {
                    // 它用的是這個nextRaw方法
                    boolean moreRows = scanner.nextRaw(values);
                    if (!values.isEmpty()) {
                       results.add(Result.create(values));
                    }
                    if (!moreRows) {
                      break;
                    }
                    values.clear();
                  }
                }
              } finally {
                region.closeRegionOperation();
              }
            }

            // 沒找到設定moreResults為false,找到了把結果新增到builder裡面去
            if (scanner.isFilterDone() && results.isEmpty()) {
              moreResults = false;
              results = null;
            } else {
              addResults(builder, results, controller);
            }
          } 
        }

這裡面有controller和result,這塊的話,我求證了一下RpcServer那塊,如果Rpc傳輸的時候使用了codec來壓縮的話,就用controller返回結果,否則用response返回。 這塊就不管了不是重點,下面我們看一下RegionScanner。

RegionScanner詳解與程式碼拆分

我們衝過去看RegionScannerImpl吧,它在HRegion裡面,我們直接去看nextRaw方法就可以了,get方法的那個next方法也是呼叫了nextRaw方法。

if (outResults.isEmpty()) {
     // 把結果存到outResults當中
        returnResult = nextInternal(outResults, limit);
} else {
        List<Cell> tmpList = new ArrayList<Cell>();
        returnResult = nextInternal(tmpList, limit);
        outResults.addAll(tmpList);
}

去nextInternal方法吧,這方法真大,尼瑪,我要歇菜了,我們進入下一個階段吧。

/**  把查詢出來的結果儲存到results當中  */
    private boolean nextInternal(List<Cell> results, int limit)
    throws IOException {

      while (true) {
        //從storeHeap裡面取出一個來
        KeyValue current = this.storeHeap.peek();

        byte[] currentRow = null;
        int offset = 0;
        short length = 0;
        if (current != null) {
          currentRow = current.getBuffer();
          offset = current.getRowOffset();
          length = current.getRowLength();
        }
        //檢查一下到這個row是否應該停止了
        boolean stopRow = isStopRow(currentRow, offset, length);
        if (joinedContinuationRow == null) {
          // 如果要停止了,就用filter的filterRowCells過濾一下results.
          if (stopRow) {
            if (filter != null && filter.hasFilterRow()) {
              //使用filter過濾掉一些cells
              filter.filterRowCells(results);
            }
            return false;
          }
          // 如果有filter的話,過濾通過
          if (filterRowKey(currentRow, offset, length)) {
            boolean moreRows = nextRow(currentRow, offset, length);
            if (!moreRows) return false;
            results.clear();
            continue;
          }
          //把結果儲存到results當中
          KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
              length);
          // Ok, we are good, let's try to get some results from the main heap.
          // 在populateResult找到了足夠limit數量的
          if (nextKv == KV_LIMIT) {
            if (this.filter != null && filter.hasFilterRow()) {
              throw new IncompatibleFilterException(
                "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
            }
            return true; // We hit the limit.
          }
          
          stopRow = nextKv == null ||
              isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
          // save that the row was empty before filters applied to it.
          final boolean isEmptyRow = results.isEmpty();

          // We have the part of the row necessary for filtering (all of it, usually).
          // First filter with the filterRow(List). 過濾一下剛才找出來的
          if (filter != null && filter.hasFilterRow()) {
            filter.filterRowCells(results);
          }
          //如果result的空的,啥也沒找到,這是。。。悲劇啊
          if (isEmptyRow) {
            boolean moreRows = nextRow(currentRow, offset, length);
            if (!moreRows) return false;
            results.clear();
            // This row was totally filtered out, if this is NOT the last row,
            // we should continue on. Otherwise, nothing else to do.
            if (!stopRow) continue;
            return false;
          }

          // Ok, we are done with storeHeap for this row.
          // Now we may need to fetch additional, non-essential data into row.
          // These values are not needed for filter to work, so we postpone their
          // fetch to (possibly) reduce amount of data loads from disk.
          if (this.joinedHeap != null) {
            KeyValue nextJoinedKv = joinedHeap.peek();
            // If joinedHeap is pointing to some other row, try to seek to a correct one.
            boolean mayHaveData =
              (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
              || (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length),
                true, true)
                && joinedHeap.peek() != null
                && joinedHeap.peek().matchingRow(currentRow, offset, length));
            if (mayHaveData) {
              joinedContinuationRow = current;
              populateFromJoinedHeap(results, limit);
            }
          }
        } else {
          // Populating from the joined heap was stopped by limits, populate some more.
          populateFromJoinedHeap(results, limit);
        }

        // We may have just called populateFromJoinedMap and hit the limits. If that is
        // the case, we need to call it again on the next next() invocation.
        if (joinedContinuationRow != null) {
          return true;
        }

        // Finally, we are done with both joinedHeap and storeHeap.
        // Double check to prevent empty rows from appearing in result. It could be
        // the case when SingleColumnValueExcludeFilter is used.
        if (results.isEmpty()) {
          boolean moreRows = nextRow(currentRow, offset, length);
          if (!moreRows) return false;
          if (!stopRow) continue;
        }

        // We are done. Return the result.
        return !stopRow;
      }
    }

上面那段程式碼真的很長很臭,尼瑪。。被我摺疊起來了,有興趣的看一眼就行,我們先分解開來看吧,這裡面有兩個Heap,一個是storeHeap,一個是JoinedHeap,他們啥時候用呢?看一下它的構造方法吧

for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
          scan.getFamilyMap().entrySet()) {
        //遍歷列族和列的對映關係,設定store相關的內容
        Store store = stores.get(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
        if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
          || this.filter.isFamilyEssential(entry.getKey())) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      this.storeHeap = new KeyValueHeap(scanners, comparator);
      if (!joinedScanners.isEmpty()) {
        this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
      }
}

如果joinedScanners不空的話,就new一個joinedHeap出來,但是我們看看它的成立條件,有點兒難吧。

1、filter不為null

2、scan設定了doLoadColumnFamiliesOnDemand為true

3、設定了的filter的isFamilyEssential方法返回false,這個估計得自己寫一個,因為我剛才去看了幾個filter的這個方法預設都是用的FilterBase的方法返回false。

好的,到這裡我們有可以把上面那段程式碼砍掉很大一部分了,它的成立條件比較困難,所以很難出現了,那我們就挑重點的storeHeap來講吧,我們先看著這三行。

Store store = stores.get(entry.getKey());
KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
this.storeHeap = new KeyValueHeap(scanners, comparator);

通過列族獲得相應的Store,然後通過getScanner返回scanner加到KeyValueHeap當中,我們應該去刺探一下HStore的getScanner方法,它new了一個StoreScanner返回,繼續看StoreScanner。

public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns) throws IOException {

    matcher = new ScanQueryMatcher(scan, scanInfo, columns,
        ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
        oldestUnexpiredTS);

    // 返回MemStore、所有StoreFile的Scanner.
    List<KeyValueScanner> scanners = getScannersNoCompaction();

    //explicitColumnQuery:是否過濾列族 lazySeekEnabledGlobally預設是true 如果檔案數量超過1個,isParallelSeekEnabled就是true
    if (explicitColumnQuery && lazySeekEnabledGlobally) {
      for (KeyValueScanner scanner : scanners) {
        scanner.requestSeek(matcher.getStartKey(), false, true);
      }
    } else {
      if (!isParallelSeekEnabled) {
        for (KeyValueScanner scanner : scanners) {
          scanner.seek(matcher.getStartKey());
        }
      } else {
     //一般走這裡,並行查
        parallelSeek(scanners, matcher.getStartKey());
      }
    }

    // 一個堆裡面包括了兩個scanner,MemStore、StoreFile的Scanner
    heap = new KeyValueHeap(scanners, store.getComparator());

    this.store.addChangedReaderObserver(this);
  }

對上面的程式碼,我們再慢慢來分解。 

1、先new了一個ScanQueryMatcher,它是一個用來過濾的類,傳引數的時候,需要傳遞scan和oldestUnexpiredTS進去,oldestUnexpiredTS是個引數,是(當前時間-列族的生存週期),小於這個時間戳的kv視為已經過期了,在它初始化的時候,我們注意一下它的startKey和stopRow,這個startKey要注意,它可不是我們設定的那個startRow,而是用這個startRow來new了一個DeleteFamily型別的KeyValue。

this.stopRow = scan.getStopRow();
this.startKey = KeyValue.createFirstDeleteFamilyOnRow(scan.getStartRow())

2、接著我們看getScannersNoCompaction這個方法,它這裡是返回了兩個Scanner,MemStoreScanner和所有StoreFile的Scanner,在從StoreHeap中peak出來一個kv的時候,是從他們當中交替取出kv來的,StoreHeap從它的名字上面來看像是用了堆排序的演算法,它的peek方法和next方法真有點兒複雜,下一章講MemStore的時候再講吧。

//獲取所有的storefile,預設的實現沒有用上startRow和stopRow
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
memStoreScanners = this.memstore.getScanners();

預設的getStoreFileManager的getFilesForScanOrGet是返回了所有的StoreFile的Scanner,而不是通過startRow和stopRow做過濾,它的註釋裡面給出的解釋,裡面的files預設是按照seq id來排序的,而不是startKey,需要優化的可以從這裡下手。

3、然後就開始先seek一下,而不是全表掃啊!

//過濾列族的情況
scanner.requestSeek(matcher.getStartKey(), false, true);
//一般走這裡,並行查
parallelSeek(scanners, matcher.getStartKey());

scanner.requestSeek不是所有情況都要seek,是查詢Delete的時候,如果查詢的kv的時間戳比檔案的最大時間戳小,就seek到上次未查詢到的kv;它這裡可能會用上DeleteFamily刪除真個family這種情況。

parallelSeek就是開多執行緒去呼叫Scanner的seek方法, MemStore的seek很簡單,因為它的kv集合是一個排序好的集合,HFile的seek比較複雜,下面我用一個圖來表達吧。

在搜尋HFile的時候,key先從一級索引找,通過它定位到細的二級索引,然後再定位到具體的block上面,到了HFileBlock之後,就不是seek了,就是遍歷,遍歷沒什麼好說的,不熟悉的朋友建議先回去看看《StoreFile儲存格式》。注意哦,這個key就是我們的startKey哦,所以大家知道為什麼要在scan的時候要設定StartKey了嗎?

nextInternal的流程

通過前面的分析,我們可以把nextInternal分解與拆分、抹去一些不必要的程式碼,我發現程式碼還是很難懂,所以我畫了一個過程圖出來代替那段程式碼。

特別注意事項:

1、這個圖是被我處理過的簡化之後的圖,還有在放棄該row的kv們 之後並非都要進行是StopRow的判斷,只是為了合併這個流程,我加上去的isStopRow的判斷,但並不影響整個流程。

2、!isStopRow代表返回程式碼的(!isStopRow)的意思, 根據isStopRow的當前值來返回true或者false

3、true意味著退出,並且還有結果,false意味著退出,沒有結果

誒,看到這裡,還是沒看到它是怎麼用ScanQueryMatcher去過濾被刪除的kv們啊,好,接下來我們重點考察這個問題。

ScanQueryMatcher如何過濾已經被刪除的KeyValue

這個過程遮蔽在了filterRow之後通過的把該row的kv接到結果集的這一步裡面去了。它在裡面不停的呼叫KeyValueHeap的next方法,match的呼叫正好在這個方法。我們現在就去追蹤這遺失的部分。

我們直接去看它的match方法就好了,別的不用看了,它處理的情況好多好多,尼瑪,這是要死人的節奏啊。

ScanQueryMatcher是用來處理一行資料之間的版本問題的,在每遇到一個新的row的時候,它都會先被設定matcher.setRow(row, offset, length)。

if (limit < 0 || matcher.row == null || !Bytes.equals(row, offset, length, matcher.row,
      matcher.rowOffset, matcher.rowLength)) {
      this.countPerRow = 0;
      matcher.setRow(row, offset, length);
}

上面這段程式碼在StoreScanner的next方法裡面,每當一行結束之後,都會呼叫這個方法。

在講match方法之前,我先講一下rowkey的排序規則,rowkey 正序->family 正序->qualifier 正序->ts 降序->type 降序,那麼對於同一個行、列族、列的資料,時間越近的排在前面,型別越大的排在前面,比如Delete就在Put前面,下面是它的型別表。

//search用
Minimum((byte)0),
Put((byte)4),
Delete((byte)8),
DeleteFamilyVersion((byte)10),
DeleteColumn((byte)12),
DeleteFamily((byte)14),
//search用 
Maximum((byte)255);

為什麼這裡先KeyValue的排序規則呢,這當然有關係了,這關係著掃描的時候,誰先誰後的問題,如果時間戳小的在前面,下面這個過濾就不生效了。

下面我們看看它的match方法的檢查規則。

1、和當前行比較

//和當前的行進行比較,只有相等才繼續,大於當前的行就要跳到下一行,小於說明有問題,停止
int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength,
        bytes, offset, rowLength);
if (ret <= -1) {
      return MatchCode.DONE;
} else if (ret >= 1) {
      return MatchCode.SEEK_NEXT_ROW;
 }

 2、檢查是否所有列都查過了

//所有的列都掃描過來
if (this.columns.done()) {
      stickyNextRow = true;
      return MatchCode.SEEK_NEXT_ROW;
}

3、檢查列的時間戳是否過期

long timestamp = kv.getTimestamp();
// 檢查列的時間是否過期
if (columns.isDone(timestamp)) {
    return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
}

4a、如果是Delete的型別,加到ScanDeleteTraker。

if (kv.isDelete()) {
    this.deletes.add(bytes, offset, qualLength, timestamp, type);
}

4b、如果不是,如果ScanDeleteTraker裡面有Delete,就要讓它經歷ScanDeleteTraker的檢驗了(進宮前先驗一下身)

DeleteResult deleteResult = deletes.isDeleted(bytes, offset, qualLength,
          timestamp);
      switch (deleteResult) {
        case FAMILY_DELETED:
        case COLUMN_DELETED:
          return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
        case VERSION_DELETED:
        case FAMILY_VERSION_DELETED:
          return MatchCode.SKIP;
        case NOT_DELETED:
          break;
        default:
          throw new RuntimeException("UNEXPECTED");
}

 這裡就要說一下剛才那幾個Delete的了:

1)DeleteFamily是最凶狠的,生命週期也長,整個列族全刪,基本上會一直存在

2)DeleteColum只刪掉一個列,出現這個列的都會被幹掉

3)DeleteFamilyVersion沒遇到過

4)Delete最差勁兒了,只能刪除指定時間戳的,時間戳一定要對哦,否則一旦發現不對的,這個Delete就失效了,可以說,生命週期只有一次,下面是原始碼。

public DeleteResult isDeleted(byte [] buffer, int qualifierOffset,
      int qualifierLength, long timestamp) {
    //時間戳小於刪除列族的時間戳,說明這個列族被刪掉是後來的事情
    if (hasFamilyStamp && timestamp <= familyStamp) {
      return DeleteResult.FAMILY_DELETED;
    }
    //檢查時間戳
    if (familyVersionStamps.contains(Long.valueOf(timestamp))) {
        return DeleteResult.FAMILY_VERSION_DELETED;
    }

    if (deleteBuffer != null) {
     
      int ret = Bytes.compareTo(deleteBuffer, deleteOffset, deleteLength,
          buffer, qualifierOffset, qualifierLength);

      if (ret == 0) {
        if (deleteType == KeyValue.Type.DeleteColumn.getCode()) {
          return DeleteResult.COLUMN_DELETED;
        }
        // 坑爹的Delete它只刪除相同時間戳的,遇到不想的它就pass了
        if (timestamp == deleteTimestamp) {
          return DeleteResult.VERSION_DELETED;
        }

        //時間戳不對,這個Delete失效了
        deleteBuffer = null;
      } else if(ret < 0){
        // row比當前的大,這個Delete也失效了
        deleteBuffer = null;
      } else {
        throw new IllegalStateException(...);
      }
    }

    return DeleteResult.NOT_DELETED;

 上一章說過,Delete new出來之後什麼都不設定,就是DeleteFamily級別的選手,所以在它之後的會全部被幹掉,所以你們懂的,我們也會用DeleteColum來刪除某一列資料,只要時間戳在它之前的kv就會被幹掉,刪某個指定版本的少,因為你得知道具體的時間戳,否則你刪不了。

例子詳解DeleteFamily

假設我們有這些資料

KeyValue [] kvs1 = new KeyValue[] {
        KeyValueTestUtil.create("R1", "cf", "a", now, KeyValue.Type.Put, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "a", now, KeyValue.Type.DeleteFamily, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "a", now-500, KeyValue.Type.Put, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "a", now+500, KeyValue.Type.Put, "dont-care"),
        KeyValueTestUtil.create("R1", "cf", "a", now, KeyValue.Type.Put, "dont-care"),
        KeyValueTestUtil.create("R2", "cf", "z", now, KeyValue.Type.Put, "dont-care")
};

Scan的引數是這些。

Scan scanSpec = new Scan(Bytes.toBytes("R1"));
scanSpec.setMaxVersions(3);
scanSpec.setBatch(10);
StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType, getCols("a","z"), scanners);

然後,我們先將他們排好序,是這樣的。

R1/cf:a/1400602376242(now+500)/Put/vlen=9/mvcc=0, 
R1/cf:a/1400602375742(now)/DeleteFamily/vlen=9/mvcc=0, 
R1/cf:a/1400602375742(now)/Put/vlen=9/mvcc=0, 
R1/cf:a/1400602375742(now)/Put/vlen=9/mvcc=0, 
R1/cf:a/1400602375242(now-500)/Put/vlen=9/mvcc=0, 
R2/cf:z/1400602375742(now)/Put/vlen=9/mvcc=0

所以到最後,黃色的三行會被刪除,只剩下第一行和最後一行,但是最後一行也會被排除掉,因為它已經換行了,不是同一個行的,不在這一輪進行比較,返回MatchCode.DONE。

---->回到前面是match過程

5、檢查時間戳,即設定給Scan的時間戳,這個估計一般很少設定,時間戳過期,就返回下一個MatchCode.SEEK_NEXT_ROW。

6、檢查列是否是Scan裡面設定的需要查詢的列。

7、檢查列的版本,Scan設定的MaxVersion,超過了這個version就要趕緊閃人了哈,返回MatchCode.SEEK_NEXT_COL。

對於match的結果,有幾個常見的:

1、MatchCode.INCLUDE_AND_SEEK_NEXT_COL 包括當前這個,跳到下一列,會引發StoreScanner的reseek方法。

2、MatchCode.SKIP 忽略掉,繼續呼叫next方法。

3、MatchCode.SEEK_NEXT_ROW 不包括當前這個,繼續呼叫next方法。

4、MatchCode.SEEK_NEXT_COL 不包括它,跳過下一列,會引發StoreScanner的reseek方法。

5、MatchCode.DONE rowkey變了,要留到下次進行比較了

講到這裡基本算結束了。

關於測試

呵呵,有興趣測試的童鞋可以開啟下hbase原始碼,找到TestStoreScanner這個類自己除錯看下結果。