HBase basic Java API: data operations and filtering
阿新 • Published: 2019-03-30
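This article covers HBase table operations, data operations, and query filtering. If you are familiar with JDBC or ADO, the HBase API will be easy to follow.
The HBase version used is 2.0.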
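1. For convenience, part of the helper code comes first (the complete test code is on git, linked at the end). Its main purpose is to reuse the Connection.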
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class HBaseHelper implements Closeable {

    private Configuration configuration = null;
    private Connection connection = null;
    private Admin admin = null;

    // One Connection and one Admin are created here and reused by every helper method
    private HBaseHelper(Configuration configuration) throws IOException {
        this.configuration = configuration;
        this.connection = ConnectionFactory.createConnection(this.configuration);
        admin = this.connection.getAdmin();
    }

    public static HBaseHelper getHBaseHelper(Configuration configuration) throws IOException {
        return new HBaseHelper(configuration);
    }

    @Override
    public void close() throws IOException {
        admin.close();
        connection.close();
    }

    public Connection getConnection() {
        return connection;
    }

    public Configuration getConfiguration() {
        return configuration;
    }
... ...
}
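Because the helper implements Closeable, it can be used in a try-with-resources statement so that the Admin and the Connection are released even when an operation fails. The following is only a sketch of that pattern: FakeHelper is a hypothetical stand-in that logs what it does, and no HBase classes are involved.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Stand-in for HBaseHelper: it "owns" two resources and releases them in close().
// FakeHelper and LOG are hypothetical names used only for this illustration.
class CloseableHelperDemo {
    static final List<String> LOG = new ArrayList<>();

    static class FakeHelper implements Closeable {
        FakeHelper() {
            LOG.add("open connection");
            LOG.add("open admin");
        }

        void doWork() {
            LOG.add("work");
        }

        @Override
        public void close() {
            // Same order as HBaseHelper.close(): admin first, then connection.
            LOG.add("close admin");
            LOG.add("close connection");
        }
    }

    static List<String> run() {
        LOG.clear();
        // try-with-resources calls close() automatically, even if doWork() throws.
        try (FakeHelper helper = new FakeHelper()) {
            helper.doWork();
        }
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With the real helper the same shape would be `try (HBaseHelper helper = HBaseHelper.getHBaseHelper(conf)) { ... }`, which removes the need for an explicit helper.close() call.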
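Initialization: set up the HBase configuration, connect to HBase, and obtain the HBaseHelper class used throughout this article. The English comments are copied from the official documentation.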
// Initialization
private void setUp() throws IOException {
    conf = HBaseConfiguration.create();
    conf.set("hbase.master", "192.168.31.10");
    // The port the HBase Master should bind to.
    // conf.set("hbase.master.port", "16000");

    // The port for the HBase Master web UI. Set to -1 if you do not want a UI instance run.
    // conf.set("hbase.master.info.port", "16010");

    // The port the HBase RegionServer binds to.
    // conf.set("hbase.regionserver.port", "16020");

    // The port for the HBase RegionServer web UI. Set to -1 if you do not want the RegionServer UI to run.
    // conf.set("hbase.regionserver.info.port", "16030");

    conf.set("hbase.zookeeper.quorum", "192.168.31.10");

    // Property from ZooKeeper's config zoo.cfg. The port at which the clients will connect.
    // Port used by the HBase database
    // conf.set("hbase.zookeeper.property.clientPort", "2181");

    // Standalone mode
    conf.set("hbase.rootdir", "file:///opt/hbase_data");
    conf.set("hbase.zookeeper.property.dataDir", "/opt/hbase_data/zookeeper");

    helper = HBaseHelper.getHBaseHelper(conf);
}
2. Namespaces, table creation, deletion (disable the table first), existence checks, and so on
public void createNamespace(String namespace) {
    try {
        NamespaceDescriptor nd = NamespaceDescriptor.create(namespace).build();
        admin.createNamespace(nd);
    } catch (Exception e) {
        System.err.println("Error: " + e.getMessage());
    }
}

// With force == true, every table in the namespace is disabled and deleted first
public void dropNamespace(String namespace, boolean force) {
    try {
        if (force) {
            TableName[] tableNames = admin.listTableNamesByNamespace(namespace);
            for (TableName name : tableNames) {
                admin.disableTable(name);
                admin.deleteTable(name);
            }
        }
    } catch (Exception e) {
        // ignore
    }
    try {
        admin.deleteNamespace(namespace);
    } catch (IOException e) {
        System.err.println("Error: " + e.getMessage());
    }
}

public boolean existsTable(String table) throws IOException {
    return existsTable(TableName.valueOf(table));
}

public boolean existsTable(TableName table) throws IOException {
    return admin.tableExists(table);
}

public void createTable(String table, String... colfams) throws IOException {
    createTable(TableName.valueOf(table), 1, null, colfams);
}

public void createTable(TableName table, String... colfams) throws IOException {
    createTable(table, 1, null, colfams);
}

public void createTable(String table, int maxVersions, String... colfams) throws IOException {
    createTable(TableName.valueOf(table), maxVersions, null, colfams);
}

public void createTable(TableName table, int maxVersions, String... colfams) throws IOException {
    createTable(table, maxVersions, null, colfams);
}

public void createTable(String table, byte[][] splitKeys, String... colfams) throws IOException {
    createTable(TableName.valueOf(table), 1, splitKeys, colfams);
}

public void createTable(TableName table, int maxVersions, byte[][] splitKeys, String... colfams)
        throws IOException {
    // Table descriptor builder
    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(table);
    // Column family descriptor builder
    ColumnFamilyDescriptorBuilder cfDescBuilder;
    // Column family descriptor
    ColumnFamilyDescriptor cfDesc;

    for (String cf : colfams) {
        cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf));
        cfDescBuilder.setMaxVersions(maxVersions);
        cfDesc = cfDescBuilder.build();
        tableDescriptorBuilder.setColumnFamily(cfDesc);
    }
    // Build the table descriptor
    TableDescriptor tableDescriptor = tableDescriptorBuilder.build();

    if (splitKeys != null) {
        admin.createTable(tableDescriptor, splitKeys);
    } else {
        admin.createTable(tableDescriptor);
    }
}
// Disable a table
public void disableTable(String table) throws IOException {
    disableTable(TableName.valueOf(table));
}

public void disableTable(TableName table) throws IOException {
    admin.disableTable(table);
}

public void dropTable(String table) throws IOException {
    dropTable(TableName.valueOf(table));
}

// Disable the table before deleting it
public void dropTable(TableName table) throws IOException {
    if (existsTable(table)) {
        if (admin.isTableEnabled(table)) disableTable(table);
        admin.deleteTable(table);
    }
}
Example:
// Insert data into the testtable table
private void initTestTable() throws IOException {
    String tableNameString = "testtable";
    if (helper.existsTable(tableNameString)) {
        helper.disableTable(tableNameString);
        helper.dropTable(tableNameString);
    }

    helper.createTable(tableNameString, "info", "ex", "memo");
    helper.put(tableNameString, "row1", "info", "username", "admin");
    helper.put(tableNameString, "row1", "ex", "addr", "Beijing Avenue");
    helper.put(tableNameString, "row1", "memo", "detail", "Superuser, address: Beijing Avenue");

    helper.put(tableNameString, "row2", "info", "username", "guest");
    helper.put(tableNameString, "row2", "ex", "addr", "All over the country");
    helper.put(tableNameString, "row2", "memo", "detail", "Guest, address: everywhere in the country");

    helper.close();
}
2. Inserting (or updating) data
public void put(String table, String row, String fam, String qual, String val) throws IOException {
    put(TableName.valueOf(table), row, fam, qual, val);
}

// Insert or update a single cell
public void put(TableName table, String row, String fam, String qual, String val) throws IOException {
    Table tbl = connection.getTable(table);
    Put put = new Put(Bytes.toBytes(row));
    put.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual), Bytes.toBytes(val));
    tbl.put(put);
    tbl.close();
}

public void put(String table, String row, String fam, String qual, long ts, String val) throws IOException {
    put(TableName.valueOf(table), row, fam, qual, ts, val);
}

// Insert or update a single cell with an explicit timestamp
public void put(TableName table, String row, String fam, String qual, long ts, String val) throws IOException {
    Table tbl = connection.getTable(table);
    Put put = new Put(Bytes.toBytes(row));
    put.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual), ts, Bytes.toBytes(val));
    tbl.put(put);
    tbl.close();
}

// Insert or update the data of one rowKey. A Put holds one rowKey
// and may carry several column families and column names.
public void put(String tableNameString, Put put) throws IOException {
    TableName tableName = TableName.valueOf(tableNameString);
    Table table = connection.getTable(tableName);
    if (put != null && put.size() > 0) {
        table.put(put);
    }
    table.close();
}
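2.1. Bulk insert: assemble the data according to the actual business requirements; in the end it comes down to passing a list of Puts to the API.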
// Bulk insert. Each map in the list is one cell, stored under the keys
// rowKey, columnFamily, columnName and columnValue.
public void bulkInsert(String tableNameString, List<Map<String, Object>> list) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableNameString));
    List<Put> puts = new ArrayList<Put>();
    if (list != null && list.size() > 0) {
        for (Map<String, Object> map : list) {
            Put put = new Put(Bytes.toBytes(map.get("rowKey").toString()));
            put.addColumn(Bytes.toBytes(map.get("columnFamily").toString()),
                    Bytes.toBytes(map.get("columnName").toString()),
                    Bytes.toBytes(map.get("columnValue").toString()));
            puts.add(put);
        }
    }
    table.put(puts);
    table.close();
}

// Bulk insert where the caller assembles the Puts and passes them in as a list
public void bulkInsert2(String tableNameString, List<Put> puts) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableNameString));
    if (puts != null && puts.size() > 0) {
        table.put(puts);
    }
    table.close();
}
Example:
// Bulk insert
private void bulkInsertTestTable() throws IOException {
    String tableNameString = "testtable";
    if (!helper.existsTable(tableNameString)) {
        helper.createTable(tableNameString, "info", "ex", "memo");
    }

    System.out.println(".........bulk insert start.........");
    List<Map<String, Object>> mapList = new ArrayList<>();
    for (int i = 1; i < 201; i++) {
        // One map per cell: reusing a single map would overwrite the earlier
        // values, and only the last cell of each row would be inserted.
        Map<String, Object> map = new HashMap<>();
        map.put("rowKey", "testKey" + i);
        map.put("columnFamily", "info");
        map.put("columnName", "username");
        map.put("columnValue", "guest" + i);
        mapList.add(map);

        map = new HashMap<>();
        map.put("rowKey", "testKey" + i);
        map.put("columnFamily", "ex");
        map.put("columnName", "addr");
        map.put("columnValue", "No. " + i + " Beijing Road");
        mapList.add(map);

        map = new HashMap<>();
        map.put("rowKey", "testKey" + i);
        map.put("columnFamily", "memo");
        map.put("columnName", "detail");
        map.put("columnValue", "No. " + i + " Beijing Road, Global Village, United Nations");
        mapList.add(map);
    }

    helper.bulkInsert(tableNameString, mapList);
    System.out.println(".........bulk insert end.........");
}

// Bulk insert 2: one Put for a single rowKey, carrying many columns
private void insertByRowKey(String table, String rowKey) throws IOException {
    Put put = new Put(Bytes.toBytes(rowKey));

    String columnFamily;
    String columnName;
    String columnValue;
    for (int i = 0; i < 10; i++) {
        columnFamily = "info";
        columnName = "username" + i;
        columnValue = "user111";
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(columnValue));

        columnFamily = "ex";
        columnName = "addr" + i;
        columnValue = "street 111";
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(columnValue));

        columnFamily = "memo";
        columnName = "detail" + i;
        columnValue = "sssss zzz 111222 ";
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(columnValue));
    }
    System.out.println("----> put size:" + put.size());

    helper.put(table, put);
}

private void bulkInsertTestTable2(String tableNameString) throws IOException {
    // String tableNameString = "testtable";
    if (!helper.existsTable(tableNameString)) {
        helper.createTable(tableNameString, "info", "ex", "memo");
    }

    List<Put> puts = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        String rowKey = "rowKey" + i;
        Put put = new Put(Bytes.toBytes(rowKey));

        String columnFamily = "info";
        String columnName = "username2";
        String columnValue = "user" + i;
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(columnValue));

        columnFamily = "ex";
        columnName = "addr2";
        columnValue = "street " + i;
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(columnValue));

        columnFamily = "memo";
        columnName = "detail2";
        columnValue = "aazzdd " + i;
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(columnValue));

        System.out.println("put size:" + put.size());
        puts.add(put);
    }
    helper.bulkInsert2(tableNameString, puts);
}
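Since bulkInsert reads exactly one rowKey, columnFamily, columnName and columnValue from each map, every cell needs a map of its own; calling put repeatedly with the same keys on one map keeps only the last value. A minimal sketch of assembling such a list with plain Java collections follows. The cell() and buildRows() helpers and the sample values are hypothetical, not part of the article's helper class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CellMapDemo {
    // Build the map for a single cell, using the four keys that bulkInsert reads.
    static Map<String, Object> cell(String rowKey, String family, String column, String value) {
        Map<String, Object> map = new HashMap<>();
        map.put("rowKey", rowKey);
        map.put("columnFamily", family);
        map.put("columnName", column);
        map.put("columnValue", value);
        return map;
    }

    // Three cells per row, so three maps per row.
    static List<Map<String, Object>> buildRows(int count) {
        List<Map<String, Object>> list = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            String rowKey = "testKey" + i;
            list.add(cell(rowKey, "info", "username", "guest" + i));
            list.add(cell(rowKey, "ex", "addr", "street " + i));
            list.add(cell(rowKey, "memo", "detail", "note " + i));
        }
        return list;
    }

    public static void main(String[] args) {
        // Reusing one map overwrites the earlier value for the same key.
        Map<String, Object> reused = new HashMap<>();
        reused.put("columnFamily", "info");
        reused.put("columnFamily", "memo");
        System.out.println(reused.get("columnFamily")); // prints "memo"

        System.out.println(buildRows(2).size()); // prints 6
    }
}
```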
3. Deleting data. Because HBase data has three dimensions, there are several kinds of delete operations.
// Delete the whole row identified by rowKey
public void deleteByKey(String tableNameString, String rowKey) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableNameString));
    Delete delete = new Delete(Bytes.toBytes(rowKey));
    table.delete(delete);
    table.close();
}

// Delete all data of one column family within the row
public void deleteByKeyAndFamily(String tableNameString, String rowKey, String columnFamily) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableNameString));
    Delete delete = new Delete(Bytes.toBytes(rowKey));
    delete.addFamily(Bytes.toBytes(columnFamily));
    table.delete(delete);
    table.close();
}

// Delete several columns of one column family within the row
public void deleteByKeyAndFC(String tableNameString, String rowKey,
        String columnFamily, List<String> columnNames) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableNameString));
    Delete delete = new Delete(Bytes.toBytes(rowKey));
    for (String columnName : columnNames) {
        delete.addColumns(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
    }
    table.delete(delete);
    table.close();
}
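The three delete methods differ only in how much of a row they remove. The sketch below mimics that with nested in-memory maps (row, then column family, then column name) so the scopes can be compared side by side. It is a simplified model under stated assumptions: it ignores timestamps and versions, and every name in it is hypothetical rather than part of the HBase API.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DeleteScopeDemo {
    // row -> column family -> column name -> value
    static Map<String, Map<String, Map<String, String>>> sampleTable() {
        Map<String, Map<String, Map<String, String>>> table = new TreeMap<>();
        for (String row : List.of("row1", "row2")) {
            Map<String, Map<String, String>> families = new TreeMap<>();
            families.put("info", new TreeMap<>(Map.of("username", "u-" + row, "email", "e-" + row)));
            families.put("ex", new TreeMap<>(Map.of("addr", "a-" + row)));
            table.put(row, families);
        }
        return table;
    }

    // Scope 1: the whole row (compare deleteByKey).
    static void deleteRow(Map<String, Map<String, Map<String, String>>> table, String row) {
        table.remove(row);
    }

    // Scope 2: one column family of the row (compare deleteByKeyAndFamily).
    static void deleteFamily(Map<String, Map<String, Map<String, String>>> table,
                             String row, String family) {
        Map<String, Map<String, String>> families = table.get(row);
        if (families != null) {
            families.remove(family);
        }
    }

    // Scope 3: selected columns of one family (compare deleteByKeyAndFC).
    static void deleteColumns(Map<String, Map<String, Map<String, String>>> table,
                              String row, String family, List<String> columns) {
        Map<String, Map<String, String>> families = table.get(row);
        if (families != null && families.containsKey(family)) {
            families.get(family).keySet().removeAll(columns);
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Map<String, String>>> table = sampleTable();
        deleteColumns(table, "row1", "info", List.of("email"));
        deleteFamily(table, "row1", "ex");
        deleteRow(table, "row2");
        System.out.println(table);
    }
}
```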
4. Basic queries. The one thing to watch out for is that the value in a Cell must be read using its offset and length.
// Get all column families and columns for a rowKey
public List<Cell> getRowByKey(String tableNameString, String rowKey) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableNameString));

    Get get = new Get(Bytes.toBytes(rowKey));
    Result result = table.get(get);
    // Cell[] cells = result.rawCells();
    List<Cell> list = result.listCells();
    table.close();
    return list;
}

// When reading the value array from a Cell, pass the offset and length; otherwise the data is wrong
public void dumpResult(Result result) {
    for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell +
                ", Value: " + Bytes.toString(cell.getValueArray(),
                cell.getValueOffset(), cell.getValueLength()));
    }
}
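The reason the offset and length matter is that the array returned for a cell can be a larger backing buffer that also holds other parts of the cell, with the value occupying only a slice of it. The sketch below imitates that with a plain byte array. The layout (row key, column name, value packed back to back) is an assumption made for illustration and is not the exact HBase storage format.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class CellOffsetDemo {
    // Pack several strings back to back into one array, like a shared backing buffer.
    static byte[] concat(String... parts) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String part : parts) {
            byte[] bytes = part.getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
        }
        return out.toByteArray();
    }

    // Decode only the requested slice, in the spirit of Bytes.toString(array, offset, length).
    static String valueOf(byte[] backing, int offset, int length) {
        return new String(backing, offset, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String row = "row1", qualifier = "username", value = "admin";
        byte[] backing = concat(row, qualifier, value);

        // All three strings are ASCII, so character counts equal byte counts here.
        int offset = row.length() + qualifier.length();
        int length = value.length();

        System.out.println(valueOf(backing, offset, length));    // prints "admin"
        System.out.println(valueOf(backing, 0, backing.length)); // prints "row1usernameadmin"
    }
}
```

Decoding the whole array returns the value mixed with unrelated bytes, which is the kind of incorrect data the comment in dumpResult warns about.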
5. Filtering, an important part of querying HBase
5.1. Filtering by rowKey
// Filter data by rowKey; the rowKey argument may be a regular expression.
// Returns a map from rowKey to the Cells of that row.
public Map<String, List<Cell>> filterByRowKeyRegex(String tableNameString, String rowKey,
        CompareOperator operator) throws IOException {
    Scan scan = new Scan();
    // Regular-expression match
    RowFilter filter = new RowFilter(operator, new RegexStringComparator(rowKey));

    // Substring match, case-insensitive
    // RowFilter filter = new RowFilter(operator, new SubstringComparator(rowKey));

    scan.setFilter(filter);

    Map<String, List<Cell>> map = new HashMap<>();
    // try-with-resources closes the scanner as well as the table
    try (Table table = connection.getTable(TableName.valueOf(tableNameString));
         ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
            map.put(Bytes.toString(result.getRow()), result.listCells());
        }
    }
    return map;
}
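When passing a pattern to a regex-based comparator it helps to be clear about whether the pattern has to cover the whole key or may match anywhere inside it. The sketch below uses only java.util.regex to show the difference between an unanchored and an anchored pattern under find-style matching. Treating RegexStringComparator as find-style is an assumption here and worth verifying against the HBase version in use; the sample keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class RegexFindDemo {
    // find-style check: true if the pattern matches anywhere in the value.
    static boolean found(String regex, String value) {
        return Pattern.compile(regex).matcher(value).find();
    }

    // Keep only the keys for which the pattern is found.
    static List<String> select(String regex, List<String> keys) {
        List<String> kept = new ArrayList<>();
        for (String key : keys) {
            if (found(regex, key)) {
                kept.add(key);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("row1", "row10", "testKey1", "myrow1");

        // Unanchored: matches "row1" anywhere inside the key.
        System.out.println(select("row1", keys));   // prints [row1, row10, myrow1]

        // Anchored at both ends: the whole key must be exactly "row1".
        System.out.println(select("^row1$", keys)); // prints [row1]

        // Anchored at the start only: keys beginning with "test".
        System.out.println(select("^test", keys));  // prints [testKey1]
    }
}
```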
5.2. Filtering by column value, column-value regex, and so on
// Find data by column family, column name, and column value (regex supported).
// Returns: for every matching rowKey, all cells of that row across all column families
// and columns (including those that were not named in the query).
public Map<String, List<Cell>> filterByValueRegex(String tableNameString, String family, String colName,
        String value, CompareOperator operator) throws IOException {
    Scan scan = new Scan();

    // Regular-expression match
    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(family),
            Bytes.toBytes(colName), operator, new RegexStringComparator(value));

    // Exact match
    // SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(family),
    //         Bytes.toBytes(colName), operator, Bytes.toBytes(value));

    // SingleColumnValueExcludeFilter excludes the tested column value from the result

    // The tested column should exist. By default, rows that do not have the column are
    // returned as well; to leave them out, call setFilterIfMissing(true).
    filter.setFilterIfMissing(true);

    scan.setFilter(filter);

    Map<String, List<Cell>> map = new HashMap<>();
    // try-with-resources closes the table and the scanner, which the original left open
    try (Table table = connection.getTable(TableName.valueOf(tableNameString));
         ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
            map.put(Bytes.toString(result.getRow()), result.listCells());
        }
    }
    return map;
}
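The effect of setFilterIfMissing is easiest to see on a row that does not contain the tested column at all. The following sketch models rows as plain maps and reproduces only that one rule: a row without the column passes by default and is dropped when the flag is true. It handles only an equality-style regex test, and the class, method and sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

class FilterIfMissingDemo {
    // rows: rowKey -> (column name -> value). Returns the row keys that pass.
    static List<String> scan(Map<String, Map<String, String>> rows, String column,
                             String regex, boolean filterIfMissing) {
        Pattern pattern = Pattern.compile(regex);
        List<String> kept = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> entry : rows.entrySet()) {
            String value = entry.getValue().get(column);
            if (value == null) {
                // Column absent: the row is emitted unless filterIfMissing is set.
                if (!filterIfMissing) {
                    kept.add(entry.getKey());
                }
            } else if (pattern.matcher(value).find()) {
                kept.add(entry.getKey());
            }
        }
        return kept;
    }

    static Map<String, Map<String, String>> sampleRows() {
        Map<String, Map<String, String>> rows = new LinkedHashMap<>();
        rows.put("row1", Map.of("username", "admin"));
        rows.put("row2", Map.of("username", "guest"));
        rows.put("row3", Map.of("addr", "somewhere")); // no username column
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(scan(sampleRows(), "username", "^adm", false)); // prints [row1, row3]
        System.out.println(scan(sampleRows(), "username", "^adm", true));  // prints [row1]
    }
}
```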
5.3. Filtering by column-name prefix, column-name regex, multiple column names, and so on
// Filter data by column-name prefix
public Map<String, List<Cell>> filterByColumnPrefix(String tableNameString, String prefix) throws IOException {
    // Column-name prefix match
    ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes(prefix));

    // QualifierFilter supports more varied column-name matching
    // QualifierFilter filter = new QualifierFilter(CompareOperator.EQUAL, new SubstringComparator(prefix));

    // Match several column-name prefixes
    // MultipleColumnPrefixFilter multiFilter = new MultipleColumnPrefixFilter(new byte[][]{});

    Scan scan = new Scan();
    scan.setFilter(filter);

    Map<String, List<Cell>> map = new HashMap<>();
    // try-with-resources closes the table and the scanner, which the original left open
    try (Table table = connection.getTable(TableName.valueOf(tableNameString));
         ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
            map.put(Bytes.toString(result.getRow()), result.listCells());
        }
    }
    return map;
}
5.4. Filter lists: several filters applied together under a pass policy
// Filter data by column-name range and column-name prefix
public Map<String, List<Cell>> filterByPrefixAndRange(String tableNameString, String colPrefix,
        String minCol, String maxCol) throws IOException {
    // Column-name prefix match
    ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes(colPrefix));

    // Column-name range scan; both the lower and the upper bound are inclusive
    ColumnRangeFilter rangeFilter = new ColumnRangeFilter(Bytes.toBytes(minCol), true,
            Bytes.toBytes(maxCol), true);

    // MUST_PASS_ALL: a cell is returned only if every filter in the list accepts it
    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    filterList.addFilter(filter);
    filterList.addFilter(rangeFilter);

    Scan scan = new Scan();
    scan.setFilter(filterList);

    Map<String, List<Cell>> map = new HashMap<>();
    // try-with-resources closes the table and the scanner, which the original left open
    try (Table table = connection.getTable(TableName.valueOf(tableNameString));
         ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
            map.put(Bytes.toString(result.getRow()), result.listCells());
        }
    }
    return map;
}
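The pass policy decides how the individual verdicts are combined: MUST_PASS_ALL behaves like a logical AND, while the other policy offered by FilterList, MUST_PASS_ONE, behaves like a logical OR. The sketch below reproduces that combination logic with java.util.function.Predicate over a list of column names. It is a simulation with hypothetical names, and it compares names as strings, which matches byte order only for ASCII names.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class FilterListDemo {
    // Accepts column names that start with the given prefix.
    static Predicate<String> prefix(String p) {
        return name -> name.startsWith(p);
    }

    // Accepts column names within [min, max], both bounds inclusive.
    static Predicate<String> range(String min, String max) {
        return name -> name.compareTo(min) >= 0 && name.compareTo(max) <= 0;
    }

    // AND semantics: every filter must accept the column.
    static List<String> mustPassAll(List<String> columns, List<Predicate<String>> filters) {
        return columns.stream()
                .filter(c -> filters.stream().allMatch(f -> f.test(c)))
                .collect(Collectors.toList());
    }

    // OR semantics: one accepting filter is enough.
    static List<String> mustPassOne(List<String> columns, List<Predicate<String>> filters) {
        return columns.stream()
                .filter(c -> filters.stream().anyMatch(f -> f.test(c)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> columns = List.of("addr0", "addr1", "addr5", "detail1", "username1");
        List<Predicate<String>> filters = List.of(prefix("addr"), range("addr1", "addr9"));

        System.out.println(mustPassAll(columns, filters)); // prints [addr1, addr5]
        System.out.println(mustPassOne(columns, filters)); // prints [addr0, addr1, addr5]
    }
}
```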
6. Overview of filters
6.1. Comparison operators, such as equal, greater than, less than
public enum CompareOperator {
    // Keeps same names as the enums over in filter's CompareOp intentionally.
    // The conversion of operator to protobuf representation is via a name comparison.
    /** less than */
    LESS,
    /** less than or equal to */
    LESS_OR_EQUAL,
    /** equals */
    EQUAL,
    /** not equal */
    NOT_EQUAL,
    /** greater than or equal to */
    GREATER_OR_EQUAL,
    /** greater than */
    GREATER,
    /** no operation */
    NO_OP,
}
6.2. Comparators, mainly the classes that extend ByteArrayComparable
RegexStringComparator: compares values against a regular expression.

Scan scan = new Scan();
RegexStringComparator comp = new RegexStringComparator("you."); // "you" followed by any single character
SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("family"),
        Bytes.toBytes("qualifier"), CompareOperator.EQUAL, comp);
scan.setFilter(filter);
SubstringComparator: checks whether a substring occurs in the value; the match is case-insensitive.

Scan scan = new Scan();
SubstringComparator comp = new SubstringComparator("substr"); // the substring to look for
SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("family"),
        Bytes.toBytes("qualifier"), CompareOperator.EQUAL, comp);
scan.setFilter(filter);
BinaryComparator: a binary comparator that compares the bytes directly without deserializing them, so it should be fairly efficient.

Scan scan = new Scan();
BinaryComparator comp = new BinaryComparator(Bytes.toBytes("my hbase"));
ValueFilter filter = new ValueFilter(CompareOperator.EQUAL, comp);
scan.setFilter(filter);
BinaryPrefixComparator: a binary prefix comparator. It only checks whether the prefix is the same.

Scan scan = new Scan();
BinaryPrefixComparator comp = new BinaryPrefixComparator(Bytes.toBytes("test"));
SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("family"),
        Bytes.toBytes("qualifier"), CompareOperator.EQUAL, comp);
scan.setFilter(filter);
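Both binary comparators work on raw bytes, which has a practical consequence: ordering is lexicographic, so "row10" sorts before "row2". The sketch below shows a byte-wise comparison and a byte-wise prefix check using only java.util.Arrays (Java 9 or later). It illustrates the idea rather than the HBase implementation, and the method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class ByteCompareDemo {
    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Lexicographic comparison of unsigned bytes; the result is normalised to -1, 0 or 1.
    static int compare(String left, String right) {
        return Integer.signum(Arrays.compareUnsigned(bytes(left), bytes(right)));
    }

    // Prefix check: only the first prefix.length bytes of the value are compared.
    static boolean hasPrefix(String value, String prefix) {
        byte[] v = bytes(value);
        byte[] p = bytes(prefix);
        return v.length >= p.length && Arrays.equals(v, 0, p.length, p, 0, p.length);
    }

    public static void main(String[] args) {
        System.out.println(compare("row10", "row2"));      // prints -1: '1' sorts before '2'
        System.out.println(compare("admin", "admin"));     // prints 0
        System.out.println(hasPrefix("testKey1", "test")); // prints true
        System.out.println(hasPrefix("te", "test"));       // prints false
    }
}
```

This ordering is also why numeric row keys or column names are often zero-padded when range scans are expected to follow numeric order.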
6.3. Filters, a partial overview
Row-key filter
RowFilter: filters on the row key.

Scan scan = new Scan();
RowFilter filter = new RowFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("row1")));
scan.setFilter(filter);
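Column-family filter
FamilyFilter: filters by column family (the same effect can also be achieved by selecting specific column families on the Scan).

Scan scan = new Scan();
FamilyFilter filter = new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("info"))); // column family is info
scan.setFilter(filter);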
Column-name filters
QualifierFilter: exact match on the column name.

Scan scan = new Scan();
QualifierFilter filter = new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("username"))); // column name is username
scan.setFilter(filter);

ColumnPrefixFilter: filters by column-name (qualifier) prefix, that is, all column names that begin with a given prefix.

Scan scan = new Scan();
ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("addr")); // prefix is addr
scan.setFilter(filter);

MultipleColumnPrefixFilter: behaves like ColumnPrefixFilter, but accepts several column-name (qualifier) prefixes.

Scan scan = new Scan();
byte[][] prefixes = new byte[][]{Bytes.toBytes("my-prefix-1"), Bytes.toBytes("my-prefix-2")};
MultipleColumnPrefixFilter filter = new MultipleColumnPrefixFilter(prefixes);
scan.setFilter(filter);

ColumnRangeFilter: the column-range filter allows efficient scanning over column names within a row. Keyword: sorted.

Scan scan = new Scan();
ColumnRangeFilter filter = new ColumnRangeFilter(Bytes.toBytes("minColumn"), true,
        Bytes.toBytes("maxColumn"), false);
scan.setFilter(filter);

DependentColumnFilter: tries to find every row that contains the given column and returns all key-value pairs of that row that have the same timestamp.

Scan scan = new Scan();
DependentColumnFilter filter = new DependentColumnFilter(Bytes.toBytes("family"), Bytes.toBytes("qualifier"));
scan.setFilter(filter);
Column-value filter
SingleColumnValueFilter: compares column values. The following selects the data where the value of column username in column family info equals the string "admin":

Scan scan = new Scan();
SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"),
        Bytes.toBytes("username"), CompareOperator.EQUAL, Bytes.toBytes("admin"));
scan.setFilter(filter);
6.4. Code: https://github.com/asker124143222/hbaseHello