HBase Source Code Analysis: the Get Flow and RPC Internals
The version analyzed is HBase 0.94.
A diagram drawn by the Trend (趨勢) team is attached:
RPC role table:

| HBase communication channel: client | HBase communication channel: server | HBase communication interface |
|---|---|---|
| HBase Client | Master Server | HMasterInterface |
| HBase Client | Region Server | HRegionInterface |
| Region Server | Master Server | HMasterRegionInterface |
The client issues the request:
htable.get(Get)
public Result get(final Get get) throws IOException {
  return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
    public Result call() throws IOException {
      return server.get(location.getRegionInfo().getRegionName(), get);
    }
  }.withRetries();
}
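After get is called, withRetries() runs the call. When an attempt fails, the client sleeps for pause * HConstants.RETRY_BACKOFF[ntries] before trying again.
pause = HBASE_CLIENT_PAUSE (1 second)
RETRY_BACKOFF[] = { 1, 1, 1, 2, 2, 4, 4, 8, 16, 32 };
As soon as a result arrives, the loop stops and the RPC result is returned; otherwise the call is retried up to ten times (by default DEFAULT_HBASE_CLIENT_RETRIES_NUMBER=10).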
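The retry loop described above can be sketched in plain Java. This is a minimal standalone illustration, not the HBase ServerCallable class: the class name RetryBackoffSketch, the Sleeper hook, and the generic Exception thrown at the end are assumptions made for the example. Only the backoff table and the pause formula come from the text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/** Standalone sketch of retry-with-backoff; not the HBase ServerCallable class itself. */
class RetryBackoffSketch {
    // Multipliers quoted in the text above.
    static final int[] RETRY_BACKOFF = {1, 1, 1, 2, 2, 4, 4, 8, 16, 32};

    /** Pause before the next attempt; the index is clamped to the last table entry. */
    static long pauseTime(long pauseMs, int tries) {
        int idx = Math.min(tries, RETRY_BACKOFF.length - 1);
        return pauseMs * RETRY_BACKOFF[idx];
    }

    /** Hypothetical hook so the example can record pauses instead of really sleeping. */
    interface Sleeper {
        void sleep(long ms) throws InterruptedException;
    }

    static <T> T withRetries(Callable<T> call, long pauseMs, int numRetries, Sleeper sleeper)
            throws Exception {
        Exception last = null;
        for (int tries = 0; tries < numRetries; tries++) {
            try {
                return call.call();          // success ends the loop immediately
            } catch (Exception e) {
                last = e;                    // remember the failure, then pause below
            }
            if (tries < numRetries - 1) {
                sleeper.sleep(pauseTime(pauseMs, tries));
            }
        }
        throw new Exception("failed after " + numRetries + " attempts", last);
    }

    public static void main(String[] args) throws Exception {
        List<Long> sleeps = new ArrayList<>();
        int[] attempts = {0};
        // The fake call fails three times and succeeds on the fourth attempt.
        String result = withRetries(() -> {
            if (++attempts[0] < 4) {
                throw new IllegalStateException("region not reachable yet");
            }
            return "row-value";
        }, 1000L, 10, ms -> sleeps.add(ms));
        System.out.println(result + " after pauses " + sleeps);
    }
}
```

With a 1 second pause and ten attempts that all fail, the nine pauses between attempts add up to 39 seconds in this sketch, which gives a feel for how long a client can block before giving up.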
On the first get, the client must first make RPC calls to fetch the root table and meta table information and determine the location of the row.
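Once the region information is known, resolving a row to a location amounts to finding the region whose start key is the greatest one not exceeding the row. The sketch below illustrates that idea with a sorted map. The class RegionLocatorSketch, the String keys, and the server names are assumptions for illustration and do not reproduce the HBase client's own cache classes.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative row-to-region lookup; names and types are hypothetical. */
class RegionLocatorSketch {
    // Region start key -> hypothetical name of the region server hosting it.
    private final TreeMap<String, String> regionsByStartKey = new TreeMap<>();

    void addRegion(String startKey, String serverName) {
        regionsByStartKey.put(startKey, serverName);
    }

    /** Returns the server of the region whose start key is the greatest key <= row. */
    String locate(String row) {
        Map.Entry<String, String> entry = regionsByStartKey.floorEntry(row);
        if (entry == null) {
            throw new IllegalStateException("no region covers row " + row);
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        RegionLocatorSketch locator = new RegionLocatorSketch();
        locator.addRegion("", "rs-1");   // the first region starts at the empty key
        locator.addRegion("m", "rs-2");
        locator.addRegion("t", "rs-3");
        System.out.println(locator.locate("apple"));
        System.out.println(locator.locate("melon"));
        System.out.println(locator.locate("zebra"));
    }
}
```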
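ServerCallable holds the HRegionInterface instance named server. server is the HBaseRPC VersionedProtocol proxy obtained through HConnectionManager's getHRegionConnection method. Behind the proxy sits WritableRpcEngine, whose call method invokes the call method of its HBaseClient member to communicate remotely with the region server.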
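The proxy idea can be shown with java.lang.reflect.Proxy: every call on the protocol interface is intercepted and handed to a client object that would ship it over the network. This is a minimal sketch under stated assumptions. RegionProtocol and Transport are hypothetical stand-ins for HRegionInterface and HBaseClient, and the fake transport simply echoes the call instead of doing any I/O.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

/** Sketch of an RPC-style dynamic proxy; the interfaces here are hypothetical. */
class RpcProxySketch {
    /** Hypothetical stand-in for a protocol interface such as HRegionInterface. */
    interface RegionProtocol {
        String get(String regionName, String row);
    }

    /** Hypothetical stand-in for the client that would send the call over the wire. */
    interface Transport {
        Object call(String methodName, Object[] args);
    }

    /** Builds a proxy whose every method call is forwarded to the transport. */
    static RegionProtocol getProxy(Transport transport) {
        InvocationHandler invoker =
                (proxy, method, args) -> transport.call(method.getName(), args);
        return (RegionProtocol) Proxy.newProxyInstance(
                RegionProtocol.class.getClassLoader(),
                new Class<?>[] {RegionProtocol.class},
                invoker);
    }

    public static void main(String[] args) {
        // A fake transport that echoes the call instead of doing network I/O.
        Transport echo = (name, callArgs) -> name + Arrays.toString(callArgs);
        RegionProtocol server = getProxy(echo);
        System.out.println(server.get("region-1", "row-42"));
    }
}
```

The caller only sees the interface, which is why the client code can write server.get(...) as if the region server were a local object.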
Server side:
When the region server receives the Get request from the client, it invokes the interface method
public Result get(byte[] regionName, Get get)
{
...
HRegion region = getRegion(regionName);
return region.get(get, getLockFromId(get.getLockId()));
...
}
In HRegion:
Scan scan = new Scan(get); first fills familyMap from the configured column families, with entries of the form columnFamily : null
public Get addFamily(byte [] family) {
familyMap.remove(family);
familyMap.put(family, null);
return this;
}
If a requested family is not in the HTableDescriptor, an error is returned.
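Two details of the family map are easy to miss: byte[] keys need an explicit comparator because arrays compare by identity, and a null value means "no column restriction". The sketch below shows both, plus a family check. FamilyMapSketch, checkFamilies, and the use of IllegalArgumentException are assumptions for illustration, not the HBase classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;

/** Sketch of a family map keyed by byte[]; names are hypothetical. */
class FamilyMapSketch {
    // byte[] uses identity equals/hashCode, so a sorted map with a comparator is used.
    final Map<byte[], NavigableSet<byte[]>> familyMap =
            new TreeMap<byte[], NavigableSet<byte[]>>(FamilyMapSketch::compareBytes);

    /** Unsigned lexicographic comparison of two byte arrays. */
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    FamilyMapSketch addFamily(String family) {
        byte[] key = family.getBytes(StandardCharsets.UTF_8);
        familyMap.remove(key);
        familyMap.put(key, null);   // null value: the whole family, no column restriction
        return this;
    }

    /** Hypothetical check: every requested family must exist in the table. */
    void checkFamilies(Set<String> tableFamilies) {
        for (byte[] family : familyMap.keySet()) {
            String name = new String(family, StandardCharsets.UTF_8);
            if (!tableFamilies.contains(name)) {
                throw new IllegalArgumentException("Column family " + name + " does not exist");
            }
        }
    }

    public static void main(String[] args) {
        FamilyMapSketch get = new FamilyMapSketch().addFamily("cf1").addFamily("cf1");
        System.out.println("families requested: " + get.familyMap.size());
        get.checkFamilies(Collections.singleton("cf1"));   // passes silently
        try {
            get.addFamily("bogus").checkFamilies(Collections.singleton("cf1"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```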
scanner = getScanner(scan);
public RegionScanner getScanner(Scan scan) throws IOException {
return getScanner(scan, null);
}
additionalScanners is null, so the RegionScannerImpl constructor only uses StoreScanners:
return instantiateRegionScanner(scan, additionalScanners);
which in turn does:
return new RegionScannerImpl(scan, additionalScanners);
RegionScannerImpl is an inner class of HRegion. Its constructor builds the scanners:
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
StoreScanner scanner = store.getScanner(scan, entry.getValue());
scanners.add(scanner);
}
One StoreScanner is stored for each entry in familyMap.
When HRegion initializes, it stores one Store per column family in stores:
Future<Store> future = completionService.take();
Store store = future.get();
this.stores.put(store.getColumnFamilyName().getBytes(), store);
The scanners obtained from the Store are then added to scanners:
store.getScanners(cacheBlocks, isGet, isCompaction, matcher)
The Store class:
memStoreScanners = this.memstore.getScanners();
List<StoreFileScanner> sfScanners = StoreFileScanner
.getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher);
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
scanners.addAll(sfScanners);
// Then the memstore scanners
scanners.addAll(memStoreScanners);
return scanners;
memStoreScanners is Collections.<KeyValueScanner>singletonList(new MemStoreScanner())
In Store, the StoreFileScanners and the MemStoreScanner are added for the StoreScanner.
When scanning:
scanner = getScanner(scan);
scanner.next(results);
Now let's look at the next method in RegionScannerImpl; this is where data retrieval really begins.
@Override
public synchronized boolean next(List<KeyValue> outResults)
throws IOException {
// apply the batching limit by default
return next(outResults, batch);
}
batch defaults to -1
startRegionOperation();
outResults.addAll(results);
startRegionOperation takes a read lock for the operation: lock.readLock().lock();
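The read lock lets many reads proceed in parallel while an exclusive operation such as closing the region waits for them to finish. A minimal sketch of that pattern with ReentrantReadWriteLock follows. RegionLockSketch, its closed flag, and the IllegalStateException are assumptions for illustration and do not reproduce HRegion's actual checks.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Sketch of guarding region operations with a read/write lock; names are hypothetical. */
class RegionLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile boolean closed = false;

    /** Takes the shared read lock; refuses to proceed once the region is closed. */
    void startRegionOperation() {
        lock.readLock().lock();
        if (closed) {
            lock.readLock().unlock();
            throw new IllegalStateException("region is closed");
        }
    }

    void closeRegionOperation() {
        lock.readLock().unlock();
    }

    /** Closing takes the exclusive write lock, so it waits for in-flight reads. */
    void close() {
        lock.writeLock().lock();
        try {
            closed = true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Number of read locks currently held, exposed for the demonstration only. */
    int readHolds() {
        return lock.getReadLockCount();
    }

    String read(String row) {
        startRegionOperation();
        try {
            return "value-of-" + row;
        } finally {
            closeRegionOperation();   // always release, even if the read throws
        }
    }

    public static void main(String[] args) {
        RegionLockSketch region = new RegionLockSketch();
        System.out.println(region.read("row1"));
        region.close();
        try {
            region.read("row1");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```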
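It then iterates over storeHeap to find the matching row:
do {
  this.storeHeap.next(results, limit - results.size());
} while (Bytes.equals(currentRow, nextRow = peekRow()));
this.storeHeap keeps polling out the scanners it holds.
Because the memStoreScanners are added last, lookup goes to the memStoreScanners first and falls back to the StoreFileScanners if nothing is found there. (KeyValueHeap orders its scanners with a priority queue keyed on each scanner's current KeyValue, so insertion order alone does not decide which scanner is read next.)
The storeHeap of RegionScannerImpl is a KeyValueHeap, which casts the current scanner to InternalScanner: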
InternalScanner currentAsInternal = (InternalScanner)this.current;
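The heap behaviour can be pictured as a k-way merge: each scanner exposes its current key, and the heap always reads from the scanner whose current key is smallest. The sketch below is a simplified illustration using String keys and java.util.PriorityQueue. SimpleScanner and mergeAll are hypothetical names, and tie-breaking between scanners holding the same key is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Sketch of merging several sorted scanners through a heap; names are hypothetical. */
class ScannerHeapSketch {
    /** Simplified scanner over sorted row keys that exposes its current key. */
    static final class SimpleScanner {
        final String name;
        private final Iterator<String> it;
        private String current;

        SimpleScanner(String name, List<String> sortedKeys) {
            this.name = name;
            this.it = sortedKeys.iterator();
            this.current = it.hasNext() ? it.next() : null;
        }

        String peek() {
            return current;
        }

        String next() {
            String out = current;
            current = it.hasNext() ? it.next() : null;
            return out;
        }
    }

    /** Merges all scanners into one sorted stream, always reading the smallest current key. */
    static List<String> mergeAll(List<SimpleScanner> scanners) {
        Comparator<SimpleScanner> byCurrentKey = Comparator.comparing(SimpleScanner::peek);
        PriorityQueue<SimpleScanner> heap = new PriorityQueue<>(11, byCurrentKey);
        for (SimpleScanner s : scanners) {
            if (s.peek() != null) {
                heap.add(s);          // exhausted scanners never enter the heap
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            SimpleScanner top = heap.poll();
            out.add(top.next() + "@" + top.name);
            if (top.peek() != null) {
                heap.add(top);        // re-insert so it is re-ordered by its new current key
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<SimpleScanner> scanners = Arrays.asList(
                new SimpleScanner("file1", Arrays.asList("row1", "row3")),
                new SimpleScanner("file2", Arrays.asList("row4")),
                new SimpleScanner("memstore", Arrays.asList("row2", "row5")));
        System.out.println(mergeAll(scanners));
    }
}
```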
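To summarize the flow so far: get request -> regionServer -> region -> storeHeap -> scanner -> find row
However, the flow above does not explain how the request reaches a region server that handles it, so let's analyze that next.
On the server side, both HMaster and HRegionServer create a global RpcServer when they start.
HMaster's RPC server: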
HMaster uses org.apache.hadoop.hbase.executor.ExecutorService to start several kinds of thread services ("This is a generic executor service. This component abstracts a threadpool, a queue to which EventHandler.EventTypes can be submitted, and a Runnable that handles the object that is added to the queue."):
MASTER_OPEN_REGION (default 5)
MASTER_CLOSE_REGION (default 5)
MASTER_SERVER_OPERATIONS (default 3)
MASTER_META_SERVER_OPERATIONS (default 5)
MASTER_TABLE_OPERATIONS (single thread)
logCleaner (single thread)
infoServer (serves master-status and similar information pages)
rpcServer (the RPC service we are interested in)
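RpcServer is an interface whose implementation is HBaseServer. On startup it launches the responder, the listener, and the handlers to serve requests; if a number of priorityHandlers is configured, those are started as well. The listener listens on the port and passes requests to the handlers. The handlers call the RpcEngine, which uses reflection to find and execute the required method, and the result is written back through the responder (this.responder.doRespond).
The number of HMaster handlers is set by hbase.master.handler.count.
The number of HRegionServer handlers is set by hbase.regionserver.handler.count.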
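The division of labour between listener, handlers, and responder can be sketched with a queue and a few threads. This is a simplified in-process illustration, not HBaseServer: RpcServerSketch, Call, and EchoService are hypothetical names, there is no socket, and a CompletableFuture stands in for the responder writing the reply back. Argument types are taken from the runtime classes of the arguments, which is an assumption that only works for simple cases like this one.

```java
import java.lang.reflect.Method;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-process sketch of listener -> call queue -> handlers -> reply; names are hypothetical. */
class RpcServerSketch {
    /** Hypothetical queued request: a method name, its arguments, and a slot for the reply. */
    static final class Call {
        final String methodName;
        final Object[] params;
        final CompletableFuture<Object> response = new CompletableFuture<>();

        Call(String methodName, Object[] params) {
            this.methodName = methodName;
            this.params = params;
        }
    }

    /** Hypothetical service object that the handlers dispatch into. */
    public static class EchoService {
        public String get(String regionName, String row) {
            return regionName + "/" + row;
        }
    }

    private final BlockingQueue<Call> callQueue = new LinkedBlockingQueue<>();
    private final Object instance;

    RpcServerSketch(Object instance) {
        this.instance = instance;
    }

    /** Starts handlerCount daemon threads that drain the call queue. */
    void start(int handlerCount) {
        for (int i = 0; i < handlerCount; i++) {
            Thread handler = new Thread(this::handlerLoop, "handler-" + i);
            handler.setDaemon(true);
            handler.start();
        }
    }

    /** Plays the listener's part: enqueue the call and hand back the pending reply. */
    CompletableFuture<Object> submit(String methodName, Object... params) {
        Call call = new Call(methodName, params);
        callQueue.add(call);
        return call.response;
    }

    private void handlerLoop() {
        try {
            while (true) {
                Call call = callQueue.take();
                try {
                    // Look the target method up by name and argument types, then invoke it.
                    Class<?>[] types = new Class<?>[call.params.length];
                    for (int i = 0; i < types.length; i++) {
                        types[i] = call.params[i].getClass();
                    }
                    Method method = instance.getClass().getMethod(call.methodName, types);
                    call.response.complete(method.invoke(instance, call.params));
                } catch (Exception e) {
                    call.response.completeExceptionally(e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        RpcServerSketch server = new RpcServerSketch(new EchoService());
        server.start(3);
        Object reply = server.submit("get", "region-1", "row-42").get(5, TimeUnit.SECONDS);
        System.out.println(reply);
    }
}
```

The handler count plays the same role here as the handler.count settings above: it bounds how many calls are executed concurrently, while further calls wait in the queue.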
HRegionServer starts up much like HMaster; it starts the following threads:
this.service = new ExecutorService(getServerName().toString());
this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_OPEN_ROOT,
conf.getInt("hbase.regionserver.executor.openroot.threads", 1));
this.service.startExecutorService(ExecutorType.RS_OPEN_META,
conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_CLOSE_ROOT,
conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
hlogRoller(daemon)
cacheFlusher(daemon)
compactionChecker(daemon)
Leases (not a thread itself, but it starts a background thread)
splitLogWorker
rpcServer
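The startExecutorService calls above follow a simple pattern: one thread pool per executor type, sized from configuration. The sketch below illustrates that pattern with the JDK's own java.util.concurrent.ExecutorService, which is a different class from HBase's org.apache.hadoop.hbase.executor.ExecutorService. TypedExecutorSketch and its reduced ExecutorType enum are assumptions for illustration.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Sketch of one thread pool per executor type; names are hypothetical. */
class TypedExecutorSketch {
    /** Hypothetical subset of executor types, named after those listed above. */
    enum ExecutorType { RS_OPEN_REGION, RS_CLOSE_REGION, RS_OPEN_META }

    private final Map<ExecutorType, ExecutorService> pools = new EnumMap<>(ExecutorType.class);

    void startExecutorService(ExecutorType type, int maxThreads) {
        if (pools.containsKey(type)) {
            throw new IllegalStateException(type + " already started");
        }
        pools.put(type, Executors.newFixedThreadPool(maxThreads));
    }

    /** Routes an event to the pool registered for its type. */
    Future<String> submit(ExecutorType type, String event) {
        ExecutorService pool = pools.get(type);
        if (pool == null) {
            throw new IllegalStateException(type + " not started");
        }
        return pool.submit(() -> type + " handled " + event);
    }

    void shutdown() {
        for (ExecutorService pool : pools.values()) {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        TypedExecutorSketch service = new TypedExecutorSketch();
        service.startExecutorService(ExecutorType.RS_OPEN_REGION, 3);
        service.startExecutorService(ExecutorType.RS_OPEN_META, 1);
        try {
            Future<String> done = service.submit(ExecutorType.RS_OPEN_REGION, "open region-1");
            System.out.println(done.get(5, TimeUnit.SECONDS));
        } finally {
            service.shutdown();   // pool threads are non-daemon, so shut down to let the JVM exit
        }
    }
}
```

Keeping a separate pool per type means a burst of one kind of work, such as region opens, cannot starve another kind, such as opening the meta region.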
The relationship between HBaseClient and HMaster is described by HMasterInterface:
Clients interact with the HMasterInterface to gain access to meta-level HBase functionality, like finding an HRegionServer and creating/destroying tables.
The relationship between HBaseClient and HRegionServer is described by HRegionInterface:
Clients interact with HRegionServers using a handle to the HRegionInterface