HBase的Scan實現原始碼分析
阿新 • • 發佈:2019-02-02
public Cell peek() {
if (this.current == null) {
return null;
}
return this.current.peek();
}
講完了上述三個重要的資料結構,迴歸到hbase系統,HBase的表資料分為多個層次,分別是HRegion->HStore->[HFile,HFile,....,MemStoreFile]。一個表首先會水平分片形成多個HRegion,一個HRegion內不同的Column Family對應著不同的HStore,一個HStore下包含多個HFile和一個Memstore,資料寫入時先寫入MemstoreFile,MemStoreFile會不斷重新整理形成新的HFile。
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { //遍歷該region下的各store Store store = stores.get(entry.getKey()); KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt); //new一個該store的StoreScanner if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) { scanners.add(scanner); //將不同的StoreScanner歸入不同的scanner list中 } else { joinedScanners.add(scanner); } } initializeKVHeap(scanners, joinedScanners, region); //然後用這些StoreScanner初始化一個KeyValueHeap
store.getScanner就是針對每個store建立一個StoreScanner,最後一步的initializeKVHeap則將這些StoreScanner構建成一個堆儲存在RegionScanner的成員變數storeHeap中,用於遍歷取該region下所有store中的資料,而storeScanner同樣是一個由FileScanner組成的heap,其在store.getScanner中完成對每個store的storeScanner構造,建構函式中的關鍵兩步如下所示:
// Pass columns to try to filter out unnecessary StoreFiles. List<KeyValueScanner> scanners = getScannersNoCompaction(); //返回該Store下對應的MemStore/StoreFile Scanner // Seek all scanners to the start of the Row (or if the exact matching row // key does not exist, then to the start of the next matching Row). // Always check bloom filter to optimize the top row seek for delete // family marker. seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, isParallelSeekEnabled); //對這些StoreFileScanner和MemStoreScanner分別進行seek //seekKey是matcher.getStartKey()
getScannersNoCompaction()返回這個Store下包含的HFileScanner和memstoreScanner,儲存在scanners中。 seekScanners就是在memstore或者hfile中定位到指定的keyValue(通常是scan時startKey&endKey指定的keyValue),如果指定的keyValue不存在,則seek到指定keyValue的下一個元素。實際實現時這裡採用了lazy seek優化,優化的目的是為了不需要對所有的HFile進行seek尋找目標keyValue,而只需對keyValue真實存在的HFile進行seek。 典型的客戶端發起scan請求的程式碼如下所示:
Scan scan = new Scan();
scan.setStartRow(........);
scan.setStopRow(........);
Result result;
try (ResultScanner rs = table.getScanner(scan)) {
while ((result=rs.next()) != null) {
//your code here
}
}
進入上述程式碼的getScanner方法,會發現其new一個ClientScanner物件,該物件包含了使用者傳入的Scan物件以及快取、連線、重試次數、表名、region資訊等引數。ClientScanner建構函式的最後呼叫initializeScannerInConstruction(),這個函式實際上包裝了一個如下的呼叫: nextScanner(this.caching, false); 其中,this.caching是一個int型變數,表示一次scan的rpc請求返回的結果數量,返回結果儲存在客戶端的cache中。 進入nextScanner函式,其首先檢查是否已scan至表尾,如果已scan至表尾則關閉scan並返回false給客戶端,否則更新localStartKey作為本次scan的開始位置,並將輸入引數this.caching賦值給nbRows以表示本次scan返回的資料量,以上述兩個變數作為引數呼叫getScannerCallable方法,該方法會返回一個ScannerCallableWithReplicas型別的物件callable,接著呼叫callable物件中的call方法向服務端發起一次rpc呼叫,呼叫路徑如下:
ScannerCallableWithReplicas.call->ScannerCallable.call
服務呼叫是通過構造一個ScanRequest型別的物件request,並將其發往服務端來實現的,核心程式碼如下:
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
controller = controllerFactory.newController();
response = getStub().scan(controller, request);
這裡需要注意的是構造request時包裝了一個long型的變數nextCallSeq。
至此客戶端發起scan請求的流程結束,下面介紹服務端是如何處理這些scan請求,並與前面的知識建立起聯絡。
客戶端呼叫的getStub().scan向服務端發起了一次scan的rpc請求,服務端scan的實現在RSRpcServices中,首先其申請一個租約Lease.lease,用於客戶端和服務端之間的心跳連線,然後對照客戶端和服務端的nextCallSeq欄位(目的是保證客戶端順序得到所有資料而不漏),程式碼如下:
if (request.hasNextCallSeq()) {
if (rsh == null) {
rsh = scanners.get(scannerName);
}
if (rsh != null) {
if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
throw new OutOfOrderScannerNextException(
"Expected nextCallSeq: " + rsh.getNextCallSeq()
+ " But the nextCallSeq got from client: " + request.getNextCallSeq() +
"; request=" + TextFormat.shortDebugString(request));
}
// Increment the nextCallSeq value which is the next expected from client.
rsh.incNextCallSeq(); //保證客戶端順序得到所有資料不漏,Client和RS都維護一個nextCallSeq欄位
}
}
RSRpcServices中包含一個ConcurrentHashMap<String, RegionScannerHolder>型別的變數scanners,String是region的名字,也就是說每一個region都租用一個RegionScanner。回到scan函式,引數request中可以獲得scannerName,憑藉scannerName從scanners中獲取對應的RegionScanner物件scanner。 接著從request中提取本次scan的資訊,如是否是small scan、reverse scan等等,根據這些資訊構造ScannerContext型別的物件scannerContext,以此為引數呼叫RegionScanner的nextRaw方法,這樣就與前面介紹的RegionScanner建立起聯絡,返回結果存放在List<KeyValue>型別的變數values中:
moreRows = scanner.nextRaw(values, scannerContext)
newRaw方法的呼叫路徑如下: nextRaw() -> RegionScanner.nextInternal() -> populateResult() 其中,在RegionScanner.nextInternal()會進行一些對stopRow/filterRow的檢查,populateResult函式開始迭代取資料,呼叫語句如下: populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length); this.storeHeap是前面我們說的RegionScanner中維護的一個由StoreScanner組成的堆。populateResult的主要邏輯簡化如下:
do{
heap.next(results, scannerContext);
nextKv= heap.peek();
moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
} while (moreCellsInRow)
populateResult中真正返回資料呼叫的是heap的next方法,這裡還記得前面說的heap是由storescanner組成的堆,在next中用current變數記住當前正在處理的storescanner,然後呼叫next()函式返回了該storescanner中可能存在的結果。 到StoreScanner的next方法,StoreScanner維護著一個由StoreFileScanner/memstoreScanner構造的堆,next實際是從它的scanner堆中peek出一個StoreFileScanner或者是MemStoreScanner,然後呼叫next()取得資料,再將該scanner添加回佇列中。 在StoreScanner的next方法有下面一段程式碼需要注意:
LOOP: while((cell = this.heap.peek()) != null) {
// Update and check the time limit based on the configured value of cellsPerTimeoutCheck
if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
scannerContext.updateTimeProgress();
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
}
}
。。。。。。。。
}
這段程式碼維護了服務端和客戶端的一個心跳,是為了防止服務端scan到較大的資料時長時間沒有給客戶端返回響應,而造成客戶端誤以為服務端掛掉而產生超時錯誤。其中cellsPerHeartbeatCheck定義了心跳傳送的週期,該值由"hbase.cells.scanned.per.heartbeat.check"配置,預設是10000,表示的是每scan出10000個cell,則服務端向客戶端傳送一條心跳。參考資料: