1. 程式人生 > 其它 >Hbase 原始碼分析之 Get 流程及rpc原理

Hbase 原始碼分析之 Get 流程及rpc原理

分析版本為hbase 0.94

附上趨勢團隊畫的圖:

rpc角色表:

HBase通訊通道

HBase的通訊介面

客戶端

服務端

HBase Client

Master Server

HMasterInterface

HBase Client

Region Server

HRegionInterface

Region Server

Master Server

HMasterRegionInterface

客戶端發起請求:

htable.get(Get)

public Result get(final Get get) throws IOException {
return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
public Result call() throws IOException {
return server.get(location.getRegionInfo().getRegionName(), get);
}
}.withRetries();
}

呼叫get方法後,客戶端進入睡眠,睡眠時間為pause * HConstants.RETRY_BACKOFF[ntries];  

pause= HBASE_CLIENT_PAUSE(1秒)

RETRY_BACKOFF[] = { 1, 1, 1, 2, 2, 4, 4, 8, 16, 32 };

有結果則中斷執行返回rpc結果,否則重試十次(預設DEFAULT_HBASE_CLIENT_RETRIES_NUMBER=10)

第一次進行get時,客戶端需要先進行rpc通訊,獲得root表 meta表資訊,確定row對應的location

通過ServerCallable維持HRegionInterface 的server例項 ,server為HConnectionManager的getHRegionConnection方法獲取的HBaseRPC的VersionedProtocol代理,其實是

WritableRpcEngine例項,call方法則會呼叫成員HbaseClient的call方法與regionserver進行遠端通訊

伺服器端:

當regionserver 收到來自客戶端的Get請求時,呼叫介面 

public Result get(byte[] regionName, Get get) 
{ 
... 
HRegion region = getRegion(regionName); 
return region.get(get, getLockFromId(get.getLockId())); 
... 
} 

在HRegion中

Scan scan = new Scan(get); 會先根據設定的columnFamily存放familyMap對  ----  columnFamily:null

public Get addFamily(byte [] family) {
familyMap.remove(family);
familyMap.put(family, null);
return this;
}

如果查詢的family不在htableDescriptor中,返回錯誤

scanner = getScanner(scan);
public RegionScanner getScanner(Scan scan) throws IOException {
return getScanner(scan, null);
}

additionalScanners為null 所以在RegionScannerImpl的構造中只會使用StoreScanner

return instantiateRegionScanner(scan, additionalScanners); return new RegionScannerImpl(scan, additionalScanners);

RegionScannerImpl 是 HRegion中的子類

for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
StoreScanner scanner = store.getScanner(scan, entry.getValue());
scanners.add(scanner);
}

按照familyMap的數量存放對應數量的 StoreScanner

Hregion initialize時會對應每個columnFamily存放一個stores Future<Store> future = completionService.take(); Store store = future.get(); this.stores.put(store.getColumnFamilyName().getBytes(), store);

scanners 新增從Store中獲取的scanner

store.getScanners(cacheBlocks, isGet, isCompaction, matcher)

Store 類:

memStoreScanners = this.memstore.getScanners();
List<StoreFileScanner> sfScanners = StoreFileScanner
.getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher);
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
scanners.addAll(sfScanners);
// Then the memstore scanners
scanners.addAll(memStoreScanners);
return scanners;

memStoreScanners  為Collections.<KeyValueScanner>singletonList( new MemStoreScanner())

Store中為StoreScanner添加了StoreFileScanner和 memStoreScanner

進行scan時

scanner = getScanner(scan); scanner.next(results);

現在分析RegionScannerImpl中的next方法,此時正式進入獲取資料流程

@Override
public synchronized boolean next(List<KeyValue> outResults)
throws IOException {
// apply the batching limit by default
return next(outResults, batch);
}

batch預設為-1

   startRegionOperation();

  outResults.addAll(results);

startRegionOperation 會為操作加讀鎖,lock.readLock().lock();

然後遍歷storeHeap,找到對應Row

do {     this.storeHeap.next(results, limit - results.size()); } while (Bytes.equals(currentRow, nextRow = peekRow()));

 this.storeHeap 會不斷poll出儲存的scanner

因RegionScannerImpl 中 memStoreScanners後新增,所以會先從memStoreScanners中查詢,如果沒有則從StoreFileScanner中查詢

RegionScannerImpl 的 storeHeap為KeyValueHeap,會強制轉型scanner為 InternalScanner

InternalScanner currentAsInternal = (InternalScanner)this.current;  

總結下目前流程get request -> regionServer -> region -> storeHeap -> scanner -> find row

但上述流程沒有解釋reguest是怎麼找到regionServer去處理請求的,下邊我們在分析下

伺服器端服務在HMaster和HRegionServer啟動時,中都會生成一個全域性的RpcServer  

hmaster的rpc server:

hmaster會使用org.apache.hadoop.hbase.executor.ExecutorService啟動多種執行緒服務 (This is a generic executor service. This component abstracts a threadpool, a queue to which EventHandler.EventTypes can be submitted, and a Runnable that handles the object that is added to the queue. ):

MASTER_OPEN_REGION   (預設5)

MASTER_CLOSE_REGION   (預設5)

MASTER_SERVER_OPERATIONS  (預設3)

MASTER_META_SERVER_OPERATIONS  (預設5)

MASTER_TABLE_OPERATIONS (單執行緒)

logCleaner (單執行緒)

infoServer  (master-status 等資訊展示)

rpcServer (我們需要用的rpc服務)

RpcServer是個介面,實現類為HBaseServer,啟動時會開啟responder listener handlers幾種類去響應請求,如設定了priorityHandlers的數目,會另外啟動priorityHandlers,listener監聽埠,提供請求給handlers,handlers則呼叫RpcEngine,反射出需要的方法並執行,通過responder寫結果回去(this.responder.doRespond)。

HMaster的 handlers的個數由hbase.master.handler.count

HRegionServer的 handlers的個數由 hbase.regionserver.handler.count 指定

HRegionServer的啟動和HMaster類似,它啟動以下執行緒:

this.service = new ExecutorService(getServerName().toString());
this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_OPEN_ROOT,
conf.getInt("hbase.regionserver.executor.openroot.threads", 1));
this.service.startExecutorService(ExecutorType.RS_OPEN_META,
conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_CLOSE_ROOT,
conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));

hlogRoller(daemon)

cacheFlusher(daemon)

compactionChecker(daemon)

Leases(它不是執行緒,會啟動後臺執行緒)

splitLogWorker

rpcServer

HBaseClient 和 HMaster關係由HMasterInterface描述:

Clients interact with the HMasterInterface to gain access to meta-level  HBase functionality, like finding an HRegionServer and creating/destroying  tables.

HBaseClient 和 HRegionServer關係由HRegionInterface描述:

Clients interact with HRegionServers using a handle to the HRegionInterface

參考資料:

http://zjushch.iteye.com/blog/1173304

http://www.spnguru.com/2010/07/hbase-%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90-rpc%E6%9C%BA%E5%88%B6-%E5%9F%BA%E7%A1%80/