java實現hbase資料庫的增刪改查操作(新API)
阿新 • • 發佈:2019-01-08
操作環境:
java版本: jdk 1.7以上
hbase 版本:1.2.x
hadoop版本:2.6.0以上
實現功能: 1,建立指定表2,刪除指定表
3,根據表名,行鍵,列族,列描述符,值插入資料
4,根據指定表獲取指定行鍵rowkey和列族family的資料 並以字串的形式返回查詢到的結果
5,根據table查詢表中的所有資料 無返回值,直接在控制檯列印結果
7,根據指定表獲取指定行鍵rowKey和列族family的資料 並以Map集合的形式返回查詢到的結果
8,根據指定表獲取指定行鍵rowKey的所有資料 並以Map集合的形式返回查詢到的結果
9,根據表名獲取所有的資料
package com.hbase.util; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.util.Bytes; public class HBaseUtil{ /** * 連線池物件 */ protected static Connection connection; private static final String ZK_QUORUM = "hbase.zookeeper.quorum"; private static final String ZK_CLIENT_PORT = "hbase.zookeeper.property.clientPort"; /** * HBase位置 */ private static final String HBASE_POS = "192.168.1.104"; /** * ZooKeeper位置 */ private static final String ZK_POS = "localhost"; /** * zookeeper服務埠 */ private final static String ZK_PORT_VALUE = "2181"; /** * 靜態構造,在呼叫靜態方法時前進行執行 * 初始化連線物件. * */ static{ Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.rootdir", "hdfs://" + HBASE_POS + ":9000/hbase"); configuration.set(ZK_QUORUM, ZK_POS); configuration.set(ZK_CLIENT_PORT, ZK_PORT_VALUE); try { connection = ConnectionFactory.createConnection(configuration); } catch (IOException e) { e.printStackTrace(); }// 建立連線池 } /** * 建構函式,用於初始化內建物件 */ public HBaseUtil() { Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.rootdir", "hdfs://" + HBASE_POS + ":9000/hbase"); configuration.set(ZK_QUORUM, ZK_POS); configuration.set(ZK_CLIENT_PORT, ZK_PORT_VALUE); try { connection = ConnectionFactory.createConnection(configuration); } catch (IOException e) { e.printStackTrace(); }// 建立連線池 } /** * @param tableName * 建立一個表 tableName 指定的表名 seriesStr * @param seriesStr * 以字串的形式指定表的列族,每個列族以逗號的形式隔開,(例如:"f1,f2"兩個列族,分別為f1和f2) **/ public boolean createTable(String tableName, String seriesStr) { boolean isSuccess = false;// 判斷是否建立成功!初始值為false Admin admin = null; TableName table = TableName.valueOf(tableName); try { admin = connection.getAdmin(); if (!admin.tableExists(table)) { System.out.println("INFO:Hbase:: " + tableName + "原資料庫中表不存在!開始建立..."); HTableDescriptor descriptor = new HTableDescriptor(table); String[] series = seriesStr.split(","); for (String s : series) { descriptor.addFamily(new HColumnDescriptor(s.getBytes())); } admin.createTable(descriptor); System.out.println("INFO:Hbase:: "+tableName + "新的" + tableName + "表建立成功!"); isSuccess = true; } else { System.out.println("INFO:Hbase:: 該表已經存在,不需要在建立!"); } } catch (Exception e) { e.printStackTrace(); } finally { IOUtils.closeQuietly(admin); } return isSuccess; } /** * 刪除指定表名的表 * @param tableName 表名 * @throws IOException * */ public boolean dropTable(String tableName) throws IOException { boolean isSuccess = false;// 判斷是否建立成功!初始值為false Admin admin = null; TableName table = TableName.valueOf(tableName); try { admin = connection.getAdmin(); if (admin.tableExists(table)) { admin.disableTable(table); admin.deleteTable(table); isSuccess = true; } } finally { IOUtils.closeQuietly(admin); } return isSuccess; } /** * 向指定表中插入資料 * * @param tableName * 要插入資料的表名 * @param rowkey * 指定要插入資料的表的行鍵 * @param family * 指定要插入資料的表的列族family * @param qualifier * 要插入資料的qualifier * @param value * 要插入資料的值value * */ protected static void putDataH(String tableName, String rowkey, String family, String qualifier, Object value) throws IOException { Admin admin = null; TableName tN = TableName.valueOf(tableName); admin = connection.getAdmin(); if (admin.tableExists(tN)) { try (Table table = connection.getTable(TableName.valueOf(tableName .getBytes()))) { Put put = new Put(rowkey.getBytes()); put.addColumn(family.getBytes(), qualifier.getBytes(), value.toString().getBytes()); table.put(put); } catch (Exception e) { e.printStackTrace(); } } else { System.out.println("插入資料的表不存在,請指定正確的tableName ! "); } } /** * 根據指定表獲取指定行鍵rowkey和列族family的資料 並以字串的形式返回查詢到的結果 * * @param tableName * 要獲取表 tableName 的表名 * @param rowKey * 指定要獲取資料的行鍵 * @param family * 指定要獲取資料的列族元素 * @param qualifier * 指定要獲取資料的qualifier * * */ protected static String getValueBySeriesH(String tableName, String rowKey, String family,String qualifier) throws IllegalArgumentException, IOException { Table table = null; String resultStr = null; try { table = connection .getTable(TableName.valueOf(tableName.getBytes())); Get get = new Get(Bytes.toBytes(rowKey)); if( !get.isCheckExistenceOnly()){ get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); Result res = table.get(get); byte[] result = res.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)); resultStr = Bytes.toString(result); }else{ resultStr = null; } } finally { IOUtils.closeQuietly(table); } return resultStr; } /** * 根據table查詢表中的所有資料 無返回值,直接在控制檯列印結果 * */ @SuppressWarnings("deprecation") public void getValueByTable(String tableName) throws Exception { Table table = null; try { table = connection.getTable(TableName.valueOf(tableName)); ResultScanner rs = table.getScanner(new Scan()); for (Result r : rs) { System.out.println("獲得到rowkey:" + new String(r.getRow())); for (KeyValue keyValue : r.raw()) { System.out.println("列:" + new String(keyValue.getFamily()) + ":" + new String(keyValue.getQualifier()) + "====值:" + new String(keyValue.getValue())); } } } finally { IOUtils.closeQuietly(table); } } /** * 根據指定表獲取指定行鍵rowKey和列族family的資料 並以Map集合的形式返回查詢到的結果 * * @param tableName * 要獲取表 tableName 的表名 * @param rowKey * 指定的行鍵rowKey * @param family * 指定列族family * */ protected static Map<String, String> getAllValueH(String tableName, String rowKey, String family) throws IllegalArgumentException, IOException { Table table = null; Map<String, String> resultMap = null; try { table = connection.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); if(get.isCheckExistenceOnly()){ Result res = table.get(get); Map<byte[], byte[]> result = res.getFamilyMap(family.getBytes()); Iterator<Entry<byte[], byte[]>> it = result.entrySet().iterator(); resultMap = new HashMap<String, String>(); while (it.hasNext()) { Entry<byte[], byte[]> entry = it.next(); resultMap.put(Bytes.toString(entry.getKey()), Bytes.toString(entry.getValue())); } } } finally { IOUtils.closeQuietly(table); } return resultMap; } /** * 根據指定表獲取指定行鍵rowKey的所有資料 並以Map集合的形式返回查詢到的結果 * 每條資料之間用&&&將Qualifier和Value進行區分 * @param tableName * 要獲取表 tableName 的表名 * @param rowkey * 指定的行鍵rowKey * */ public ArrayList<String> getFromRowkeyValues(String tableName, String rowkey){ Table table =null; ArrayList<String> Resultlist = new ArrayList<>(); Get get = new Get(Bytes. toBytes ( rowkey )); try { table = connection.getTable(TableName.valueOf(tableName)); Result r = table.get(get); for (Cell cell : r.rawCells()) { //每條資料之間用&&&將Qualifier和Value進行區分 String reString = Bytes. toString (CellUtil. cloneQualifier (cell))+"&&&"+Bytes. toString (CellUtil. cloneValue (cell)); Resultlist.add(reString); } table.close(); } catch (IOException e1) { e1.printStackTrace(); } return Resultlist; } /** * 根據表名獲取所有的資料 * */ @SuppressWarnings("unused") private void getAllValues(String tableName){ try { Table table= connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); ResultScanner resutScanner = table.getScanner(scan); for(Result result: resutScanner){ System.out.println("scan: " + result); } } catch (IOException e) { e.printStackTrace(); } } public static void getTestDate(String tableName) throws IOException{ Table table = null; table = connection.getTable(TableName.valueOf(tableName)); int count = 0; Scan scan = new Scan(); scan.addFamily("f".getBytes()); Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("112213.*")); scan.setFilter(filter); ResultScanner resultScanner = table.getScanner(scan); for(Result result : resultScanner){ System.out.println(result); count++; } System.out.println("INFO:Hbase:: 測試結束!共有 " + count + "條資料"); } }