程式人生 > 其它 > 快速統計HBase表行數

快速統計HBase表行數

技術標籤:高併發、HBase、Java、資料庫

背景

對於其它資料庫而言,統計一張表的行數是最基本的操作,但是對於HBase這種面向列族儲存的資料庫而言,使用不同方式統計表的行數效率差別巨大。本文將提供兩種以Java客戶端程式碼統計HBase表行數的方法。

方案一:scan + filter

引入jar包

<dependency>
    <groupId>com.aliyun.hbase</groupId>
    <artifactId>alihbase-client</artifactId>
    <version>2.0.3</version>
</dependency>

java程式碼實現

String tableNameStr = "${table_name}";
Configuration config = HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM, "${zk_addr}");
Connection connection = null;
try {
    connection = ConnectionFactory.createConnection(config);
    Table table = connection.
getTable(TableName.valueOf(tableNameStr)); try { long start = System.currentTimeMillis(); Scan scan = new Scan(); // 只獲取每行資料的第一個kv,提高count速度 scan.setFilter(new FirstKeyOnlyFilter()); ResultScanner results = table.getScanner(scan); long count = 0; for
(Result r : results) { count += r.size(); } System.out.println("count=="+count); System.out.println("scan 總共耗時:" + (System.currentTimeMillis() - start) + "ms"); } finally { if (table != null) table.close(); } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } }

方案二:scan + coprocessor

引入jar包

<dependency>
    <groupId>com.aliyun.hbase</groupId>
    <artifactId>alihbase-client</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-endpoint</artifactId>
    <version>2.0.3</version>
</dependency>

java程式碼實現

String tableNameStr = "${table_name}";
Configuration config = HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM, "${zk_addr}");
config.setLong(HConstants.HBASE_RPC_TIMEOUT_KEY,6000000);
config.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,6000000);
config.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,6000000);
config.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE,"ThreadLocalPool");
config.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE,10);
String aggregateCoprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
CoprocessorDescriptor aggregateCoprocessorDescriptor = CoprocessorDescriptorBuilder
        .newBuilder(aggregateCoprocessorClass)
        .setPriority(10000000)
        .build();
Connection connection = null;
try {
    connection = ConnectionFactory.createConnection(config);
    Admin admin = connection.getAdmin();
    TableName tableName = TableName.valueOf(tableNameStr);

    // 檢視已有的cp列表
    Collection<CoprocessorDescriptor> coprocessorDescriptors = admin.getDescriptor(tableName).getCoprocessorDescriptors();

    // 追加aggregate協處理器
    boolean exists=false;
    for(CoprocessorDescriptor cp:coprocessorDescriptors){
        if(aggregateCoprocessorClass.equals(cp.getClassName())){
            exists=true;
            break;
        }
    }
    if(!exists){
        coprocessorDescriptors.add(aggregateCoprocessorDescriptor);
        TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
                .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)).build())
                .setCoprocessors(coprocessorDescriptors)
                .build();
        if (!admin.isTableDisabled(tableName)) {
            admin.disableTable(tableName);
        }
        admin.modifyTable(tableDescriptor);
        admin.enableTable(tableName);
    }

    // 執行rowcount統計
    long startTime = System.currentTimeMillis();
    Scan scan = new Scan();
    AggregationClient aggregationClient = new AggregationClient(config);
    long rowCount = aggregationClient.rowCount(tableName, new LongColumnInterpreter(), scan);
    System.out.println("rowCount==="+rowCount);
    System.out.println("總共耗時:"+(System.currentTimeMillis()-startTime)+"ms");

    // 移除aggregate協處理器
    boolean cpExists = false;
    Collection<CoprocessorDescriptor> cpCollect = admin.getDescriptor(tableName).getCoprocessorDescriptors();
    for(Iterator<CoprocessorDescriptor> iterator=cpCollect.iterator();iterator.hasNext();){
        CoprocessorDescriptor cp = iterator.next();
        if(aggregateCoprocessorClass.equals(cp.getClassName())){
            iterator.remove();
            cpExists = true;
        }
    }
    if(cpExists){
        TableDescriptor newTableDesc = TableDescriptorBuilder.newBuilder(tableName)
                .setCoprocessors(cpCollect)
                .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)).build())
                .build();
        if (!admin.isTableDisabled(tableName)) {
            admin.disableTable(tableName);
        }
        admin.modifyTable(newTableDesc);
        admin.enableTable(tableName);
    }
} catch (Exception e) {
    e.printStackTrace();
} catch (Throwable throwable) {
    throwable.printStackTrace();
} finally {
    if (connection != null) {
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

執行結果如下:
(此處原文為執行結果截圖,已省略)
一億條資料查詢,只花費了2分鐘,這種協處理器執行方式是將rowcount操作分散到table對應的每個region,由每個region對應的region server進行分別統計計算,最後彙總。region server使用了InternalScanner,這是距離實際資料儲存最近的Scanner介面,是目前最高效的RowCount統計方式