hbase建立預切割-開啟多執行緒實現大批量插入
阿新 • • 發佈:2019-02-07
建立表同時預切割區域 可以防止資料傾斜
------------------$hbase>create 'ns1:t3', 'f1', SPLITS => ['row300000', 'row600000']
/** * 新增批量資料 */ public void bigput(int start , int end) throws Exception { DecimalFormat df = new DecimalFormat("000000") ; // Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); HTable table = (HTable) conn.getTable(TableName.valueOf("ns1:t3")); //關閉自動清理緩衝區 table.setAutoFlushTo(false); for (int i = start; i < end; i++) { byte[] rowkey = Bytes.toBytes("row" + df.format(i)); Put put = new Put(rowkey); put.setWriteToWAL(false); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"), Bytes.toBytes(i)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("tom" + i)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(i % 100)); table.put(put); if (i % 2000 == 0) { //清理提交 table.flushCommits(); } } //清理提交 table.flushCommits(); table.close(); //System.out.println(System.currentTimeMillis() - start); } @Test public void testBigInsertMulti() throws InterruptedException { long start = System.currentTimeMillis() ; Thread t1 = new Thread(){ public void run() { try { bigput(0, 300000) ; } catch (Exception e) { e.printStackTrace(); } } }; Thread t2 = new Thread(){ public void run() { try { bigput(300000, 600000) ; } catch (Exception e) { e.printStackTrace(); } } }; Thread t3 = new Thread(){ public void run() { try { bigput(600000, 1000000) ; } catch (Exception e) { e.printStackTrace(); } } }; t1.start(); t2.start(); t3.start(); t1.join(); t2.join(); t3.join(); System.out.println(System.currentTimeMillis() - start); }