hbase客戶端原始碼分析--put流程
阿新 • • 發佈:2019-02-20
—client 的呼叫流程
table.put(put); 操作
HTable table = new HTable(conf, Bytes.toBytes(tableName));
呼叫流程如上面的delete流程一樣
首先建立一個muti的操作物件
new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
然後呼叫
BufferedMutatorImpl.mutate(Mutation m)
在建立 BufferedMutatorImpl 物件的時候,在低層有非同步建立
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory);
實時非同步批量的操作提交
AsyncProcess.submit
判斷提交的那一個row物件是在那個region當中
RegionLocations locs = connection.locateRegion(
tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
該過程和之前的delete的過程中查詢row的過程一樣,先到zk中拿到meta然後在meta 的regionserver中掃描對應的行在那個regionserver當中
submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
locationErrors, locationErrorRows, actionsByServer, pool)
如下程式碼又建立一個非同步的提交物件
<CResult> AsyncRequestFuture submitMultiActions(TableName tableName, List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback, Object[] results, boolean needResults, List<Exception> locationErrors, List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer, ExecutorService pool) { AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( tableName, retainedActions, nonceGroup, pool, callback, results, needResults); // Add location errors if any if (locationErrors != null) { for (int i = 0; i < locationErrors.size(); ++i) { int originalIndex = locationErrorRows.get(i); Row row = retainedActions.get(originalIndex).getAction(); ars.manageError(originalIndex, row, Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null); } } ars.sendMultiAction(actionsByServer, 1, null, false); return ars; }
然後根據傳送到不同的regionser進行起動多個執行緒進行傳送,
for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
ServerName server = e.getKey();
MultiAction<Row> multiAction = e.getValue();
incTaskCounters(multiAction.getRegions(), server);
Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
numAttempt);
對每個region建立對應的執行緒
Runnable runnable =
new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
callsInProgress);
進行非同步傳送過去。線上程中建立
new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi)
然後在callable物件中建立proto物件,組裝資料,傳送過去
for (Map.Entry<byte[], List<Action<R>>> e: this.multiAction.actions.entrySet()) {
final byte [] regionName = e.getKey();
final List<Action<R>> actions = e.getValue();
regionActionBuilder.clear();
regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName) );
if (this.cellBlock) {
// Presize. Presume at least a KV per Action. There are likely more.
if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
// Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
// They have already been handled above. Guess at count of cells
regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
regionActionBuilder, actionBuilder, mutationBuilder);
} else {
regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
regionActionBuilder, actionBuilder, mutationBuilder);
}
multiRequestBuilder.addRegionAction(regionActionBuilder.build());
}
// Controller optionally carries cell data over the proxy/service boundary and also
// optionally ferries cell response data back out again.
if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
ClientProtos.MultiResponse responseProto;
ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
try {
responseProto = getStub().multi(controller, requestProto);
} catch (ServiceException e) {
throw ProtobufUtil.getRemoteException(e);
}
if (responseProto == null) return null; // Occurs on cancel
return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
}