HBase【Java API 操作】
阿新 • • 發佈:2020-07-16
一.匯入依賴
建立模組並匯入以下依賴。Maven 編譯外掛（maven-compiler-plugin）預設的編譯版本是 1.5，因此需在 pom.xml 中指定用 1.8 編譯。
pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.0.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>2.0.5</version>
    </dependency>
    <!-- JUnit is only needed to compile and run the tests (src/test/java); keep it off the main classpath. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <!-- This plugin version defaults to source/target 1.5; compile for Java 8 instead. -->
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
二.HBase工具類
package com.bigdata.hbasedemo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; /** * @description: TODO HBase操作工具類 * @author: HaoWu * @create: 2020/7/15 22:51 */ public class HBaseUtils { private static Admin admin = null; private static Configuration configuration = null; private static Connection connection = null; /** * 私有構造方法 */ private HBaseUtils() { //建立配置資訊並配置 configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", "hadoop102:2181,hadoop103:2181,hadoop104:2181"); configuration.set("hbase.rootdir", "hdfs:hadoop102:8020/hbase"); try { //獲取HBase連線物件 connection = ConnectionFactory.createConnection(configuration); //獲取Admin物件 admin = connection.getAdmin(); } catch (IOException e) { e.printStackTrace(); } } /** * 私有成員變數,單例模式 */ private static HBaseUtils instance = null; public static synchronized HBaseUtils getInstance() { if (null == instance) { instance = new HBaseUtils(); } return instance; } /** * 關閉連線 */ public static void close() { try { if (admin != null) { admin.close(); } if (connection != null) { connection.close(); } } catch (IOException e) { e.printStackTrace(); } } /** * 根據表名獲取Table例項 * * @param tableName * @return */ public static Table getTable(String nameSpace, String tableName) { Table table = null; try { table = connection.getTable(TableName.valueOf(nameSpace, tableName)); } catch (IOException e) { e.printStackTrace(); } return table; } /** * 建立表 * * @param nameSpace:名稱空間 * @param tableName:表名 * @param splitsKey:預分割槽 * @param columnFamilies:可變列簇,可設定多個 * @return */ public static void 
createTable(String nameSpace, String tableName, byte[][] splitsKey, String... columnFamilies) { //獲取TableName物件 TableName name = TableName.valueOf(nameSpace, tableName); try { //判斷表是否存在,存在就刪除 if (admin.tableExists(name)) { admin.disableTable(name); admin.deleteTable(name); } //列簇集合描述 ArrayList<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(); for (String columnFamily : columnFamilies) { ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(columnFamily.getBytes()).build(); columnFamilyDescriptors.add(columnFamilyDescriptor); } //設定列簇,獲取表描述器 TableDescriptor build = TableDescriptorBuilder.newBuilder(name).setColumnFamilies(columnFamilyDescriptors).build(); admin.createTable(build, splitsKey); } catch (IOException e) { e.printStackTrace(); } } /** * 修改表結構:刪除列簇1,修改列簇2的VERSION為3,新增一個列簇3 * * @param nameSpace * @param tableName * @param columnFamfily1 * @param columnFamfily2 * @param columnFamfily3 TODO 異常處理沒做好,圖方便,建議自己每個操作單獨寫方法進行異常處理 */ public static void alterTable(String nameSpace, String tableName, String columnFamfily1, String columnFamfily2, String columnFamfily3) { TableName name = TableName.valueOf(nameSpace, tableName); try { if (admin.tableExists(name)) { //1.刪除列簇1 admin.deleteColumnFamily(name, Bytes.toBytes(columnFamfily1)); //2.修改列簇2 ColumnFamilyDescriptor build2 = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamfily2)).setMinVersions(4).setMaxVersions(4).build(); admin.modifyColumnFamily(name, build2); //3.增加列簇3 ColumnFamilyDescriptor build3 = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamfily3)).build(); admin.addColumnFamily(name, build3); } } catch (IOException e) { e.printStackTrace(); } } /** * 刪除表 * * @param nameSpace * @param tableName */ public static void deleteTable(String nameSpace, String tableName) { TableName name = TableName.valueOf(nameSpace, tableName); try { //1.禁用表 admin.disableTable(name); //2.刪除表 admin.deleteTable(name); } catch (IOException e) { 
e.printStackTrace(); } } /** * 插入一條資料,值為String-單行單列簇單列 * * @param tableName:表名 * @param timeStamp:時間戳 * @param rowKey:RowKey * @param columnFamily:列簇名 * @param column:列限定符 * @param value:String型別值 */ public static void putStringValue(String nameSpace, String tableName, long timeStamp, String rowKey, String columnFamily, String column, String value) { //1.獲取表操作物件 Table table = getTable(nameSpace, tableName); //2.封裝Put物件,包括rowKey,timeStamp,列簇,列,列值 Put put = new Put(Bytes.toBytes(rowKey), timeStamp); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value)); try { table.put(put); } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 插入一條資料,值為int-單行單列簇單列 * * @param tableName:表名 * @param timeStamp:時間戳 * @param rowKey:RowKey * @param columnFamily:列簇名 * @param column:列限定符 * @param value:int型別值 */ public static void putintValue(String nameSpace, String tableName, long timeStamp, String rowKey, String columnFamily, String column, int value) { //1.獲取表操作物件 Table table = getTable(nameSpace, tableName); //2.封裝Put物件,包括rowKey,timeStamp,列簇,列,列值 Put put = new Put(Bytes.toBytes(rowKey), timeStamp); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value)); try { table.put(put); } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 插入多行,多列,單列簇 * * @param nameSpace * @param tableName * @param columnFamily * @param mapList:將一個Cell資料封裝成一個map<String,String>,所有的值型別要求為String。 mapList資料格式:{{ column=name, rowkey=1001, value=lisi},{ column=age, rowkey=1001, value=20},{ column=address, rowkey=1001, value="BeiJing"}...} */ public static void putMultipleRowMultipleColumnOneFamily(String nameSpace, String tableName, String columnFamily, List<Map<String, String>> mapList) { //1.獲取表操作物件 Table table = getTable(nameSpace, 
tableName); //2.建立List<Put>物件 List<Put> puts = new ArrayList<>(); for (Map<String, String> map : mapList) { //3.將每個map資料封裝成一個Put物件 Put put = new Put(Bytes.toBytes(map.get("rowkey"))); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(map.get("column")), Bytes.toBytes(map.get("value"))); puts.add(put); } try { //4.執行put操作 table.put(puts); } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 插入單行,多列,單列簇 * * @param nameSpace * @param tableName * @param columnFamily * @param mapList:將一個Cell資料封裝成一個map<String,String>,所有的值型別要求為String。 mapList資料格式:{{ column=name, rowkey=1001, value=lisi},{ column=age, rowkey=1001, value=20},{ column=address, rowkey=1001, value="BeiJing"}...} */ public static void putOneRowMultipleColumnOneFamily(String nameSpace, String tableName, String rowKey, String columnFamily, List<Map<String, String>> mapList) { Table table = getTable(nameSpace, tableName); List<Put> puts = new ArrayList<>(); for (Map<String, String> map : mapList) { Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(map.get("column")), Bytes.toBytes(map.get("value"))); puts.add(put); } try { table.put(puts); } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** *修改資料和新增資料一致 */ /** * 刪除單行單列簇單列資料 * * @param nameSpace * @param tableName * @param family * @param column * @param rowkey */ public static void deleteOneRowOneCloumn(String nameSpace, String tableName, String family, String column, String rowkey) { Table table = getTable(nameSpace, tableName); Delete delete = new Delete(Bytes.toBytes(rowkey)); delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(column)); try { table.delete(delete); } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) try { table.close(); } catch (IOException e) { 
e.printStackTrace(); } } } /** * 查詢單行單列簇-可變列 * @param nameSpace * @param tableName * @param family * @param columns * @param rowkey */ public static void get(String nameSpace, String tableName, String family, String rowkey,String... columns) { //1.建立table物件 Table table = getTable(nameSpace, tableName); //2.建立Get物件 Get get = new Get(Bytes.toBytes(rowkey)); //3.新增列描述 for (String column : columns) { get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column)); } try { //4.執行查詢獲取結果 Result result = table.get(get); //5.獲取Cell集合,然後遍歷列印 List<Cell> cells = result.listCells(); for (Cell cell : cells) { //列簇 System.out.println(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())); //列 System.out.println(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); //值 if ("age".equals(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()))) { //值為int型別 System.out.println(Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); }else { //值為String型別 System.out.println(Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));} System.out.println("***************************************************************************"); } } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * Scan 掃描表-單列簇 * @param nameSpace * @param tableName * @param family */ public static void scanTable(String nameSpace, String tableName,String family){ //1.建立table物件 Table table = getTable(nameSpace, tableName); //2.建立Scan物件 Scan scan = new Scan(); //3.指定列簇 scan.addFamily(Bytes.toBytes(family)); try { //是否結果快取 scan.setCacheBlocks(false); //4.執行掃描,遍歷結果 ResultScanner resultScanners = table.getScanner(scan); Iterator<Result> iterator = resultScanners.iterator(); while (iterator.hasNext()){ Result result = iterator.next(); List<Cell> cells = 
result.listCells(); for (Cell cell : cells) { //RowKey System.out.print(Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength())+":"); //列 System.out.print(Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength())+":"); //值 if("age".equals(Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()))){ //值為int型別 System.out.print(Bytes.toInt(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength())+":"); }else { //值為String型別 System.out.print(Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength())+":"); } System.out.println("***************************************************************************"); } } } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) try { table.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 獲取單值比較器:針對單列族單列進行String值過濾{小於|小於等於|等於|不等於|大於等於|大於} * @param family 列簇 * @param column 列名 * @param op 列舉類{LESS|LESS_OR_EQUAL|EQUAL|NOT_EQUAL|GREATER_OR_EQUAL|GREATER} * {小於|小於等於|等於|不等於|大於等於|大於} * @param value 值 * @return */ public static SingleColumnValueFilter getStringSingleColumnValueFilter(String family,String column,CompareOperator op,String value){ return new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column), op, Bytes.toBytes(value)); } /** * 獲取單值比較器:針對單列族單列進行int值過濾{小於|小於等於|等於|不等於|大於等於|大於} * @param family 列簇 * @param column 列名 * @param op 列舉類{LESS|LESS_OR_EQUAL|EQUAL|NOT_EQUAL|GREATER_OR_EQUAL|GREATER} * {小於|小於等於|等於|不等於|大於等於|大於} * @param value 值 * @return */ public static SingleColumnValueFilter getIntSingleColumnValueFilter(String family,String column,CompareOperator op,int value){ return new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column), op, Bytes.toBytes(value)); } /** * 列簇過濾器 * @param family */ public static FamilyFilter getFamilyFilter(String family){ return new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(family))); 
} /** *列過濾器 * @param column * @return */ public static QualifierFilter getQualifierFilter(String column){ return new QualifierFilter(CompareOperator.EQUAL,new BinaryComparator(Bytes.toBytes(column))); } /** * 針對全表的值:過濾出包含值包含"value"的行 * 在 where name like %value% 結合列簇->列->值過濾達到效果 * @param value * @return */ public static ValueFilter getValueFilter(String value){ return new ValueFilter(CompareOperator.EQUAL, new SubstringComparator(value)); } /** * 獲取過濾器鏈:必須全滿足 filter1 and filter2 and filter3 * @param filters * @return */ public static FilterList getFilterListALL(Filter... filters){ FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); for (Filter filter : filters) { filterList.addFilter(filter); } return filterList; } /** * 獲取過濾器鏈:最少有一個滿足 filter1 or filter2 or filter3 * @param filters * @return */ public static FilterList getFilterListOne(Filter... filters){ FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); for (Filter filter : filters) { filterList.addFilter(filter); } return filterList; } }
三.測試類
package com.bigdata.hbasedemo; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @description: HBaseUtils工具測試類 * @author: HaoWu * @create: 2020/7/16 8:16 */ public class HBaseUtilsTest { //獲取工具類物件 HBaseUtils hBaseUtils = HBaseUtils.getInstance(); /** * 測試建立表 */ @Test public void createTable() { String nameSpace = "bigdata"; String tableName = "user6"; byte[][] splitsKey = new byte[][]{ Bytes.toBytes("10"), Bytes.toBytes("20"), Bytes.toBytes("30"), Bytes.toBytes("40")}; String columnFamily1 = "info1"; String columnFamily2 = "info2"; hBaseUtils.createTable(nameSpace, tableName, splitsKey, columnFamily1, columnFamily2); } /** * 測試修改表 */ @Test public void alterTable() { hBaseUtils.alterTable("bigdata", "user", "info1", "info2", "info3"); } /** * 刪除表 */ @Test public void deleteTable(){ hBaseUtils.deleteTable("bigdata","user5"); } /** * 插入一條資料,值為String型別 */ @Test public void putStringValue() { hBaseUtils.putStringValue("bigdata", "user", 1221312312352L, "1004", "info2", "name", "lisi"); } /** * 插入一條資料,值為int型別 */ @Test public void putintValue() { hBaseUtils.putintValue("bigdata", "user", 1221312312321L, "1004", "info2", "age", 25); } /** * 插入多行,多列,多值,單列簇 */ @Test public void putMultipleRowMultipleColumnOneFamily() { //資料格式:{{ "column"="name", "rowkey"="1001", "value"="lisi"},{ "column"="address", "rowkey"="1001", "value"="BeiJing"}} List<Map<String, String>> mapList = new ArrayList<>(); 
ConcurrentHashMap<String, String> map1 = new ConcurrentHashMap<>(); map1.put("column","name"); map1.put("rowkey","1003"); map1.put("value","lisi1"); System.out.println(map1); ConcurrentHashMap<String, String> map2 = new ConcurrentHashMap<>(); map2.put("column","address"); map2.put("rowkey","1003"); map2.put("value","BeiJing"); ConcurrentHashMap<String, String> map3 = new ConcurrentHashMap<>(); map3.put("column","name"); map3.put("rowkey","1004"); map3.put("value","wangwu1"); mapList.add(map1); mapList.add(map2); mapList.add(map3); System.out.println(mapList.toString()); hBaseUtils.putMultipleRowMultipleColumnOneFamily("bigdata", "user", "info2", mapList); } /** * 刪除單行單列簇單列 */ @Test public void deleteOnerowOneColumn(){ hBaseUtils.deleteOneRowOneCloumn("bigdata","user","info2","address","1001"); } /** * 查詢單行單列簇-可變列 */ @Test public void get(){ hBaseUtils.get("bigdata","user","info2","1001","age","name"); } /** * scan 掃描表-單列簇 */ @Test public void scanTable(){ hBaseUtils.scanTable("bigdata","user","info2"); } /** * 根據條件過濾:where name="wangwu" or(name="lisi" and age>30) * */ @Test public void testFilter1() throws IOException { Table table = hBaseUtils.getTable("bigdata", "user"); Scan scan = new Scan(); //name="lisi" SingleColumnValueFilter filter1 = hBaseUtils.getStringSingleColumnValueFilter("info2", "name", CompareOperator.EQUAL, "lisi"); //age>30 SingleColumnValueFilter filter2 = hBaseUtils.getIntSingleColumnValueFilter("info2", "age", CompareOperator.GREATER,30); //name="lisi" and age>30 FilterList filterList = hBaseUtils.getFilterListALL(filter1, filter2); //name="wangwu" SingleColumnValueFilter filter3 = hBaseUtils.getStringSingleColumnValueFilter("info2", "name", CompareOperator.EQUAL, "wangwu"); //name="wangwu" or(name="lisi" and age>30) FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ONE); filters.addFilter(filter3); filters.addFilter(filterList); //設定過濾器 scan.setFilter(filters); ResultScanner scanner = table.getScanner(scan); Iterator<Result> 
iterator = scanner.iterator(); while (iterator.hasNext()){ Result result = iterator.next(); List<Cell> cells = result.listCells(); for (Cell cell : cells) { //RowKey System.out.print(Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength())+":"); //列 System.out.print(Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength())+":"); //值 if("age".equals(Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()))){ //值為int型別 System.out.print(Bytes.toInt(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength())+":"); }else { //值為String型別 System.out.print(Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength())+":"); } System.out.println("***************************************************************************"); } } } /** * 關閉admin, connection */ @Test public void close(){ hBaseUtils.close(); } }