JavaAPI操作hbase
阿新 • • 發佈:2017-11-27
info r文件 lean lose tar 需要 vol buffere 實例化
JavaAPI操作Hbase
準備工作:
- 確保集群各節點服務運行正常
- 確保zookeeper可以正常工作
- 已經開始hbase-master,hbase-regionserver
- 環境:windows7下eclipse,集群正常工作。
- 由於我們是在windows7下運行,所以我們將服務器中的hadoop程序拷貝到本地。
- 在系統的環境變量中設置HADOOP_HOME,並且將%HADOOP_HOME%/bin添加到path當中。
- 接下來在運行中,執行程序的時候eclipse會報錯說找不到winutils.exe。所以我們還要下載一個winutils.exe,我這裏提供一個github上的下載鏈接https://github.com/srccodes/hadoop-common-2.2.0-bin,下載以後解壓到bin下即可。(版本雖然很舊,但是可以使用)
連接hbase並進行簡單操作:
- 創建JavaProject,導入需要的jar包,jar包來自於服務器上hbase中的lib文件夾下的jar文件,所以將lib直接拷貝到當前工程中,並build path(導入這些額外的包)。
- 同時拷貝hbase下的log4j.properties文件到項目中,在執行過程中可以查看到執行過程中產生的日誌。
- 創建連接hbase需要的配置信息
- Java客戶端其實就是shell客戶端的一種實現,操作命令基本上就是shell客戶端命令的一個映射。
- Java客戶端使用的配置信息是被映射到了HbaseConfiguration的實例對象中的,使用create方法創建實例化對象的時候,會從classpath中獲取hbase-site.xml文件並進行配置文件內容的讀取。同時也會讀取hadoop的配置文件信息。這裏我們給定zookeeper的相關配置信息即可。
- 流程:先通過zookeeper拿到hbase:namespace的路徑,然後從這個路徑中拿到hbase:meta表的信息,接著就拿到了用戶表的路徑
- 代碼實現如下
1 package com.hblink.demo; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.hbase.HBaseConfiguration; 7 8 public class Hblink { 9 /** 10 * 獲取hbase的配置文件信息
- 有了配置信息以後,我們開始通過配置信息連接hbase
HBaseAdmin類:是主要進行DDL操作相關的一個接口類,主要包括命名空間管理,用戶表管理。通過該接口我們可以創建、刪除、獲取用戶表,也可以進行用戶表的分割,緊縮等操作。
HTable類:是hbase中的用戶表的一個映射的java實例,通過該類進行表數據的操作,包括數據的增刪改查,也就是在這裏我們可以類似shell中put,get和sacn進行數據的操作。HTableDescriptor類:是hbase用戶表的具體描述信息類,一般我們創建表獲取表信息,就是通過該類進行的。
1 package com.hblink.test; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.hbase.HColumnDescriptor; 7 import org.apache.hadoop.hbase.HTableDescriptor; 8 import org.apache.hadoop.hbase.TableName; 9 import org.apache.hadoop.hbase.client.HBaseAdmin; 10 11 import com.hblink.demo.Hblink; 12 13 public class HbTest { 14 public static void main(String[] args) throws Exception { 15 Configuration configuration = Hblink.getHBaseConfiguration(); 16 HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration); 17 try { 18 createTestTable(hBaseAdmin); 19 } finally { 20 hBaseAdmin.close(); // 資源釋放 21 } 22 } 23 24 /** 25 * 測試創建表table 26 * 27 * @throws IOException 28 */ 29 static void createTestTable(HBaseAdmin hbAdmin) throws IOException { 30 TableName tableName = TableName.valueOf("stock-info"); // 創建表名 31 HTableDescriptor hDescriptor = new HTableDescriptor(tableName); 32 hDescriptor.addFamily(new HColumnDescriptor("f"));// 給定列族 33 hbAdmin.createTable(hDescriptor); 34 System.out.println("創建表成功!"); 35 } 36 }
- 接下來是數據插入
1 package com.hblink.test; 2 3 import java.io.BufferedReader; 4 import java.io.File; 5 import java.io.FileInputStream; 6 import java.io.FileNotFoundException; 7 import java.io.IOException; 8 import java.io.InputStreamReader; 9 10 import org.apache.hadoop.conf.Configuration; 11 import org.apache.hadoop.hbase.client.HTable; 12 import org.apache.hadoop.hbase.client.Put; 13 import org.apache.hadoop.hbase.util.Bytes; 14 15 import com.hblink.demo.Hblink; 16 17 public class TableTest { 18 public static int count = 0; 19 20 public static void main(String[] args) throws IOException { 21 22 HTable hTable = null; 23 Configuration configuration = Hblink.getHBaseConfiguration(); 24 25 hTable = new HTable(configuration, "stock-info"); 26 27 testPut(hTable);//插入數據 28 29 hTable.close();//釋放資源 30 31 } 32 33 /** 34 * 測試往表裏插入數據 35 * 36 * @param hTable 37 * @throws IOException 38 */ 39 static void testPut(HTable hTable) throws IOException { 40 41 File file = new File("./20171120sh.txt"); //獲取本地文件 42 InputStreamReader isr = null; 43 try { 44 isr = new InputStreamReader(new FileInputStream(file), "utf-8"); 45 } catch (FileNotFoundException e) { 46 e.printStackTrace(); 47 } 48 if (isr == null) { 49 return; 50 } 51 BufferedReader br = new BufferedReader(isr); 52 String re = ""; 53 while ((re = br.readLine()) != null) { 54 String[] sarr = re.split(","); 55 // System.out.println(sarr[0] + "-" + sarr[1] + "-" + sarr[2] + "-" + sarr[3] + 56 // sarr[4] + "-" + sarr[5] + "-" + sarr[6]); 57 // System.out.println(sarr[0]); 58 59 Put put = new Put(Bytes.toBytes(sarr[0])); 60 put.add(Bytes.toBytes("f"), Bytes.toBytes("Stock"), Bytes.toBytes(sarr[1])); 61 put.add(Bytes.toBytes("f"), Bytes.toBytes("Date"), Bytes.toBytes(sarr[2])); 62 put.add(Bytes.toBytes("f"), Bytes.toBytes("Top"), Bytes.toBytes(sarr[3])); 63 put.add(Bytes.toBytes("f"), Bytes.toBytes("Change-rate"), Bytes.toBytes(sarr[4])); 64 put.add(Bytes.toBytes("f"), Bytes.toBytes("Volume"), Bytes.toBytes(sarr[5])); 65 put.add(Bytes.toBytes("f"), Bytes.toBytes("Turnover"), Bytes.toBytes(sarr[6])); 66 hTable.put(put); 67 count++; 68 } 69 System.out.println("插入" + (count - 1) + "條數據成功!"); 70 } 71 }
- 對表中數據的查詢
1 package com.hblink.test; 2 3 import java.io.IOException; 4 import java.util.Scanner; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.hbase.client.HTable; 8 import org.apache.hadoop.hbase.client.Result; 9 import org.apache.hadoop.hbase.client.ResultScanner; 10 import org.apache.hadoop.hbase.client.Scan; 11 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 12 import org.apache.hadoop.hbase.filter.Filter; 13 import org.apache.hadoop.hbase.filter.RegexStringComparator; 14 import org.apache.hadoop.hbase.filter.RowFilter; 15 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 16 import org.apache.hadoop.hbase.util.Bytes; 17 18 import com.hblink.demo.Hblink; 19 20 public class ScanHbase { 21 public static Boolean flag = true; 22 public static String string = null; 23 24 public static void main(String[] args) throws IOException { 25 while (flag) { 26 HTable hTable = null; 27 Configuration configuration = Hblink.getHBaseConfiguration(); 28 hTable = new HTable(configuration, "stock-info"); 29 30 Scanner sc = new Scanner(System.in); 31 System.out.print("請輸入需要查詢的股票代碼:"); 32 string = sc.next(); 33 34 // scanTestCell(hTable); 35 scanTestRow(hTable); 36 hTable.close(); 37 if (string.equals("quit")) { 38 flag = false; 39 } 40 } 41 } 42 43 /** 44 * 通過列查詢 45 * 46 * @param hTable 47 * @throws IOException 48 */ 49 static void scanTestCell(HTable hTable) throws IOException { 50 51 // 設置過濾器 52 SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("f"), 53 Bytes.toBytes("Date"), CompareOp.EQUAL, Bytes.toBytes(string)); 54 // 設置全表掃描封裝類 55 Scan scan = new Scan(); 56 // 添加過濾器(通過股票代碼查詢) 57 scan.setFilter(singleColumnValueFilter); 58 // 掃描 59 ResultScanner resultScanner = hTable.getScanner(scan); 60 for (Result result : resultScanner) { 61 byte[] data = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Date")); 62 byte[] stock = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Stock")); 63 byte[] top = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Top")); 64 byte[] change_rate = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Change-rate")); 65 byte[] volume = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Volume")); 66 byte[] turnover = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Turnover")); 67 68 System.out.print(Bytes.toString(data) + ";"); 69 System.out.print(Bytes.toString(stock) + ";"); 70 if (Bytes.toString(top).equals("--")) { 71 System.out.print(Bytes.toString(top) + ";"); 72 } else { 73 System.out.print(Bytes.toInt(top) + ";"); 74 } 75 System.out.print(Bytes.toString(change_rate) + ";"); 76 System.out.print(Bytes.toString(volume) + ";"); 77 System.out.print(Bytes.toString(turnover)); 78 System.out.println(); 79 80 } 81 82 } 83 84 /** 85 * 通過正則--匹配行鍵 86 * 87 * @param hTable 88 * @throws IOException 89 */ 90 static void scanTestRow(HTable hTable) throws IOException { 91 RegexStringComparator re = new RegexStringComparator("^" + string + ""); 92 Filter filter = new RowFilter(CompareOp.EQUAL, re); 93 Scan scan = new Scan(); 94 // 添加過濾器(通過股票代碼查詢) 95 scan.setFilter(filter); 96 // 掃描 97 ResultScanner resultScanner = hTable.getScanner(scan); 98 for (Result result : resultScanner) { 99 byte[] data = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Date")); 100 byte[] stock = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Stock")); 101 byte[] top = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Top")); 102 byte[] change_rate = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Change-rate")); 103 byte[] volume = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Volume")); 104 byte[] turnover = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Turnover")); 105 106 System.out.print(Bytes.toString(data) + ";"); 107 System.out.print(Bytes.toString(stock) + ";"); 108 if (Bytes.toString(top).equals("--")) { 109 System.out.print(Bytes.toString(top) + ";"); 110 } else { 111 System.out.print(Bytes.toInt(top) + ";"); 112 } 113 System.out.print(Bytes.toString(change_rate) + ";"); 114 System.out.print(Bytes.toString(volume) + ";"); 115 System.out.print(Bytes.toString(turnover)); 116 System.out.println(); 117 118 } 119 } 120 }
ps.繼續學習中=====
JavaAPI操作hbase