Quickly Counting the Rows of an HBase Table
Published: 2021-02-08
Background
For most databases, counting the rows of a table is one of the most basic operations. For a column-oriented store like HBase, however, the efficiency of a row count varies enormously depending on how it is done. This article presents two ways to count the rows of an HBase table from Java client code.
Approach 1: scan + filter
Add the dependency
<dependency>
<groupId>com.aliyun.hbase</groupId>
<artifactId>alihbase-client</artifactId>
<version>2.0.3</version>
</dependency>
Java implementation
String tableNameStr = "${table_name}";
Configuration config = HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM, "${zk_addr}");
Connection connection = null;
try {
    connection = ConnectionFactory.createConnection(config);
    Table table = connection.getTable(TableName.valueOf(tableNameStr));
    try {
        long start = System.currentTimeMillis();
        Scan scan = new Scan();
        // Only fetch the first KV of each row to speed up the count
        scan.setFilter(new FirstKeyOnlyFilter());
        ResultScanner results = table.getScanner(scan);
        long count = 0;
        try {
            for (Result r : results) {
                // With FirstKeyOnlyFilter each Result carries exactly one cell, so this adds 1 per row
                count += r.size();
            }
        } finally {
            results.close();
        }
        System.out.println("count==" + count);
        System.out.println("scan total time: " + (System.currentTimeMillis() - start) + "ms");
    } finally {
        if (table != null) table.close();
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    if (connection != null) {
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
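If the table is large, the scan itself can be tuned to reduce RPC round trips and the amount of data shipped back to the client. The snippet below is a minimal sketch of such tuning, reusing the open Table handle (table) from the code above; the KeyOnlyFilter and the caching value of 1000 are illustrative additions of this note, not settings from the original code.

// Return a single, value-less cell per row and fetch more rows per RPC.
// FirstKeyOnlyFilter keeps only the first cell of each row; KeyOnlyFilter strips its value.
Scan countScan = new Scan();
countScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
        new FirstKeyOnlyFilter(), new KeyOnlyFilter()));
countScan.setCaching(1000);      // rows returned per RPC; 1000 is an example value
countScan.setCacheBlocks(false); // a one-off full scan should not evict hot data from the block cache
long rows = 0;
try (ResultScanner scanner = table.getScanner(countScan)) {
    for (Result r : scanner) {
        rows++; // one Result per row
    }
}
System.out.println("rows==" + rows);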
Approach 2: scan + coprocessor
Add the dependencies
<dependency>
<groupId>com.aliyun.hbase</groupId>
<artifactId>alihbase-client</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-endpoint</artifactId>
<version>2.0.3</version>
</dependency>
Java implementation
String tableNameStr = "${table_name}";
// Column family of the table; needed because the table descriptor is rebuilt below
String columnFamily = "${column_family}";
Configuration config = HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM, "${zk_addr}");
// Generous timeouts: the per-region counting RPCs can run for a long time on big tables
config.setLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 6000000);
config.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 6000000);
config.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000);
config.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "ThreadLocalPool");
config.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 10);
String aggregateCoprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
CoprocessorDescriptor aggregateCoprocessorDescriptor = CoprocessorDescriptorBuilder
        .newBuilder(aggregateCoprocessorClass)
        .setPriority(10000000)
        .build();
Connection connection = null;
try {
    connection = ConnectionFactory.createConnection(config);
    Admin admin = connection.getAdmin();
    TableName tableName = TableName.valueOf(tableNameStr);
    // Look up the table's existing coprocessor list
    Collection<CoprocessorDescriptor> coprocessorDescriptors =
            admin.getDescriptor(tableName).getCoprocessorDescriptors();
    // Register the aggregate coprocessor if it is not attached to the table yet
    boolean exists = false;
    for (CoprocessorDescriptor cp : coprocessorDescriptors) {
        if (aggregateCoprocessorClass.equals(cp.getClassName())) {
            exists = true;
            break;
        }
    }
    if (!exists) {
        coprocessorDescriptors.add(aggregateCoprocessorDescriptor);
        // Note: the descriptor is rebuilt from scratch here, so it only carries
        // the single column family given above plus the coprocessor list
        TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
                .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)).build())
                .setCoprocessors(coprocessorDescriptors)
                .build();
        if (!admin.isTableDisabled(tableName)) {
            admin.disableTable(tableName);
        }
        admin.modifyTable(tableDescriptor);
        admin.enableTable(tableName);
    }
    // Run the rowcount aggregation
    long startTime = System.currentTimeMillis();
    Scan scan = new Scan();
    AggregationClient aggregationClient = new AggregationClient(config);
    long rowCount = aggregationClient.rowCount(tableName, new LongColumnInterpreter(), scan);
    aggregationClient.close();
    System.out.println("rowCount===" + rowCount);
    System.out.println("total time: " + (System.currentTimeMillis() - startTime) + "ms");
    // Remove the aggregate coprocessor again
    boolean cpExists = false;
    Collection<CoprocessorDescriptor> cpCollect =
            admin.getDescriptor(tableName).getCoprocessorDescriptors();
    for (Iterator<CoprocessorDescriptor> iterator = cpCollect.iterator(); iterator.hasNext(); ) {
        CoprocessorDescriptor cp = iterator.next();
        if (aggregateCoprocessorClass.equals(cp.getClassName())) {
            iterator.remove();
            cpExists = true;
        }
    }
    if (cpExists) {
        TableDescriptor newTableDesc = TableDescriptorBuilder.newBuilder(tableName)
                .setCoprocessors(cpCollect)
                .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)).build())
                .build();
        if (!admin.isTableDisabled(tableName)) {
            admin.disableTable(tableName);
        }
        admin.modifyTable(newTableDesc);
        admin.enableTable(tableName);
    }
} catch (Throwable t) {
    // AggregationClient.rowCount declares Throwable, so catch it here
    t.printStackTrace();
} finally {
    if (connection != null) {
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Execution results:
Counting a table with 100 million rows took only about two minutes. With the coprocessor approach, the rowcount work is distributed across every region of the table: each region server counts the regions it hosts, and the partial results are summed at the end. On the server side the counting is done with an InternalScanner, the scanner interface closest to the actual stored data, which makes this currently the most efficient way to count rows.
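Because rowCount takes a Scan, the same coprocessor call can also count just a slice of the table instead of the whole key space. A minimal sketch, assuming the config and tableName from the code above; the row-key boundaries are hypothetical placeholders:

// Count only rows whose keys fall in [startRow, stopRow); the key values are placeholders.
Scan rangeScan = new Scan()
        .withStartRow(Bytes.toBytes("row-00000000"))
        .withStopRow(Bytes.toBytes("row-10000000"));
try (AggregationClient rangeClient = new AggregationClient(config)) {
    long rangeCount = rangeClient.rowCount(tableName, new LongColumnInterpreter(), rangeScan);
    System.out.println("rangeCount===" + rangeCount);
} catch (Throwable t) {
    // rowCount declares Throwable
    t.printStackTrace();
}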