1. 程式人生 > >HBase分析之Get、Scan(一)

HBase分析之Get、Scan(一)

Get

Get操作呼叫的是RSRpcServices的get方法,呼叫過程首先找到包含資料的Region,然後從這個Region中獲取需要的資料。

public GetResponse get(final RpcController controller,
    final GetRequest request) throws ServiceException {
  ...
  try {
    ...
    Region region = getRegion(request.getRegion());
    ...
    if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
      ...
      r = region.getClosestRowBefore(row, family);
    } else
{ ... r = region.get(clientGet); ... } ... return builder.build(); ... }

region.getClosestRowBefore實際也是呼叫的region.get方法,所以直接看get方法

public Result get(final Get get) throws IOException {
  checkRow(get.getRow(), "Get");
  // Verify families are all valid
  if (get.hasFamilies()) {
    for
(byte [] family: get.familySet()) { checkFamily(family); } } else { // Adding all families to scanner for (byte[] family: this.htableDescriptor.getFamiliesKeys()) { get.addFamily(family); } } List<Cell> results = get(get, true); boolean stale = this.getRegionInfo().getReplicaId() != 0
; return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); }

這個方法特別有意思,如果這個請求裡包含了family,就檢查下這些family是否存在;如果這個請求裡不包含family,就把這個表裡所有的family都加到這個請求裡。所以不設定family時,就會遍歷所有的family來找到需要的資料。然後看List<Cell> results = get(get, true);,這個方法建立了一個Scan,呼叫了Scan,所以要了解Get,還看Scan!

public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
  ...
  Scan scan = new Scan(get);
  RegionScanner scanner = null;
  try {
    scanner = getScanner(scan);
    scanner.next(results);
  }
  ...
  return results;
}

Scan

又是一段400多行的程式碼,完整程式碼就不貼了,有興趣可以閱讀下原始碼,在RSRpcServices#scan方法中。

整理一下程式碼的邏輯,共處理了Scan過程中3個階段的操作:

  1. 獲得scanner id,簽訂租約Leases
  2. 掃描獲取資料,續約Leases
  3. 再次請求,確認資料掃描已經完成

第一步,獲得scanner id,簽訂租約Leases。

程式碼中依靠scanner id來判斷是否已經完成了第一階段,如果沒有完成,就查詢到資料所在的Region,建立一個scanner,並把這個scanner新增到快取中。(查詢Region的過程就是從map裡取一個Region,這個在Put一節中已經講過了)

if (request.hasScannerId()) {
  ...
} else {
  region = getRegion(request.getRegion());
  ...
  if (!scan.hasFamilies()) {
    // Adding all families to scanner
    for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
      scan.addFamily(family);
    }
  }
  ...
  if (scanner == null) {
    scanner = region.getScanner(scan);
  }
  ...
  scannerId = addScanner(scanner, region);
  scannerName = String.valueOf(scannerId);
  ttl = this.scannerLeaseTimeoutPeriod;
}

scannerId = addScanner(scanner, region);方法中,scan與Region Server簽訂了租約,表示scanner會其快取多長時間。通過hbase.client.scanner.timeout.perio引數設定,預設情況下為60000ms,即一分鐘。租約是一個非同步執行緒,通過執行緒sleep,等待到租約到期,然後清除快取。

public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
    throws LeaseStillHeldException {
  addLease(new Lease(leaseName, leaseTimeoutPeriod, listener));
}

然後設定上ttl、scanner id、moreResults(這裡的moreResults為初始值true),就返回了

if (ttl > 0) {
  builder.setTtl(ttl);
}
builder.setScannerId(scannerId);
builder.setMoreResults(moreResults);
return builder.build();

第二步,掃描獲取資料,簽訂租約Leases

如果超出了租約時間繼續請求,那就會丟擲錯誤;如果在租約時間內繼續請求了,那麼就到了第二階段,掃描獲取資料。

首先從request中獲得scanner id。

long scannerId = -1;
if (request.hasScannerId()) {
  scannerId = request.getScannerId();
  scannerName = String.valueOf(scannerId);
}

然後從快取中獲取快取的scanner,這裡就是第一步的if中省略的程式碼

if (request.hasScannerId()) {
  rsh = scanners.get(scannerName);
  scanner = rsh.s;
  HRegionInfo hri = scanner.getRegionInfo();
  region = regionServer.getRegion(hri.getRegionName());
}

然後將租約移除,因為租約是非同步的,所以很可能在執行過程中過期了,還是先移除掉。

lease = regionServer.leases.removeLease(scannerName);

然後獲取資料,迴圈呼叫scanner.nextRaw方法獲取資料,獲取到的資料先存入values,轉換完成後放入results中。如果results中的數量達到了上限或者沒有更多資料了,就不再獲取了,break出來。scanner.nextRaw方法比較複雜,這裡涉及到多個Put版本、Delete的資料掃描,需要再開一篇來講。

List<Result> results = new ArrayList<Result>();
...
while (i < rows) {
  ...
  moreRows = scanner.nextRaw(values, scannerContext);

  if (!values.isEmpty()) {
    final boolean partial = scannerContext.partialResultFormed();
    Result r = Result.create(values, null, stale, partial);
    lastBlock = addSize(context, r, lastBlock);
    results.add(r);
    i++;
  }
  if (limitReached || !moreRows) {
    break;
  }
}

經過一輪掃描,如果沒有更多資料、或者達到了一次請求的上限,就把已經取到的資料results放進builder返回。然後續簽一個新的租約,租約時長還是1分鐘。

if (scanner.isFilterDone() && results.isEmpty()) {
  // 如果掃描完成,這就是第三步的事情了
} else {
  // 沒完成,就把資料放進builder返回
  addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
}
// 續約Leases,addLease方法中會重新重新整理租期
if (scanners.containsKey(scannerName)) {
  if (lease != null) regionServer.leases.addLease(lease);
  ttl = this.scannerLeaseTimeoutPeriod;
}

第三步,再次請求,確認資料掃描已經完成

和第二步乾的事情一樣,只不過這次發現掃描完成了

if (scanner.isFilterDone() && results.isEmpty()) {
  moreResults = false;
  results = null;
}

當moreResults為false時,就會關閉快取的scanner,closeScanner方法中會將租約移除,這樣佔用的資源就釋放完了,就可以返回沒有results的builder,確認掃描完成了。

if (!moreResults || closeScanner) {
  ttl = 0;
  moreResults = false;
  closeScanner(region, scanner, scannerName);
}

好了,Get、Scan的流程看完了,下篇看scanner.nextRaw方法

-END-