程式人生 > HBase批量匯入資料,支援多執行緒同時操作

HBase批量匯入資料,支援多執行緒同時操作


/**
 * HBase helper that buffers {@code Put}s per thread and submits them to HBase in batches.
 *
 * <p>Each thread accumulates its own list of pending {@code Put}s in a {@link ThreadLocal};
 * once the list reaches {@code ServerConfigs.CACHE_LIST_SIZE} entries it is written to the
 * target table in a single call. The buffer is keyed by thread only, not by table, so a given
 * thread must always write to the same table. Call {@link #flush(String)} when a thread has
 * finished producing rows, otherwise a final partial batch is never written.
 *
 * @author ning.zhang
 * Email: [email protected]
 * Created: 2018/7/30
 */
public class HBaseUtils {

    /** Per-thread buffer of Puts that have not been written to HBase yet. */
    ThreadLocal<List<Put>> threadLocal = new ThreadLocal<List<Put>>();
    HBaseAdmin admin = null;
    /** Shared connection; stays null if the constructor failed to connect. */
    Connection conn = null;

    /**
     * Builds the configuration from {@code ServerConfigs.ZK} and a fixed HDFS root dir, then
     * opens the shared connection. Connection failures are printed and leave {@code conn} null.
     */
    private HBaseUtils() {
        Configuration configuration = new Configuration();
        configuration.set("hbase.zookeeper.quorum", ServerConfigs.ZK);
        configuration.set("hbase.rootdir", "hdfs://hadoop-23:8020/hbase");

        try {
            conn = ConnectionFactory.createConnection(configuration);
            admin = new HBaseAdmin(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static HBaseUtils instance = null;

    /**
     * Returns the lazily created singleton; synchronized so only one instance is ever built.
     *
     * @return the shared {@code HBaseUtils} instance
     */
    public static synchronized HBaseUtils getInstance() {
        if (null == instance) {
            instance = new HBaseUtils();
        }
        return instance;
    }


    /**
     * Obtains an {@code HTable} handle for the given table name from the shared connection.
     * The caller is responsible for closing the returned table.
     *
     * @param tableName HBase table name
     * @return the table handle, or {@code null} if the connection is unavailable or the
     *         lookup failed
     */
    public HTable getTable(String tableName) {
        if (conn == null) {
            // The constructor already reported why the connection could not be opened.
            System.err.println("HBase connection is not available; cannot open table " + tableName);
            return null;
        }

        HTable table = null;
        try {
            final TableName tname = TableName.valueOf(tableName);
            table = (HTable) conn.getTable(tname);
        } catch (IOException e) {
            e.printStackTrace();
        }

        return table;
    }

    /**
     * Buffers one cell for batched writing. When the calling thread's buffer reaches
     * {@code ServerConfigs.CACHE_LIST_SIZE} entries, the whole buffer is written to
     * {@code tableName}. A thread must always pass the same table name, because the buffer
     * is per thread rather than per table.
     *
     * @param tableName HBase table name
     * @param rowkey    row key of the cell
     * @param cf        column family of the cell
     * @param column    column qualifier of the cell
     * @param value     value to store
     */
    public void bulkput(String tableName, String rowkey, String cf, String column, String value) {
        List<Put> list = threadLocal.get();
        if (list == null) {
            list = new ArrayList<Put>();
            threadLocal.set(list);
        }
        Put put = new Put(Bytes.toBytes(rowkey));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
        list.add(put);
        if (list.size() >= ServerConfigs.CACHE_LIST_SIZE) {
            writeBuffered(tableName, list);
        }
    }

    /**
     * Writes whatever the calling thread still has buffered from {@link #bulkput} to
     * {@code tableName}. Call this once a thread is done producing rows so that a final,
     * partially filled batch is not lost. Does nothing when the buffer is empty.
     *
     * @param tableName HBase table name the calling thread has been writing to
     */
    public void flush(String tableName) {
        List<Put> list = threadLocal.get();
        if (list != null && !list.isEmpty()) {
            writeBuffered(tableName, list);
        }
    }

    /**
     * Writes {@code list} to the table and clears it on success; on failure the entries stay
     * buffered so a later batch or flush retries them. The table handle is always closed.
     */
    private void writeBuffered(String tableName, List<Put> list) {
        try (HTable table = getTable(tableName)) {
            if (table == null) {
                System.err.println("Could not open table " + tableName + "; "
                        + list.size() + " buffered put(s) kept for retry");
                return;
            }
            table.put(list);
            list.clear();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes a single cell to HBase immediately, bypassing the per-thread buffer.
     *
     * @param tableName HBase table name
     * @param rowkey    row key of the cell
     * @param cf        column family of the cell
     * @param column    column qualifier of the cell
     * @param value     value to store
     */
    public void put(String tableName, String rowkey, String cf, String column, String value) {
        Put put = new Put(Bytes.toBytes(rowkey));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
        try (HTable table = getTable(tableName)) {
            if (table == null) {
                System.err.println("Could not open table " + tableName + "; put for row "
                        + rowkey + " was not written");
                return;
            }
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Manual smoke test: writes 200,000 rows to table {@code t1} from two threads and prints
     * the elapsed milliseconds once both threads have finished and flushed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        long start = System.currentTimeMillis();

        final String tableName = "t1";

        // Start the worker first so both halves of the data are written concurrently.
        Thread worker = new Thread(new Runnable() {
            public void run() {
                for (int i = 100000; i < 200000; i++) {
                    HBaseUtils.getInstance().bulkput(tableName, i + "", "f1", "id", String.valueOf(100321 + i));
                }
                // Each thread owns its buffer, so it must flush its own remainder.
                HBaseUtils.getInstance().flush(tableName);
            }
        });
        worker.start();

        for (int i = 0; i < 100000; i++) {
            HBaseUtils.getInstance().bulkput(tableName, i + "", "f1", "id", String.valueOf(100321 + i));
        }
        HBaseUtils.getInstance().flush(tableName);

        // Wait for the worker so the printed time covers all 200,000 rows.
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        System.out.println(System.currentTimeMillis() - start);
    }

}

其中ServerConfigs配置如下:

/**
 * Central holder for the connection strings and tuning values used by this project.
 *
 * @author ning.zhang
 * Email: [email protected]
 * Created: 2018/7/30
 */
public class ServerConfigs {

    // ---- HBase ----

    /** ZooKeeper quorum (comma-separated host:port list) used as {@code hbase.zookeeper.quorum}. */
    public static final String ZK = "172.17.245.23:2181,172.17.245.25:2181,172.17.245.26:2181";

    /** Number of buffered records that triggers one batch submission. */
    public static final int CACHE_LIST_SIZE = 100;

    // ---- Messaging (presumably Kafka, judging by the names; not used by HBaseUtils) ----

    /** Topic name. */
    public static final String TOPIC = "ad_upload_event";

    /** Broker address list (host:port). */
    public static final String BROKER_LIST = "172.17.245.23:9092";

    /** Consumer group id. */
    public static final String GROUP_ID = "test_group";

}

測試結果:20萬條資料,雙執行緒插入HBase,耗時5.32s