HBase 批量匯入資料，支援多執行緒同時操作
阿新 • 發佈：2019-01-07
/** * HBase操作工具類:快取模式多執行緒批量提交作業到hbase * * @Auther: ning.zhang * @Email: [email protected] * @CreateDate: 2018/7/30 */ public class HBaseUtils { ThreadLocal<List<Put>> threadLocal = new ThreadLocal<List<Put>>(); HBaseAdmin admin = null; Connection conn = null; private HBaseUtils() { Configuration configuration = new Configuration(); configuration.set("hbase.zookeeper.quorum", ServerConfigs.ZK); configuration.set("hbase.rootdir", "hdfs://hadoop-23:8020/hbase"); try { conn = ConnectionFactory.createConnection(configuration); admin = new HBaseAdmin(configuration); } catch (IOException e) { e.printStackTrace(); } } private static HBaseUtils instance = null; public static synchronized HBaseUtils getInstance() { if (null == instance) { instance = new HBaseUtils(); } return instance; } /** * 根據表名獲取到HTable例項 */ public HTable getTable(String tableName) { HTable table = null; try { // table = new HTable(configuration, tableName); final TableName tname = TableName.valueOf(tableName); table = (HTable) conn.getTable(tname); } catch (IOException e) { e.printStackTrace(); } return table; } /** * 批量新增記錄到HBase表,同一執行緒要保證對相同表進行新增操作! 
* * @param tableName HBase表名 * @param rowkey HBase表的rowkey * @param cf HBase表的columnfamily * @param column HBase表的列key * @param value 寫入HBase表的值value */ public void bulkput(String tableName, String rowkey, String cf, String column, String value) { try { List<Put> list = threadLocal.get(); if (list == null) { list = new ArrayList<Put>(); } Put put = new Put(Bytes.toBytes(rowkey)); put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value)); list.add(put); if (list.size() >= ServerConfigs.CACHE_LIST_SIZE) { HTable table = getTable(tableName); table.put(list); list.clear(); } else { threadLocal.set(list); } // table.flushCommits(); } catch (IOException e) { e.printStackTrace(); } } /** * 新增單條記錄到HBase表 * * @param tableName HBase表名 * @param rowkey HBase表的rowkey * @param cf HBase表的columnfamily * @param column HBase表的列key * @param value 寫入HBase表的值value */ public void put(String tableName, String rowkey, String cf, String column, String value) { HTable table = getTable(tableName); Put put = new Put(Bytes.toBytes(rowkey)); put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value)); try { table.put(put); } catch (IOException e) { e.printStackTrace(); } } /** * test * * @param args */ public static void main(String[] args) { //HTable table = HBaseUtils.getInstance().getTable("imooc_course_clickcount"); //System.out.println(table.getName().getNameAsString()); long start = System.currentTimeMillis(); String tableName = "t1"; // String rowkey = "1"; for (int i = 0; i < 100000; i++) { HBaseUtils.getInstance().bulkput(tableName, i + "", "f1", "id", String.valueOf(100321 + i)); } new Thread(new Runnable() { public void run() { for (int i = 100000; i < 200000; i++) { HBaseUtils.getInstance().bulkput("t1", i + "", "f1", "id", String.valueOf(100321 + i)); } } }).start(); System.out.println(System.currentTimeMillis() - start); } }
其中，ServerConfigs 的配置如下：
/**
 * Configuration constants shared by the ingestion code.
 *
 * <p>Created on 2018/7/30.
 *
 * @author ning.zhang
 */
public class ServerConfigs {

    // ---------------------------------------------------------------- HBase client

    /** ZooKeeper quorum as a comma-separated host:port list (used as hbase.zookeeper.quorum). */
    public static final String ZK =
            "172.17.245.23:2181,"
                    + "172.17.245.25:2181,"
                    + "172.17.245.26:2181";

    /** Number of buffered records submitted together in one batch. */
    public static final int CACHE_LIST_SIZE = 100;

    // ---------------------------------------------------------------- messaging
    // NOTE(review): these look like Kafka settings (broker port 9092); they are not referenced
    // by the code shown alongside this class — confirm where they are consumed.

    /** Broker address list. */
    public static final String BROKER_LIST = "172.17.245.23:9092";

    /** Topic name. */
    public static final String TOPIC = "ad_upload_event";

    /** Consumer group id. */
    public static final String GROUP_ID = "test_group";
}
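測試結果：20 萬條資料，以雙執行緒寫入 HBase，耗時 5.32 秒。（注意：依上方 main 方法的寫法，主執行緒先寫完前 10 萬條資料才啟動第二個執行緒，且未等待該執行緒結束就輸出耗時，因此這個數值主要反映主執行緒寫入 10 萬條資料的時間，並不包含第二個執行緒的寫入。）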