Hbase基本操作類
阿新 • • 發佈:2019-02-05
package com.test.hbase; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fangdd.cdw2.data.dao.HBasePageModel; public class HBaseOperator { private static Logger LOG = LoggerFactory.getLogger(HBaseOperator.class); private Configuration conf; private Connection connection; /** 載入hbase配置檔案 */ public HBaseOperator(String zkConfs, String zkNode) { this.conf = HBaseConfiguration.create(); this.conf.set("hbase.zookeeper.quorum", zkConfs); this.conf.set("zookeeper.znode.parent", zkNode); } private synchronized Connection getHConnection() { try { if (this.connection == null || this.connection.isClosed()) { this.connection = ConnectionFactory.createConnection(conf); LOG.info("hbase connection init.................."); } return connection; } catch (IOException e) { e.printStackTrace(); } return null; } /** * 描述:獲取所有表 * @return TableName陣列 * @throws Exception */ public TableName[] listTable() throws Exception { Admin admin = this.getHConnection().getAdmin(); TableName[] tableNames = admin.listTableNames(); admin.close(); return tableNames; } /** * 描述:建立表 * @param tableName 表名稱 * @param cfNames column family名稱陣列 * @param version 支援版本 * @throws IOException */ public void createTable(String tableName, String[] cfNames, int version) throws IOException { Admin admin = this.getHConnection().getAdmin(); TableName table = TableName.valueOf(tableName); if (admin.tableExists(table)) { admin.disableTable(table); admin.deleteTable(table); } HTableDescriptor tableDescriptor = new HTableDescriptor(table); HColumnDescriptor hd = null; for (int i = 0; i < cfNames.length; i++) { hd = new HColumnDescriptor(cfNames[i]); hd.setMaxVersions(version); tableDescriptor.addFamily(hd); } admin.createTable(tableDescriptor); admin.close(); } /** * 描述:向表中rowKey對應的記錄新增column->value * @param tableName 表名 * @param rowKey rowkey * @param columnFamily 列族名 * @param qualifier 列識別符號 * @param value 列名對應的值 * @throws IOException */ public void put(String tableName, String rowKey, String columnFamily, String qualifier, String value) throws IOException { Table htable = this.getHConnection().getTable(TableName.valueOf(tableName)); Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(value)); htable.put(put); htable.close(); } /** * 描述:獲取單條記錄中的columnFamily:qualifier最新的記錄 * @param tableName 表名 * @param rowKey * @param columnFamily 列族名 * @param qualifier 列識別符號 * @return 列識別符號最新版本值 * @throws IOException */ public String getQualifierValue(String tableName, String rowKey, String columnFamily, String qualifier) throws IOException { String lastestValue = null; Table htable = this.getHConnection().getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); Result result = htable.get(get); byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); if (value != null && value.length > 0) { lastestValue = Bytes.toString(value); } htable.close(); return lastestValue; } /** * 描述:獲rowkey資料組中所有rowkey對應columnFamily:qualifier的值 * @param tableName 表名 * @param rowKeys rowkey陣列 * @param columnFamily 列族名 * @param qualifier 列識別符號 * @return 所有rowkey對應的列值陣列 * @throws IOException */ public List<String> getQualifierValue(String tableName, String[] rowKeys, String columnFamily, String qualifier) throws IOException { List<String> values = null; Table htable = this.getHConnection().getTable(TableName.valueOf(tableName)); List<Get> getList = new ArrayList<Get>(); for (String rowKey : rowKeys) { Get get = new Get(Bytes.toBytes(rowKey)); get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); getList.add(get); } Result[] results = htable.get(getList); if (results != null && results.length > 0) { for (Result result : results) { byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); if (value != null) { if (values == null) { values = new ArrayList<String>(); } String qualifierValue = Bytes.toString(value); values.add(qualifierValue); } } } htable.close(); return values; } /** * 描述:獲取單條記錄中的columnFamily:qualifier所有版本記錄值 * @param tableName 表名 * @param rowKey * @param columnFamily 列族名 * @param qualifier 列識別符號 * @return 列識別符號最新版本值 * @throws IOException */ public List<String> getAllVersionValues(String tableName, String rowKey, String columnFamily, String qualifier) throws IOException { List<String> values = null; Table htable = this.getHConnection().getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); get.setMaxVersions(); // 設定最大版本號 get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); Result result = htable.get(get); List<Cell> cells = result.listCells(); if (cells != null && cells.size() > 0) { values = new ArrayList<String>(); for (Cell cell : cells) { String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); values.add(value); } } htable.close(); return values; } /** * 描述:獲取某個rowkey下指定的column family的所有qualifier值 * @param tableName 表名 * @param rowKey * @param columnFamily 列族名 * @return * @throws IOException */ public Map<String, String> getQualifierMapValue(String tableName, String rowKey, String columnFamily) throws IOException { Map<String, String> valuesMap = null; Table htable = this.getHConnection().getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); get.addFamily(Bytes.toBytes(columnFamily)); Result result = htable.get(get); NavigableMap<byte[],byte[]> qualifierValues = result.getFamilyMap(Bytes.toBytes(columnFamily)); if (qualifierValues != null && qualifierValues.size() > 0) { valuesMap = new HashMap<String, String>(); Iterator<byte[]> it = qualifierValues.keySet().iterator(); while (it.hasNext()) { byte[] qualifier = it.next(); byte[] value = qualifierValues.get(qualifier); valuesMap.put(Bytes.toString(qualifier), Bytes.toString(value)); } } htable.close(); return valuesMap; } /** * 描述:獲取一條記錄下column family的所有qualifier值 * @param tableName * @param rowKey * @param columnFamily * @return * @throws IOException */ public Map<String, Map<String, String>> getQualifierMapValue(String tableName, String rowKey) throws IOException { Map<String, Map<String, String>> valuesMap = null; Table htable = this.getHConnection().getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); Result result = htable.get(get); NavigableMap<byte[], NavigableMap<byte[],byte[]>> familyMap = result.getNoVersionMap(); if (familyMap != null && familyMap.size() > 0) { valuesMap = new HashMap<String, Map<String, String>>(); Iterator<byte[]> it = familyMap.keySet().iterator(); while (it.hasNext()) { byte[] familyBytes = it.next(); String familyName= Bytes.toString(familyBytes); NavigableMap<byte[], byte[]> qualifierMap = familyMap.get(familyBytes); if (qualifierMap != null && qualifierMap.size() > 0) { Map<String, String> qualifierValueMap = new HashMap<String, String>(); for (byte[] qualifierBytes: qualifierMap.keySet()) { String qualifierName = Bytes.toString(qualifierBytes); String qualifierValue = Bytes.toString(qualifierMap.get(qualifierBytes)); if (qualifierValue != null) { qualifierValueMap.put(qualifierName, qualifierValue); } } if (qualifierValueMap.size() > 0) { valuesMap.put(familyName, qualifierValueMap); } } } } htable.close(); return valuesMap; } /** * 描述 :獲取指定的qualifier中從startTimestamp->endTimestamp(不包含endTimestamp)的所有版本值 * @param tableName 表名 * @param rowKey * @param columnFamily 列族名稱 * @param qualifier 列名 * @param startTimestamp 起始版本號 * @param endTimestamp 結束版本號 * @return * @throws IOException */ public List<String> getQualifierValuesByRange(String tableName, String rowKey, String columnFamily, String qualifier, long startTimestamp, long endTimestamp) throws IOException { List<String> values = null; Table htable = this.getHConnection().getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); get.setMaxVersions(); get.setTimeRange(startTimestamp, endTimestamp); Result result = htable.get(get); List<Cell> cells = result.listCells(); if (cells != null && cells.size() > 0) { values = new ArrayList<String>(); for (Cell cell : cells) { String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); values.add(value); } } htable.close(); return values; } /** * 描述:獲取起始startRowKey,endRowKey之間並且滿足條件的rowkeys * @param tableName 表名 * @param startRowKey 起始rowKey * @param endRowKey 結束rowKey * @param filterList 過濾器 * @return rowkey列表 * @throws IOException */ public List<String> scanRowkeys(String tableName, String startRowKey, String endRowKey, FilterList filterList) throws IOException { if(startRowKey == null || startRowKey.equals("") || endRowKey == null || endRowKey.equals("")) { return null; } List<String> list = null; final byte[] POSTFIX = new byte[] { 0x00 }; Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes(startRowKey)); scan.setStopRow(Bytes.add(Bytes.toBytes(endRowKey), POSTFIX)); // 包含endRowkey if (filterList != null) { scan.setFilter(filterList); } Table htable = this.getHConnection().getTable(TableName.valueOf(tableName)); ResultScanner resultScanner = htable.getScanner(scan); for (Result result : resultScanner) { byte[] rowBytes = result.getRow(); if (rowBytes != null) { String rowkey = Bytes.toString(rowBytes); if (list == null) { list = new ArrayList<String>(); } list.add(rowkey); } } resultScanner.close(); htable.close(); return list; } /** * 描述:獲取起始startRowKey,endRowKey之間並且滿足條件的記錄 * @param tableName 表名 * @param startRowKey 起始rowKey * @param endRowKey 結束rowKey * @param filterList 過濾器 * @return * @throws IOException */ public List<Map<String, Map<String, String>>> scanRangeRows(String tableName, String startRowKey, String endRowKey, FilterList filterList) throws IOException { if(startRowKey == null || startRowKey.equals("") || endRowKey == null || endRowKey.equals("")) { return null; } List<Map<String, Map<String, String>>> list = null; final byte[] POSTFIX = new byte[] { 0x00 }; Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes(startRowKey)); scan.setStopRow(Bytes.add(Bytes.toBytes(endRowKey), POSTFIX)); // 包含endRowkey if (filterList != null) { scan.setFilter(filterList); } Table htable = this.getHConnection().getTable(TableName.valueOf(tableName)); ResultScanner resultScanner = htable.getScanner(scan); for (Result result : resultScanner) { Map<String, Map<String, String>> familyMap = getResultRow(result); if (familyMap != null && familyMap.size() > 0) { if (list == null) { list = new ArrayList<Map<String, Map<String, String>>>(); } list.add(familyMap); } } resultScanner.close(); htable.close(); return list; } /** * 描述:根據過濾條件範圍掃描並進行資料分頁操作 * @param tableName 表名 * @param startRowKey 開始rowkey * @param endRowKey 結束rowkey * @param pageIndex 第幾頁 * @param pageSize 分頁每頁記錄數 * @param filterList 過濾器 * @return 分頁模型資料物件 * @throws IOException */ public HBasePageModel scanPageRows(String tableName, String startRowKey, String endRowKey, Integer pageIndex, Integer pageSize, FilterList filterList) throws IOException { if(startRowKey == null || startRowKey.equals("") || endRowKey == null || endRowKey.equals("")) { return null; } List<Map<String, Map<String, String>>> resultList = null; if (pageIndex == null || pageIndex <= 0) { pageIndex = 1; // 預設第一頁 } if (pageSize == null || pageSize <= 0) { pageSize = 50; // 預設每頁50條 } int pageStartIndex = (pageIndex - 1) * pageSize; int pageEndIndex = pageStartIndex + pageSize; final byte[] POSTFIX = new byte[] { 0x00 }; Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes(startRowKey)); scan.setStopRow(Bytes.add(Bytes.toBytes(endRowKey), POSTFIX)); // 包含endRowkey if (filterList != null) { scan.setFilter(filterList); } Table htable = this.getHConnection().getTable(TableName.valueOf(tableName)); ResultScanner resultScanner = htable.getScanner(scan); int i = 0; for (Result result : resultScanner) { Map<String, Map<String, String>> familyMap = getResultRow(result); if (familyMap != null && familyMap.size() > 0) { if (i >= pageStartIndex && i < pageEndIndex) { // 在分頁index範圍內 if (resultList == null) { resultList = new ArrayList<Map<String, Map<String, String>>>(); } resultList.add(familyMap); } } i++; } HBasePageModel pageModel = new HBasePageModel(pageIndex, pageSize, i); pageModel.setResultList(resultList); resultScanner.close(); htable.close(); return pageModel; } private List<Map<String, Map<String, String>>> scanPageRows(String tableName, String lastRowKey, int pageSize) throws IOException { if(lastRowKey == null || lastRowKey.equals("")) { return null; } final byte[] POSTFIX = new byte[] { 0x00 }; List<Map<String, Map<String, String>>> list = null; Scan scan = new Scan(); FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); Filter pageFilter = new PageFilter(pageSize); filterList.addFilter(pageFilter); scan.setFilter(filterList); scan.setStartRow(Bytes.add(Bytes.toBytes(lastRowKey), POSTFIX)); Table htable = this.getHConnection().getTable(TableName.valueOf(tableName)); ResultScanner resultScanner = htable.getScanner(scan); for (Result result : resultScanner) { Map<String, Map<String, String>> familyMap = getResultRow(result); if (familyMap != null && familyMap.size() > 0) { if (list == null) { list = new ArrayList<Map<String, Map<String, String>>>(); } list.add(familyMap); } } resultScanner.close(); htable.close(); return list; } /** * 描述:獲取一行記錄的所有列族中列的值 * @param result rowkey對應的Result物件 * @return 列族中列的值map */ private Map<String, Map<String, String>> getResultRow(Result result) { if (result == null) { return null; } Map<String, Map<String, String>> familyMap = new HashMap<String, Map<String, String>>(); String rowkey = Bytes.toString(result.getRow()); for (Cell cell : result.rawCells()) { String columnFamilyName = Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); String qualifierName = Bytes.toStringBinary(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); String qualifierValue = Bytes.toStringBinary(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); if (!familyMap.containsKey(columnFamilyName)) { Map<String, String> qualifierMap = new HashMap<String, String>(); qualifierMap.put(qualifierName, qualifierValue); familyMap.put(columnFamilyName, qualifierMap); } else { familyMap.get(columnFamilyName).put(qualifierName, qualifierValue); } } return familyMap; } /** * 描述:刪除rowkeys記錄 * @param tableName 表名 * @param rowKeys * @throws IOException */ public void deleteRow(String tableName, String[] rowKeys) throws IOException { Table htable = this.getHConnection().getTable(TableName.valueOf(tableName)); List<Delete> list = new ArrayList<Delete>(); for (int i = 0; i < rowKeys.length; i++) { Delete delete = new Delete(Bytes.toBytes(Long.valueOf(rowKeys[i]))); list.add(delete); } htable.delete(list); htable.close(); } /** * 描述:判斷表是否存在 * @param tableName 表名 * @return boolean * @throws IOException */ public boolean isExist(String tableName) throws IOException { Admin admin = this.getHConnection().getAdmin(); TableName table = TableName.valueOf(tableName); boolean exist = admin.tableExists(table); admin.close(); return exist; } /** * 描述:刪除表 * @param tableName 表名稱 * @throws IOException */ public void deleteTable(String tableName) throws IOException { Admin admin = this.getHConnection().getAdmin(); TableName table = TableName.valueOf(tableName); if (admin.tableExists(table)) { admin.disableTable(table); admin.deleteTable(table); } admin.close(); } private static void dumpCell(Cell cell) { System.out.println( Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + "_" + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "_" + Bytes.toString( cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()) + "_" + cell.getTimestamp() ); } }