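HBase Source Code Analysis: The RegionServer Read Path
Data can be read in two ways, Get and Scan. The get code shows that a Get is itself converted into a Scan and processed the same way.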
//HRegion.java
public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
List<Cell> results = new ArrayList<Cell>();
...
Scan scan = new Scan(get);
RegionScanner scanner = null;
try {
scanner = getScanner(scan);
scanner.next(results);
} finally {
if (scanner != null)
scanner.close();
}
...
return results;
}
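To make the "a Get is just a Scan" idea concrete, here is a minimal sketch that does not use any HBase classes. It assumes a toy region modeled as a sorted map (the class `GetAsScanSketch` and its methods are hypothetical). A normal scan is half-open on the stop row, so a get has to be a scan over a single row with the stop row included.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (not HBase code): a "region" is a sorted map from row key to value. */
final class GetAsScanSketch {
    private final TreeMap<String, String> rows = new TreeMap<>();

    void put(String row, String value) {
        rows.put(row, value);
    }

    /** Scans [startRow, stopRow), or [startRow, stopRow] when includeStopRow is true. */
    List<String> scan(String startRow, String stopRow, boolean includeStopRow) {
        List<String> results = new ArrayList<>();
        for (Map.Entry<String, String> e
                : rows.subMap(startRow, true, stopRow, includeStopRow).entrySet()) {
            results.add(e.getKey() + "=" + e.getValue());
        }
        return results;
    }

    /** A get is a scan whose start row equals its stop row, with the stop row included. */
    List<String> get(String row) {
        return scan(row, row, true);
    }

    public static void main(String[] args) {
        GetAsScanSketch region = new GetAsScanSketch();
        region.put("row1", "a");
        region.put("row2", "b");
        region.put("row3", "c");
        System.out.println(region.get("row2"));                 // [row2=b]
        System.out.println(region.scan("row2", "row2", false)); // []
        System.out.println(region.scan("row1", "row3", false)); // [row1=a, row2=b]
    }
}
```

The second call shows why the get case needs special handling: with a half-open range and startRow equal to stopRow, nothing would be returned.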
Next, look at how getScanner handles the scan. After checking the requested column families, getScanner calls instantiateRegionScanner.
//HRegion.java
protected RegionScanner instantiateRegionScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException {
if (scan.isReversed()) {
if (scan.getFilter() != null) {
scan.getFilter().setReversed(true);
}
return new ReversedRegionScannerImpl(scan, additionalScanners, this);
}
return new RegionScannerImpl(scan, additionalScanners, this);
}
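A different RegionScanner object is returned depending on the direction of the scan.
A forward scan gets a RegionScannerImpl and a reversed scan gets a ReversedRegionScannerImpl.
Start with the RegionScannerImpl implementation.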
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
throws IOException {
this.region = region;
this.maxResultSize = scan.getMaxResultSize();
if (scan.hasFilter()) {
this.filter = new FilterWrapper(scan.getFilter());
} else {
this.filter = null;
}
/**
* By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
* scanner context that can be used to enforce the batch limit in the event that a
* ScannerContext is not specified during an invocation of next/nextRaw
*/
defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
this.stopRow = null;
} else {
this.stopRow = scan.getStopRow();
}
// If we are doing a get, we want to be [startRow,endRow] normally
// it is [startRow,endRow) and if startRow=endRow we get nothing.
this.isScan = scan.isGetScan() ? -1 : 0;
// synchronize on scannerReadPoints so that nobody calculates
// getSmallestReadPoint, before scannerReadPoints is updated.
IsolationLevel isolationLevel = scan.getIsolationLevel();
synchronized(scannerReadPoints) {
this.readPt = getReadpoint(isolationLevel);
scannerReadPoints.put(this, this.readPt);
}
// Here we separate all scanners into two lists - scanner that provide data required
// by the filter to operate (scanners list) and all others (joinedScanners list).
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
if (additionalScanners != null) {
scanners.addAll(additionalScanners);
}
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| this.filter.isFamilyEssential(entry.getKey())) {
scanners.add(scanner);
} else {
joinedScanners.add(scanner);
}
}
initializeKVHeap(scanners, joinedScanners, region);
}
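Here additionalScanners is null,
scan.doLoadColumnFamiliesOnDemand() is false by default for a Scan, and
filter.isFamilyEssential(entry.getKey()) returns true for most filters (it is the FilterBase default).
So in the default case the scanner of every family ends up in scanners, and joinedScanners stays empty.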
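The split between scanners and joinedScanners can be sketched on its own. The class below is a hypothetical stand-in, not HBase code: families are plain strings, and the filter is reduced to a "has filter" flag plus a predicate that plays the role of isFamilyEssential. The condition is the same as in the constructor above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch of how families are split into essential and joined scanners. */
final class EssentialFamilySplitSketch {
    final List<String> scanners = new ArrayList<>();
    final List<String> joinedScanners = new ArrayList<>();

    EssentialFamilySplitSketch(List<String> families, boolean hasFilter,
            boolean loadOnDemand, Predicate<String> isFamilyEssential) {
        for (String family : families) {
            // Same condition as in RegionScannerImpl: a family is joined only when a
            // filter exists, on-demand loading is on, and the filter does not need it.
            if (!hasFilter || !loadOnDemand || isFamilyEssential.test(family)) {
                scanners.add(family);
            } else {
                joinedScanners.add(family);
            }
        }
    }

    public static void main(String[] args) {
        List<String> families = Arrays.asList("meta", "blob");

        // Default case: on-demand loading is off, so everything is essential.
        EssentialFamilySplitSketch byDefault =
            new EssentialFamilySplitSketch(families, true, false, f -> true);
        System.out.println(byDefault.scanners + " " + byDefault.joinedScanners);
        // [meta, blob] []

        // On-demand loading on, and the filter only needs the "meta" family.
        EssentialFamilySplitSketch onDemand =
            new EssentialFamilySplitSketch(families, true, true, f -> f.equals("meta"));
        System.out.println(onDemand.scanners + " " + onDemand.joinedScanners);
        // [meta] [blob]
    }
}
```

The second case is where the joined list pays off: the large "blob" family is only read for rows that the filter has already accepted using "meta".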
Next, look at HStore's getScanner method.
public KeyValueScanner getScanner(Scan scan,
final NavigableSet<byte []> targetCols, long readPt) throws IOException {
lock.readLock().lock();
try {
KeyValueScanner scanner = null;
if (this.getCoprocessorHost() != null) {
scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
}
if (scanner == null) {
scanner = scan.isReversed() ? new ReversedStoreScanner(this,
getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
getScanInfo(), scan, targetCols, readPt);
}
return scanner;
} finally {
lock.readLock().unlock();
}
}
As above, a different KeyValueScanner is returned depending on the direction of the scan.
A forward scan gets a StoreScanner and a reversed scan gets a ReversedStoreScanner.
Start with StoreScanner:
private StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
throws IOException {
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions(), readPt);
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
// In unit tests, the store could be null
if (this.store != null) {
this.store.addChangedReaderObserver(this);
}
// Seek all scanners to the initial key
seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
resetKVHeap(scanners, scanInfo.getComparator());
}
/**
* Get a filtered list of scanners. Assumes we are not in a compaction.
* @return list of scanners to seek
*/
protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
final boolean isCompaction = false;
boolean usePread = isGet || scanUsePread;
return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
}
HStore.getScanners returns a scanner for every store file (StoreFileScanner) plus the scanner for the memstore (MemStoreScanner).
StoreScanner.selectScannersFrom then filters these scanners by bloom filter, time range, and TTL.
Finally, the remaining KeyValueScanners are wrapped in a KeyValueHeap.
As the class diagram shows, StoreFileScanner, MemStoreScanner, and StoreScanner all implement the KeyValueScanner interface.
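The time range and TTL part of that filtering can be illustrated with a small sketch. Everything here is an assumption made for illustration: `FileMeta` is a hypothetical stand-in for the metadata a store file exposes, the scan time range is treated as half-open, and the bloom filter check is left out entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of pruning store files before scanning them. */
final class SelectScannersSketch {
    /** Assumed per-file metadata: a name plus the smallest and largest cell timestamp. */
    static final class FileMeta {
        final String name;
        final long minTs;
        final long maxTs;

        FileMeta(String name, long minTs, long maxTs) {
            this.name = name;
            this.minTs = minTs;
            this.maxTs = maxTs;
        }
    }

    /**
     * Keeps only the files that may contain a cell the scan can return.
     * The scan time range is [scanMinTs, scanMaxTs); cells with a timestamp
     * below oldestUnexpiredTs are treated as expired by TTL.
     */
    static List<String> select(List<FileMeta> files, long scanMinTs, long scanMaxTs,
            long oldestUnexpiredTs) {
        List<String> selected = new ArrayList<>();
        for (FileMeta f : files) {
            boolean overlapsTimeRange = f.maxTs >= scanMinTs && f.minTs < scanMaxTs;
            boolean hasUnexpiredCells = f.maxTs >= oldestUnexpiredTs;
            if (overlapsTimeRange && hasUnexpiredCells) {
                selected.add(f.name);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<FileMeta> files = Arrays.asList(
            new FileMeta("f1", 0, 100),
            new FileMeta("f2", 150, 300),
            new FileMeta("f3", 500, 900));
        System.out.println(select(files, 200, 1000, 400)); // [f3]
        System.out.println(select(files, 200, 1000, 0));   // [f2, f3]
    }
}
```

In the first call f1 is dropped because it lies entirely before the scan's time range, and f2 is dropped because even its newest cell has expired.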
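The relationships between these scanners are as follows.
1. A scan request sent to a Region returns one RegionScanner.
2. Each family of the Region gets its own StoreScanner, and the RegionScanner keeps these StoreScanners in a KeyValueHeap.
3. Each StoreScanner holds one or more StoreFileScanners (one per store file) and one MemStoreScanner, and these scanners are also kept in a KeyValueHeap.
The data for one rowkey can span several families, each family has several columns, and each column can have several versions. How is the data for one rowkey located and then merged together? The answer is in RegionScanner.nextRaw, the method that fetches the data.
Its declaration is: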
boolean nextRaw(List<Cell> result) throws IOException;
The fetched data is placed in the result list that is passed in.
nextRaw calls the nextInternal method.
// HRegion.java
// class RegionScannerImpl
private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
throws IOException {
// For now we only look at how data is taken from storeHeap.
// storeHeap is the KeyValueHeap mentioned above that holds the StoreScanners.
...
// Let's see what we have in the storeHeap.
Cell current = this.storeHeap.peek();
byte[] currentRow = null;
int offset = 0;
short length = 0;
if (current != null) {
currentRow = current.getRowArray();
offset = current.getRowOffset();
length = current.getRowLength();
}
...
// Check if rowkey filter wants to exclude this row. If so, loop to next.
// Technically, if we hit limits before on this row, we don't need this call.
if (filterRowKey(currentRow, offset, length)) {
boolean moreRows = nextRow(currentRow, offset, length);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
results.clear();
continue;
}
// Ok, we are good, let's try to get some results from the main heap.
populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);
...
}
private boolean populateResult(List<Cell> results, KeyValueHeap heap,
ScannerContext scannerContext, byte[] currentRow, int offset, short length)
throws IOException {
Cell nextKv;
boolean moreCellsInRow = false;
boolean tmpKeepProgress = scannerContext.getKeepProgress();
// Scanning between column families and thus the scope is between cells
LimitScope limitScope = LimitScope.BETWEEN_CELLS;
do {
// We want to maintain any progress that is made towards the limits while scanning across
// different column families. To do this, we toggle the keep progress flag on during calls
// to the StoreScanner to ensure that any progress made thus far is not wiped away.
scannerContext.setKeepProgress(true);
heap.next(results, scannerContext);
scannerContext.setKeepProgress(tmpKeepProgress);
nextKv = heap.peek();
moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
} else if (scannerContext.checkSizeLimit(limitScope)) {
ScannerContext.NextState state =
moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues();
} else if (scannerContext.checkTimeLimit(limitScope)) {
ScannerContext.NextState state =
moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues();
}
} while (moreCellsInRow);
return nextKv != null;
}
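The control flow of populateResult is easier to see without the ScannerContext machinery. The sketch below is a simplified, hypothetical model: the heap is a plain queue of already sorted "row/column" strings, each call to the queue yields one cell (the real heap.next can add several), and only the batch limit is modeled, not the size or time limits.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of collecting the cells of one row under a batch limit. */
final class PopulateResultSketch {
    /** Cells are "row/column" strings; the row is the part before the slash. */
    static String rowOf(String cell) {
        return cell.substring(0, cell.indexOf('/'));
    }

    /**
     * Moves cells of currentRow from the sorted queue into results.
     * Assumes the head of the queue belongs to currentRow.
     * Returns true if it stopped at the batch limit while the row still has cells.
     */
    static boolean populate(Deque<String> heap, List<String> results,
            String currentRow, int batchLimit) {
        boolean moreCellsInRow;
        do {
            results.add(heap.poll());
            String next = heap.peek();
            moreCellsInRow = next != null && rowOf(next).equals(currentRow);
            if (moreCellsInRow && batchLimit > 0 && results.size() >= batchLimit) {
                return true; // batch limit reached in the middle of the row
            }
        } while (moreCellsInRow);
        return false;
    }

    public static void main(String[] args) {
        Deque<String> heap =
            new ArrayDeque<>(Arrays.asList("r1/a", "r1/b", "r1/c", "r2/a"));

        List<String> first = new ArrayList<>();
        System.out.println(populate(heap, first, "r1", 2) + " " + first);
        // true [r1/a, r1/b]

        List<String> second = new ArrayList<>();
        System.out.println(populate(heap, second, "r1", 2) + " " + second);
        // false [r1/c]

        System.out.println(heap.peek()); // r2/a
    }
}
```

The loop ends either when the next cell belongs to a different row or when a limit is hit, which matches the do/while structure of the original method.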
The data is fetched through KeyValueHeap's peek() and next(List<Cell> result, ScannerContext scannerContext) methods. Here is the KeyValueHeap implementation.
public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner {
// A priority queue holds all the KeyValueScanners
protected PriorityQueue<KeyValueScanner> heap = null;
/**
* The current sub-scanner, i.e. the one that contains the next key/value
* to return to the client. This scanner is NOT included in {@link #heap}
* (but we frequently add it back to the heap and pull the new winner out).
* We maintain an invariant that the current sub-scanner has already done
* a real seek, and that current.peek() is always a real key/value (or null)
* except for the fake last-key-on-row-column supplied by the multi-column
* Bloom filter optimization, which is OK to propagate to StoreScanner. In
* order to ensure that, always use {@link #pollRealKV()} to update current.
*/
// The current KeyValueScanner, taken from the top of the heap.
// Once taken out it is no longer in the heap; it is put back later.
protected KeyValueScanner current = null;
protected KVScannerComparator comparator;
}
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
if (this.current == null) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
InternalScanner currentAsInternal = (InternalScanner)this.current;
// Write data into result
boolean moreCells = currentAsInternal.next(result, scannerContext);
Cell pee = this.current.peek();
/*
* By definition, any InternalScanner must return false only when it has no
* further rows to be fetched. So, we can close a scanner if it returns
* false. All existing implementations seem to be fine with this. It is much
* more efficient to close scanners which are not needed than keep them in
* the heap. This is also required for certain optimizations.
*/
// If the current KeyValueScanner has no more data, close it
if (pee == null || !moreCells) {
this.current.close();
} else {
this.heap.add(this.current);
}
this.current = null;
this.current = pollRealKV();
if (this.current == null) {
moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
return moreCells;
}
// Take the top scanner out of the heap
protected KeyValueScanner pollRealKV() throws IOException {
// Poll the kvScanner that is currently on top
KeyValueScanner kvScanner = heap.poll();
if (kvScanner == null) {
return null;
}
while (kvScanner != null && !kvScanner.realSeekDone()) {
if (kvScanner.peek() != null) {
try {
// Perform the real seek so the scanner is positioned at the requested key
kvScanner.enforceSeek();
} catch (IOException ioe) {
kvScanner.close();
throw ioe;
}
Cell curKV = kvScanner.peek();
if (curKV != null) {
// Look at the scanner now on top of the heap without removing it.
KeyValueScanner nextEarliestScanner = heap.peek();
if (nextEarliestScanner == null) {
// The heap is empty. Return the only possible scanner.
return kvScanner;
}
// Compare the current scanner to the next scanner. We try to avoid
// putting the current one back into the heap if possible.
// The polled scanner has moved during the real seek, so it must be compared again
Cell nextKV = nextEarliestScanner.peek();
if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
// We already have the scanner with the earliest KV, so return it.
// The first Cell of the polled scanner is still the smallest
return kvScanner;
}
// Otherwise, put the scanner back into the heap and let it compete
// against all other scanners (both those that have done a "real
// seek" and a "lazy seek").
// The first Cell of the polled scanner is no longer the smallest, so put the scanner back into the heap
heap.add(kvScanner);
} else {
// Close the scanner because we did a real seek and found out there
// are no more KVs.
kvScanner.close();
}
} else {
// Close the scanner because it has already run out of KVs even before
// we had to do a real seek on it.
kvScanner.close();
}
// Poll the top scanner from the heap again and repeat the process
kvScanner = heap.poll();
}
return kvScanner;
}
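Stripped of lazy seeks and scanner closing, what KeyValueHeap does is a k-way merge of sorted inputs. The following sketch shows that core pattern with hypothetical classes (`PeekingScanner` stands in for KeyValueScanner, and keys are plain strings): the current scanner is taken out of the heap, one key is consumed, and the scanner goes back into the heap if it still has data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch of the k-way merge at the heart of a scanner heap. */
final class HeapMergeSketch {
    /** Minimal stand-in for a KeyValueScanner over one sorted input. */
    static final class PeekingScanner {
        private final Iterator<String> it;
        private String head;

        PeekingScanner(List<String> sortedKeys) {
            it = sortedKeys.iterator();
            head = it.hasNext() ? it.next() : null;
        }

        /** Returns the next key without consuming it, or null when exhausted. */
        String peek() {
            return head;
        }

        /** Consumes and returns the next key. */
        String next() {
            String out = head;
            head = it.hasNext() ? it.next() : null;
            return out;
        }
    }

    static List<String> merge(List<List<String>> sortedInputs) {
        Comparator<PeekingScanner> byHead = Comparator.comparing(PeekingScanner::peek);
        PriorityQueue<PeekingScanner> heap =
            new PriorityQueue<>(Math.max(1, sortedInputs.size()), byHead);
        for (List<String> input : sortedInputs) {
            PeekingScanner s = new PeekingScanner(input);
            if (s.peek() != null) {
                heap.add(s); // exhausted scanners never enter the heap
            }
        }
        List<String> out = new ArrayList<>();
        PeekingScanner current = heap.poll(); // current lives outside the heap
        while (current != null) {
            out.add(current.next());
            if (current.peek() != null) {
                heap.add(current); // put it back so it competes on its new head
            }
            current = heap.poll();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(Arrays.asList(
            Arrays.asList("a", "d", "g"),
            Arrays.asList("b", "e"),
            Arrays.asList("c", "f"))));
        // [a, b, c, d, e, f, g]
    }
}
```

The real pollRealKV adds one refinement on top of this: a scanner may sit in the heap with a placeholder key from a lazy seek, so the heap performs the real seek when that scanner reaches the top and re-checks whether it is still the smallest.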
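Next, see how the heap compares scanners to decide which one is on top.
this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
this.comparator);
The PriorityQueue is instantiated with a KVScannerComparator, which wraps a KVComparator. The KVComparator that is actually passed in is the region's comparator.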
// HRegion.java
this.comparator = fs.getRegionInfo().getComparator();
...
public KVComparator getComparator() {
return isMetaRegion()?
KeyValue.META_COMPARATOR: KeyValue.COMPARATOR;
}
// KeyValue.java
public static final KVComparator COMPARATOR = new KVComparator();
// KeyValueHeap.java
// class KVScannerComparator
public int compare(KeyValueScanner left, KeyValueScanner right) {
int comparison = compare(left.peek(), right.peek());
if (comparison != 0) {
return comparison;
} else {
// Since both the keys are exactly the same, we break the tie in favor
// of the key which came latest.
long leftSequenceID = left.getSequenceID();
long rightSequenceID = right.getSequenceID();
if (leftSequenceID > rightSequenceID) {
return -1;
} else if (leftSequenceID < rightSequenceID) {
return 1;
} else {
return 0;
}
}
}
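KVScannerComparator.compare first compares the first Cell of each KeyValueScanner. If the two are equal, it compares the sequenceId of each KeyValueScanner. The head of a PriorityQueue is its smallest element, and the scanner with the largest sequenceId holds the newest data, so the comparison is inverted to put the scanner with the larger sequenceId at the head.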
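The effect of the inverted sequenceId comparison can be checked with a few lines of Java. This is an illustrative sketch with a hypothetical `Source` class that carries only the scanner's head key and its sequence id. The comparator has the same shape as KVScannerComparator.compare.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical sketch of breaking ties between scanners by sequence id. */
final class SequenceIdTieBreakSketch {
    /** Assumed stand-in for a scanner: its name, its head key, and its sequence id. */
    static final class Source {
        final String name;
        final String headKey;
        final long sequenceId;

        Source(String name, String headKey, long sequenceId) {
            this.name = name;
            this.headKey = headKey;
            this.sequenceId = sequenceId;
        }
    }

    static final Comparator<Source> COMPARATOR = (left, right) -> {
        int comparison = left.headKey.compareTo(right.headKey);
        if (comparison != 0) {
            return comparison;
        }
        // Equal keys: operands are swapped so the larger sequence id sorts first.
        return Long.compare(right.sequenceId, left.sequenceId);
    };

    /** Returns the name of the source that ends up at the head of the queue. */
    static String firstSource(Source... sources) {
        PriorityQueue<Source> heap = new PriorityQueue<>(11, COMPARATOR);
        for (Source s : sources) {
            heap.add(s);
        }
        return heap.peek().name;
    }

    public static void main(String[] args) {
        System.out.println(firstSource(
            new Source("old-file", "row1", 5),
            new Source("new-file", "row1", 9),
            new Source("other", "row2", 20)));
        // new-file
    }
}
```

"other" has the largest sequence id but a larger key, so it loses on the key comparison. Among the two sources positioned at row1, the newer one wins.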
Cells themselves are compared by CellComparator.compare.
// CellComparator.java
public static int compare(final Cell a, final Cell b, boolean ignoreSequenceid) {
// row
int c = compareRows(a, b);
if (c != 0) return c;
c = compareWithoutRow(a, b);
if(c != 0) return c;
if (!ignoreSequenceid) {
// Negate following comparisons so later edits show up first
// mvccVersion: later sorts first
return Longs.compare(b.getMvccVersion(), a.getMvccVersion());
} else {
return c;
}
}
// compareWithoutRow (the beginning of the method is missing from the source)
...
if (rightCell.getFamilyLength() + rightCell.getQualifierLength() == 0
&& rightCell.getTypeByte() == Type.Minimum.getCode()) {
return -1;
}
boolean sameFamilySize = (leftCell.getFamilyLength() == rightCell.getFamilyLength());
if (!sameFamilySize) {
// comparing column family is enough.
return Bytes.compareTo(leftCell.getFamilyArray(), leftCell.getFamilyOffset(),
leftCell.getFamilyLength(), rightCell.getFamilyArray(), rightCell.getFamilyOffset(),
rightCell.getFamilyLength());
}
int diff = compareColumns(leftCell, rightCell);
if (diff != 0) return diff;
diff = compareTimestamps(leftCell, rightCell);
if (diff != 0) return diff;
// Compare types. Let the delete types sort ahead of puts; i.e. types
// of higher numbers sort before those of lesser numbers. Maximum (255)
// appears ahead of everything, and minimum (0) appears after
// everything.
return (0xff & rightCell.getTypeByte()) - (0xff & leftCell.getTypeByte());
}
public static int compareTimestamps(final Cell left, final Cell right) {
long ltimestamp = left.getTimestamp();
long rtimestamp = right.getTimestamp();
return compareTimestamps(ltimestamp, rtimestamp);
}
private static int compareTimestamps(final long ltimestamp, final long rtimestamp) {
// The below older timestamps sorting ahead of newer timestamps looks
// wrong but it is intentional. This way, newer timestamps are first
// found when we iterate over a memstore and newer versions are the
// first we trip over when reading from a store file.
if (ltimestamp < rtimestamp) {
return 1;
} else if (ltimestamp > rtimestamp) {
return -1;
}
return 0;
}
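The comparison goes through rowkey, family, column (qualifier), and timestamp in turn, and finally the type byte. The first three use Bytes.compareTo, which is lexicographic byte order. For the timestamp, a larger value again means newer data, so the result is inverted to make sure cells with larger timestamps come first.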
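The resulting sort order can be reproduced with a small comparator. This is a simplified sketch under stated assumptions: `SimpleCell` is hypothetical, String.compareTo stands in for the byte-wise Bytes.compareTo, and the type byte and mvcc version comparisons are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of the cell sort order: keys ascending, timestamps descending. */
final class CellOrderSketch {
    static final class SimpleCell {
        final String row;
        final String family;
        final String qualifier;
        final long timestamp;

        SimpleCell(String row, String family, String qualifier, long timestamp) {
            this.row = row;
            this.family = family;
            this.qualifier = qualifier;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return row + "/" + family + ":" + qualifier + "@" + timestamp;
        }
    }

    static final Comparator<SimpleCell> CELL_ORDER = (a, b) -> {
        int c = a.row.compareTo(b.row);
        if (c != 0) return c;
        c = a.family.compareTo(b.family);
        if (c != 0) return c;
        c = a.qualifier.compareTo(b.qualifier);
        if (c != 0) return c;
        // Operands swapped on purpose: the newer (larger) timestamp sorts first.
        return Long.compare(b.timestamp, a.timestamp);
    };

    /** Returns a sorted copy of the given cells. */
    static List<SimpleCell> sorted(List<SimpleCell> cells) {
        List<SimpleCell> copy = new ArrayList<>(cells);
        copy.sort(CELL_ORDER);
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(sorted(Arrays.asList(
            new SimpleCell("r1", "cf", "a", 1),
            new SimpleCell("r1", "cf", "a", 3),
            new SimpleCell("r1", "cf", "b", 2),
            new SimpleCell("r0", "cf", "z", 1))));
        // [r0/cf:z@1, r1/cf:a@3, r1/cf:a@1, r1/cf:b@2]
    }
}
```

Because the newest version of a column comes first, a reader that only wants the latest version can stop at the first cell it sees for that column.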
ReversedStoreScanner, by contrast, uses a ReversedKeyValueHeap, whose comparator is ReversedKVScannerComparator.
//ReversedKVScannerComparator.java
public int compare(KeyValueScanner left, KeyValueScanner right) {
int rowComparison = compareRows(left.peek(), right.peek());
if (rowComparison != 0) {
return -rowComparison;
}
return super.compare(left, right);
}
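The result of the row comparison is negated, so rows sort in descending order, while scanners positioned on the same row fall back to the forward comparison.
ReversedKeyValueHeap.next() therefore moves toward smaller rows.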
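A short sketch makes the difference between "reverse the rows" and "reverse everything" visible. The `Key` class and both comparators are hypothetical stand-ins. REVERSED follows the same shape as ReversedKVScannerComparator.compare: negate the row comparison, otherwise defer to the forward comparator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of reversed row order with forward order inside each row. */
final class ReversedRowOrderSketch {
    static final class Key {
        final String row;
        final String column;

        Key(String row, String column) {
            this.row = row;
            this.column = column;
        }

        @Override
        public String toString() {
            return row + "/" + column;
        }
    }

    static final Comparator<Key> FORWARD = (a, b) -> {
        int c = a.row.compareTo(b.row);
        if (c != 0) return c;
        return a.column.compareTo(b.column);
    };

    static final Comparator<Key> REVERSED = (a, b) -> {
        int rowComparison = a.row.compareTo(b.row);
        if (rowComparison != 0) {
            return -rowComparison; // larger rows come first
        }
        return FORWARD.compare(a, b); // same row: keep the forward order
    };

    /** Returns a copy of the keys sorted in reversed-scan order. */
    static List<Key> reversedOrder(List<Key> keys) {
        List<Key> copy = new ArrayList<>(keys);
        copy.sort(REVERSED);
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(reversedOrder(Arrays.asList(
            new Key("r1", "a"), new Key("r1", "b"),
            new Key("r2", "a"), new Key("r2", "b"))));
        // [r2/a, r2/b, r1/a, r1/b]
    }
}
```

Rows come back from largest to smallest, but the columns inside each row still appear in their normal ascending order.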
This completes the analysis of the server-side read path.