HBase分析之Get、Scan(一)
Get
Get操作呼叫的是RSRpcServices的get方法,呼叫過程首先找到包含資料的Region,然後從這個Region中獲取需要的資料。
public GetResponse get(final RpcController controller,
final GetRequest request) throws ServiceException {
...
try {
...
Region region = getRegion(request.getRegion());
...
if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
...
r = region.getClosestRowBefore(row, family);
} else {
...
r = region.get(clientGet);
...
}
...
return builder.build();
...
}
region.getClosestRowBefore實際也是呼叫的region.get方法,所以直接看get方法
public Result get(final Get get) throws IOException {
checkRow(get.getRow(), "Get");
// Verify families are all valid
if (get.hasFamilies()) {
for (byte [] family: get.familySet()) {
checkFamily(family);
}
} else { // Adding all families to scanner
for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
get.addFamily(family);
}
}
List<Cell> results = get(get, true);
boolean stale = this.getRegionInfo().getReplicaId() != 0 ;
return Result.create(results, get.isCheckExistenceOnly() ?
!results.isEmpty() : null, stale);
}
這個方法特別有意思,如果這個請求裡包含了family,就檢查下這些family是否存在;如果這個請求裡不包含family,就把這個表裡所有的family都加到這個請求裡。所以不設定family時,就會遍歷所有的family來找到需要的資料。然後看List<Cell> results = get(get, true);
,這個方法建立了一個Scan,呼叫了Scan,所以要了解Get,還看Scan!
public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
...
Scan scan = new Scan(get);
RegionScanner scanner = null;
try {
scanner = getScanner(scan);
scanner.next(results);
}
...
return results;
}
Scan
又是一段400多行的程式碼,完整程式碼就不貼了,有興趣可以閱讀下原始碼,在RSRpcServices#scan方法中。
整理一下程式碼的邏輯,共處理了Scan過程中3個階段的操作:
- 獲得scanner id,簽訂租約Leases
- 掃描獲取資料,續約Leases
- 再次請求,確認資料掃描已經完成
第一步,獲得scanner id,簽訂租約Leases。
程式碼中依靠scanner id來判斷是否已經完成了第一階段,如果沒有完成,就查詢到資料所在的Region,建立一個scanner,並把這個scanner新增到快取中。(查詢Region的過程就是從map裡取一個Region,這個在Put一節中已經講過了)
if (request.hasScannerId()) {
...
} else {
region = getRegion(request.getRegion());
...
if (!scan.hasFamilies()) {
// Adding all families to scanner
for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
scan.addFamily(family);
}
}
...
if (scanner == null) {
scanner = region.getScanner(scan);
}
...
scannerId = addScanner(scanner, region);
scannerName = String.valueOf(scannerId);
ttl = this.scannerLeaseTimeoutPeriod;
}
在scannerId = addScanner(scanner, region);
方法中,scan與Region Server簽訂了租約,表示scanner會其快取多長時間。通過hbase.client.scanner.timeout.perio
引數設定,預設情況下為60000ms,即一分鐘。租約是一個非同步執行緒,通過執行緒sleep,等待到租約到期,然後清除快取。
public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
throws LeaseStillHeldException {
addLease(new Lease(leaseName, leaseTimeoutPeriod, listener));
}
然後設定上ttl、scanner id、moreResults(這裡的moreResults為初始值true),就返回了
if (ttl > 0) {
builder.setTtl(ttl);
}
builder.setScannerId(scannerId);
builder.setMoreResults(moreResults);
return builder.build();
第二步,掃描獲取資料,簽訂租約Leases
如果超出了租約時間繼續請求,那就會丟擲錯誤;如果在租約時間內繼續請求了,那麼就到了第二階段,掃描獲取資料。
首先從request中獲得scanner id。
long scannerId = -1;
if (request.hasScannerId()) {
scannerId = request.getScannerId();
scannerName = String.valueOf(scannerId);
}
然後從快取中獲取快取的scanner,這裡就是第一步的if中省略的程式碼
if (request.hasScannerId()) {
rsh = scanners.get(scannerName);
scanner = rsh.s;
HRegionInfo hri = scanner.getRegionInfo();
region = regionServer.getRegion(hri.getRegionName());
}
然後將租約移除,因為租約是非同步的,所以很可能在執行過程中過期了,還是先移除掉。
lease = regionServer.leases.removeLease(scannerName);
然後獲取資料,迴圈呼叫scanner.nextRaw方法獲取資料,獲取到的資料先存入values,轉換完成後放入results中。如果results中的數量達到了上限或者沒有更多資料了,就不再獲取了,break出來。scanner.nextRaw方法比較複雜,這裡涉及到多個Put版本、Delete的資料掃描,需要再開一篇來講。
List<Result> results = new ArrayList<Result>();
...
while (i < rows) {
...
moreRows = scanner.nextRaw(values, scannerContext);
if (!values.isEmpty()) {
final boolean partial = scannerContext.partialResultFormed();
Result r = Result.create(values, null, stale, partial);
lastBlock = addSize(context, r, lastBlock);
results.add(r);
i++;
}
if (limitReached || !moreRows) {
break;
}
}
經過一輪掃描,如果沒有更多資料、或者達到了一次請求的上限,就把已經取到的資料results放進builder返回。然後續簽一個新的租約,租約時長還是1分鐘。
if (scanner.isFilterDone() && results.isEmpty()) {
// 如果掃描完成,這就是第三步的事情了
} else {
// 沒完成,就把資料放進builder返回
addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
}
// 續約Leases,addLease方法中會重新重新整理租期
if (scanners.containsKey(scannerName)) {
if (lease != null) regionServer.leases.addLease(lease);
ttl = this.scannerLeaseTimeoutPeriod;
}
第三步,再次請求,確認資料掃描已經完成
和第二步乾的事情一樣,只不過這次發現掃描完成了
if (scanner.isFilterDone() && results.isEmpty()) {
moreResults = false;
results = null;
}
當moreResults為false時,就會關閉快取的scanner,closeScanner方法中會將租約移除,這樣佔用的資源就釋放完了,就可以返回沒有results的builder,確認掃描完成了。
if (!moreResults || closeScanner) {
ttl = 0;
moreResults = false;
closeScanner(region, scanner, scannerName);
}
好了,Get、Scan的流程看完了,下篇看scanner.nextRaw方法
-END-