hbase(二)Java操作 hbase
阿新 • • 發佈:2020-09-03
一、準備讀取檔案 hbaseFile.txt
rowkey,name:firstName,name:lastName,address:province,address:city,address:district 1,chen,allen,jiangsu,nanjing,xuanwu 2,chen,henry,jiangsu,yancheng,jianhu 3,li,pola,jiangsu,nanjing,xuanwu 4,chen,angle,anhui,hefei,daqin 5,fang,zhimin,anhui,wuhu,huijia 6,ge,you,beijing,chaoyang,henan 7,li,zhengming,jiangsu,nanjing,gulou
二、pom.xml(maven-quickstart)
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.0-cdh5.14.2</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>1.2.0-cdh5.14.2</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.2.0-cdh5.14.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0-cdh5.14.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0-cdh5.14.2</version> </dependency> </dependencies>
三、目錄
1.base基礎類
package cn.kb08.hbase.core;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

import java.io.IOException;

/**
 * Connection bootstrap for the HBase client: builds the cluster configuration
 * from the local Hadoop/HBase XML files and hands out {@link Admin} and
 * {@link Table} handles.
 */
public class Base {

    // FIX: the original created a brand-new Connection on EVERY call and never
    // closed it — a resource leak. An HBase Connection is heavyweight and
    // thread-safe, so one cached instance is created lazily and reused.
    private Connection con;

    /**
     * Returns the shared connection, creating it on first use (or after it
     * has been closed).
     *
     * @return the cached cluster connection
     * @throws IOException if the connection cannot be established
     */
    private synchronized Connection getCon() throws IOException {
        if (con == null || con.isClosed()) {
            Configuration config = HBaseConfiguration.create();
            // One resource is the Hadoop config, the other is the HBase config.
            config.addResource("/opt/bigdata/hadoop/hadoop260/etc/hadoop/core-site.xml");
            config.addResource("/opt/bigdata/hadoop/hbase120/conf/hbase-site.xml");
            con = ConnectionFactory.createConnection(config);
        }
        return con;
    }

    /**
     * Returns an admin handle for DDL-style operations (tables, namespaces...).
     *
     * @return a fresh {@link Admin} backed by the shared connection
     * @throws IOException if the connection cannot be established
     */
    public Admin admin() throws IOException {
        return getCon().getAdmin();
    }

    /**
     * Returns a data-access handle for CRUD operations on one table.
     *
     * @param tabName table name to resolve
     * @return a {@link Table} backed by the shared connection
     * @throws IOException if the connection cannot be established
     */
    public Table table(String tabName) throws IOException {
        return getCon().getTable(TableName.valueOf(tabName));
    }
}
2.dao
package cn.kb08.hbase.core;

import org.apache.commons.collections.map.ListOrderedMap;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.*;
import java.util.*;

/**
 * DDL (create/drop table) and CRUD (put/delete/get/scan/filter) operations
 * against HBase, printing results to stdout. Table structure for printing is
 * derived via {@link Parser}.
 */
public class Dao {
    private Base base = new Base();
    private Parser parser = new Parser();

    /**
     * Creates a table if it does not exist yet.
     *
     * @param tableName   table to create
     * @param colFamilies column-family names for the new table
     */
    public void createTable(String tableName, String[] colFamilies) {
        Admin admin = null;
        try {
            admin = base.admin();
            TableName tn = TableName.valueOf(tableName);
            if (!admin.isTableAvailable(tn)) {
                HTableDescriptor table = new HTableDescriptor(tn);
                for (int i = 0; i < colFamilies.length; i++) {
                    table.addFamily(new HColumnDescriptor(colFamilies[i]));
                }
                admin.createTable(table);
                System.out.println("建立表" + tn.getNameAsString() + "成功");
            } else {
                System.out.println("表" + tn.getNameAsString() + "已存在");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Drops a table (disable first — HBase requires it) if it exists.
     *
     * @param tableName table to drop
     */
    public void dropTable(String tableName) {
        Admin admin = null;
        TableName tn = TableName.valueOf(tableName);
        try {
            admin = base.admin();
            if (admin.isTableAvailable(tn)) {
                admin.disableTable(tn);
                admin.deleteTable(tn);
                System.out.println("刪除表" + tn.getNameAsString() + "成功");
            } else {
                System.out.println("表" + tn.getNameAsString() + "不存在");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Bulk-loads rows from a CSV file. The first line is the header
     * ({@code rowkey,family:col,...}) and defines the insertion structure;
     * each following line is {@code rowkey,value1,value2,...} with values in
     * header order.
     *
     * @param tableName target table
     * @param filePath  path of the CSV data file
     */
    public void put(String tableName, String filePath) {
        File file = new File(filePath);
        if (!file.exists() || !file.isFile() || file.length() == 0) {
            System.out.println("檔案" + file.getAbsolutePath() + "不存在");
            return;
        }
        // FIX: try-with-resources — the original called reader.close() only on
        // the success path, leaking the file handle whenever an exception
        // occurred mid-read.
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            Table table = base.table(tableName);
            // Ordered family -> columns map so values line up with the header.
            Map<String, List<String>> family = new ListOrderedMap();
            String line = null;
            boolean first = true;
            while (null != (line = reader.readLine())) {
                if (first) {
                    // Header row: rowkey,name:firstname,name:lastname,address:province,...
                    String[] families = line.split(",");
                    String _family, _col;
                    for (int i = 1; i < families.length; i++) {
                        String[] cols = families[i].split(":");
                        _family = cols[0];
                        _col = cols[1];
                        if (family.containsKey(_family)) {
                            family.get(_family).add(_col);
                        } else {
                            List<String> cs = new ArrayList<>();
                            cs.add(_col);
                            family.put(_family, cs);
                        }
                    }
                    System.out.println("---TABLE STRUCTURE from FILE----");
                    for (Map.Entry<String, List<String>> entry : family.entrySet()) {
                        System.out.println(entry.getKey());
                        for (String s : entry.getValue()) {
                            System.out.println("- " + s);
                        }
                    }
                    System.out.println("-------------------------------");
                    first = false;
                } else {
                    // Data row: rowkey,value1,value2,...
                    String[] split = line.split(",");
                    int ix = 0;
                    // 1. Put(rowkey)  2. put.addColumn per header column
                    Put put = new Put(Bytes.toBytes(split[ix++]));
                    for (Map.Entry<String, List<String>> entry : family.entrySet()) {
                        String _family = entry.getKey();
                        // _col: each column belonging to this family
                        for (String _col : entry.getValue()) {
                            put.addColumn(Bytes.toBytes(_family), Bytes.toBytes(_col),
                                    Bytes.toBytes(split[ix++]));
                        }
                    }
                    table.put(put);
                }
            }
            System.out.println("新增表" + table.getName().getNameAsString() + "資訊成功");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Deletes the rows identified by the given row keys in one batch call.
     *
     * @param tableName target table
     * @param rowKeys   row keys to delete
     */
    public void delete(String tableName, String[] rowKeys) {
        Table table = null;
        try {
            table = base.table(tableName);
            List<Delete> dels = new ArrayList<>(rowKeys.length);
            for (int i = 0; i < rowKeys.length; i++) {
                dels.add(new Delete(Bytes.toBytes(rowKeys[i])));
            }
            table.delete(dels);
            System.out.println("刪除表" + table.getName().getNameAsString() + "主鍵為 ["
                    + Arrays.toString(rowKeys) + "]的資料成功");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Gets one row, optionally narrowed to one family or one family+column.
     *
     * @param tableName target table
     * @param rowKey    row to fetch
     * @param others    empty = whole row; [family] = one family; [family, column] = one cell
     */
    public void get(String tableName, String rowKey, String... others) {
        Get get = new Get(Bytes.toBytes(rowKey));
        if (others.length == 1) {
            // Narrow to one family.
            get.addFamily(Bytes.toBytes(others[0]));
        } else if (others.length == 2) {
            // Narrow to one family + column.
            get.addColumn(Bytes.toBytes(others[0]), Bytes.toBytes(others[1]));
        }
        try {
            // FIX: original resolved the Table twice for one operation.
            Table table = base.table(tableName);
            Result result = table.get(get);
            if (others.length == 0) {
                // Whole row: build the full column structure, then print.
                Map<String, List<String>> structure = parser.parse(table, result);
                parser.print(result, structure);
            } else if (others.length == 1) {
                // One family: structure restricted to others[0].
                Map<String, List<String>> structure = parser.parse(table, result, others[0]);
                parser.print(result, structure);
            } else {
                // Single cell: result.getValue(family, column).
                System.out.println(Bytes.toString(
                        result.getValue(Bytes.toBytes(others[0]), Bytes.toBytes(others[1]))));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Scans the table. With no row keys, scans everything; with row keys,
     * scans the half-open range [first, last) — StartRow &lt;= row &lt; StopRow.
     *
     * @param tableName target table
     * @param rowKeys   empty, or [start] / [start, ..., stop]
     */
    public void scan(String tableName, String... rowKeys) {
        Scan scan = new Scan();
        Table table = null;
        try {
            table = base.table(tableName);
            if (rowKeys.length == 0) {
                // No range given: include every column family.
                for (HColumnDescriptor columnFamily : table.getTableDescriptor().getColumnFamilies()) {
                    scan.addFamily(columnFamily.getName());
                }
            } else {
                // ****** StartRow <= scan < StopRow ******
                scan.setStartRow(Bytes.toBytes(rowKeys[0]));
                scan.setStopRow(Bytes.toBytes(rowKeys[rowKeys.length - 1]));
            }
            Iterator<Result> it = table.getScanner(scan).iterator();
            if (it.hasNext()) {
                Result next = it.next();
                // Structure is derived once from the first row and reused.
                Map<String, List<String>> parse = parser.parse(table, next);
                System.out.println("-----TABLE STRUCTURE------");
                for (Map.Entry<String, List<String>> entry : parse.entrySet()) {
                    System.out.println(entry.getKey());
                    for (String s : entry.getValue()) {
                        System.out.println("- " + s);
                    }
                }
                System.out.println("--------------------------");
                parser.print(next, parse);
                while (it.hasNext()) {
                    next = it.next();
                    System.out.println("----------next-----------");
                    parser.print(next, parse);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Scans the table keeping only rows whose {@code columnFamily:column}
     * equals {@code value} (single-column value filter).
     *
     * @param tableName    target table
     * @param columnFamily family of the filtered column
     * @param column       filtered column qualifier
     * @param value        value the cell must equal
     */
    public void filter(String tableName, String columnFamily, String column, String value) {
        // Single-column equality filter.
        SingleColumnValueFilter filter = new SingleColumnValueFilter(
                Bytes.toBytes(columnFamily), Bytes.toBytes(column),
                CompareFilter.CompareOp.EQUAL, Bytes.toBytes(value));
        Scan scan = new Scan();
        scan.setFilter(filter); // attach the filter condition
        Table table = null;
        try {
            table = base.table(tableName);
            for (HColumnDescriptor _columnFamily : table.getTableDescriptor().getColumnFamilies()) {
                scan.addFamily(_columnFamily.getName());
            }
            Iterator<Result> it = table.getScanner(scan).iterator();
            if (it.hasNext()) {
                Result next = it.next();
                Map<String, List<String>> parse = parser.parse(table, next);
                System.out.println("-----TABLE STRUCTURE------");
                for (Map.Entry<String, List<String>> entry : parse.entrySet()) {
                    System.out.println(entry.getKey());
                    for (String s : entry.getValue()) {
                        System.out.println("- " + s);
                    }
                }
                System.out.println("--------------------------");
                parser.print(next, parse);
                while (it.hasNext()) {
                    next = it.next();
                    System.out.println("----------next-----------");
                    parser.print(next, parse);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
3.parser工具類
package cn.kb08.hbase.core;

import org.apache.commons.collections.map.ListOrderedMap;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

/**
 * Helpers that derive a printable table structure (family -> column names)
 * from a {@link Table} descriptor plus a {@link Result}, and render a
 * {@link Result} against that structure.
 */
public class Parser {

    /**
     * Builds an ordered family -> column-name map for the given result.
     *
     * @param table  table whose descriptor supplies the column families
     * @param result result whose cells supply the column qualifiers per family
     * @param cf     optional family-name whitelist; empty means "all families"
     * @return insertion-ordered map of family name to its column names
     * @throws IOException if the table descriptor cannot be read
     */
    public Map<String, List<String>> parse(Table table, Result result, String... cf) throws IOException {
        Map<String, List<String>> structure = new ListOrderedMap();
        for (HColumnDescriptor descriptor : table.getTableDescriptor().getColumnFamilies()) {
            String familyName = descriptor.getNameAsString();
            // Skip families not present in the (non-empty) whitelist.
            if (!accepted(familyName, cf)) {
                continue;
            }
            // qualifier -> value for this family; the key set is the columns.
            NavigableMap<byte[], byte[]> cells = result.getFamilyMap(descriptor.getName());
            for (byte[] qualifier : cells.keySet()) {
                structure.computeIfAbsent(familyName, k -> new ArrayList<>())
                        .add(Bytes.toString(qualifier));
            }
        }
        return structure;
    }

    /**
     * Whitelist check: an empty/absent whitelist accepts every family.
     *
     * @param familyName candidate family name
     * @param cf         whitelist of family names (possibly empty)
     * @return true when the family should be included
     */
    private boolean accepted(String familyName, String[] cf) {
        if (cf == null || cf.length == 0) {
            return true;
        }
        for (String candidate : cf) {
            if (familyName.equals(candidate)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Renders one result row as tab-separated cell values, following the
     * family/column order of the supplied structure, and prints it.
     *
     * @param rst       result row to render
     * @param structure family -> columns layout produced by {@link #parse}
     */
    public void print(Result rst, Map<String, List<String>> structure) {
        StringBuilder row = new StringBuilder();
        String sep = "";
        for (Map.Entry<String, List<String>> entry : structure.entrySet()) {
            byte[] family = Bytes.toBytes(entry.getKey());
            for (String col : entry.getValue()) {
                row.append(sep);
                sep = "\t";
                row.append(Bytes.toString(rst.getValue(family, Bytes.toBytes(col))));
            }
        }
        System.out.println(row.toString());
    }
}
4.app執行類
public class App { //args: 0操作型別,1表名,2 rowkey,3列族,4列 public static void main(String[] args) throws IOException { Dao dao = new Dao(); String type = args[0]; if(type.startsWith("tb")){ if(args.length<2){ return; } if(type.endsWith("Create")){ //建立表: 0操作型別, 1表名, 2列族 String[] families = new String[args.length - 2]; System.arraycopy(args,2,families,0,families.length); dao.createTable(args[1],families); }else if(type.endsWith("Drop")){ //刪除表 dao.dropTable(args[1]); } }else if(type.startsWith("dt")){ //表資料操作 if(args.length<2){ return; } if(type.endsWith("Put")){ //新增資料:資料來源為檔案 if(args.length<3){ return; } dao.put(args[1],args[2]); }else if(type.endsWith("Del")){ //刪除資料:根據入口引數提供的RowKeys if(args.length<3){ return; } String[] rks = new String[args.length - 2]; System.arraycopy(args,2,rks,0,rks.length); dao.delete(args[1],rks); }else if(type.endsWith("Get")){ //Get查詢: 一行,一族,一列 if(args.length<3){ return; } // 列族 or 列族 列 String[] rks = new String[args.length - 3]; System.arraycopy(args,3,rks,0,rks.length); // 塞進表名+rowkey+() dao.get(args[1],args[2],rks); }else if(type.endsWith("Scan")){ //查詢表中所有資料 String[] rks = new String[args.length - 2]; System.arraycopy(args,2,rks,0,rks.length); dao.scan(args[1],rks); }else if(type.endsWith("Filter")){ //條件查詢 if(args.length<5){ return; } dao.filter(args[1],args[2],args[3],args[4]); } } } }
四、打jar包:打成胖jar包(包含maven依賴,這樣linux上不安裝maven也可以用)
*** 一定要在 MANIFEST.MF 中新增啟動入口:Main-Class: cn.kb08.hbase.App
才可以使用 java -jar xxx.jar 傳參a b c
五、測試命令:
tbCreate student name address
dtPut student /root/kb08/hbase/hbaseFile.txt
dtDel student 1 2
dtGet student 3
dtGet student 3 address
dtGet student 3 address city
dtScan student
dtScan student 3
dtScan student 3 5 【查的是rowkey= 3 & 4 的資料!】
dtFilter student address city nanjing 根據<表名-列族-列的值>查詢【等同於查詢rowkey】