1. 程式人生 > >21 大資料 --javaapi 訪問 hbase

21 大資料 --javaapi 訪問 hbase

Hbase介紹
HBASE是一個高可靠性、高效能、面向列、可伸縮的分散式儲存系統,利用HBASE技術可在廉價PC Server上搭建起大規模結構化儲存叢集。
HBASE的目標是儲存並處理大型的資料,更具體來說是僅需使用普通的硬體配置,就能夠處理由成千上萬的行和列所組成的大型資料。
HBASE是Google Bigtable的開源實現,但是也有很多不同之處。比如:Google Bigtable利用GFS作為其檔案儲存系統,HBASE利用Hadoop HDFS作為其檔案儲存系統;Google執行MAPREDUCE來處理Bigtable中的海量資料,HBASE同樣利用Hadoop MapReduce來處理HBASE中的海量資料;Google Bigtable利用Chubby作為協同服務,HBASE利用Zookeeper作為對應。

  <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>0.99.2</version>
    </dependency>

 1、建立表

public class HBaseTest {

    Configuration config = null;
    private Connection connection = null;
    private Table table = null;

    @Before
    public void init() throws Exception {
        config = HBaseConfiguration.create();// 配置
        config.set("hbase.zookeeper.quorum", "192.168.25.127,192.168.25.129,192.168.25.130");// zookeeper地址
        config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper埠
        connection = ConnectionFactory.createConnection(config);
        table = connection.getTable(TableName.valueOf("user1"));
    }
    /**
     * 建立表
     * @throws Exception
     */
    @Test
    public void testCreateTable() throws Exception{
        //建立表管理類
        HBaseAdmin admin = new HBaseAdmin(config);
        //建立表描述類
        TableName tableName = TableName.valueOf("user2");
        HTableDescriptor descriptor = new HTableDescriptor(tableName);
        //建立列族描述類
        HColumnDescriptor info1 = new HColumnDescriptor("info1");
        //列族加入表中
        descriptor.addFamily(info1);
        HColumnDescriptor info2 = new HColumnDescriptor("info2");
        descriptor.addFamily(info2);
        //建立表
        admin.createTable(descriptor);
    }
}

如果執行之後發現程式卡住不動,或者過了很久之後出現下面的異常

org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=35, exceptions:
...
Caused by: org.apache.hadoop.hbase.MasterNotRunningException: com.google.protobuf.ServiceException: java.net.UnknownHostException: unknown host: mini1
...
Caused by: com.google.protobuf.ServiceException: java.net.UnknownHostException: unknown host: mini1
...

首先確保關閉了hadoop的安全模式,然後linux下的ip地址跟主機名必須對應,最後windows下的ip地址跟主機名也要對應,我這在linux下/etc/hosts檔案中

[[email protected] ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6 localhost.jinbm
192.168.25.127 mini1
192.168.25.129 mini2
192.168.25.130 mini3

但是當時出現windows下的hosts檔案沒有配置後三個對映導致出現上面的異常。

執行之後去hbase叢集檢視
user2表已經建立

hbase(main):002:0> list
TABLE                                                                                                                                                                             
user1                                                                                                                                                                             
user2                                                                                                                                                                             
2 row(s) in 0.0130 seconds

=> ["user1", "user2"]

2、刪除表
刪除表跟shell命令一樣,也是要先disable表之後才能刪除

 @Test
    public void testDeleteTable() throws Exception{
        HBaseAdmin admin = new HBaseAdmin(config);
        admin.disableTable("user2");
        admin.deleteTable("user2");
    }

3、單條插入(修改)

  /**
     * 向表中插入資料
     * 單條插入(包括修改)
     * @throws Exception
     */
    @Test
    public void testPut() throws Exception{
        //rowkey
        Put put = new Put(Bytes.toBytes("1234"));
        //列族,列,值
        put.add(Bytes.toBytes("info1"), Bytes.toBytes("gender"), Bytes.toBytes("1"));
        put.add(Bytes.toBytes("info2"), Bytes.toBytes("name"), Bytes.toBytes("wangwu"));
        table.put(put);
        //提交
        table.flushCommits();
    }

檢視

hbase(main):008:0> scan 'user1'
ROW                                           COLUMN+CELL                                                                                                                         
 1234                                         column=info2:age, timestamp=1509315527064, value=18                                                                                 
 1234                                         column=info2:name, timestamp=1509315500250, value=zhangsan                                                                          
 12345                                        column=info2:age, timestamp=1509315533683, value=18                                                                                 
 12345                                        column=info2:name, timestamp=1509315548481, value=lisi                                                                              
2 row(s) in 0.1030 seconds

hbase(main):009:0> scan 'user1'
ROW                                           COLUMN+CELL                                                                                                                         
 1234                                         column=info1:gender, timestamp=1509315890353, value=1                                                                               
 1234                                         column=info2:age, timestamp=1509315527064, value=18                                                                                 
 1234                                         column=info2:name, timestamp=1509315890353, value=wangwu                                                                            
 12345                                        column=info2:age, timestamp=1509315533683, value=18                                                                                 
 12345                                        column=info2:name, timestamp=1509315548481, value=lisi                                                                              
2 row(s) in 0.0440 seconds

發現添加了一條資料和修改了一條資料

4、批量插入資料
Table有2個過載的方法,一個是table.put(Put put)也就是單條插入,一個是table.put(list)list泛型是Put,這就是批量插入。

  /**
     * 向表中插入資料
     * 多條插入,使用list
     * @throws Exception
     */
    @Test
    public void testPut2() throws Exception{
        //可以通過將自動重新整理設定為false來啟用緩衝區
        table.setAutoFlushTo(false);
        //設定資料將被寫入的緩衝區大小
        table.setWriteBufferSize(534534534);
        List<Put> putList = new ArrayList<>();
        for (int i=20;i<=30;i++){
            //rowkey
            Put put = new Put(Bytes.toBytes("jbm_"+i));
            //列族,列,值
            put.add(Bytes.toBytes("info1"), Bytes.toBytes("age"), Bytes.toBytes(i));
            put.add(Bytes.toBytes("info1"), Bytes.toBytes("name"), Bytes.toBytes("lucy"+i));
            putList.add(put);
        }
        table.put(putList);
        //提交
        table.flushCommits();
    }

執行之後檢視

hbase(main):010:0> scan 'user1'

5、修改資料

/**
     * 修改資料
     * @throws Exception
     */
    @Test
    public void testUpdate() throws Exception{
        Put put = new Put(Bytes.toBytes("1234"));
        put.add(Bytes.toBytes("info2"), Bytes.toBytes("name"), Bytes.toBytes("tom"));
        table.put(put);
        table.flushCommits();
    }

執行之後檢視

hbase(main):010:0> scan 'user1'
ROW                                           COLUMN+CELL                                                                                                                         
 1234                                         column=info1:gender, timestamp=1509315890353, value=1                                                                               
 1234                                         column=info2:age, timestamp=1509315527064, value=18                                                                                 
 1234                                         column=info2:name, timestamp=1509315890353, value=wangwu  
 ...
 hbase(main):013:0> scan 'user1'
ROW                                           COLUMN+CELL                                                                                                                         
 1234                                         column=info1:gender, timestamp=1509315890353, value=1                                                                               
 1234                                         column=info2:age, timestamp=1509315527064, value=18                                                                                 
 1234                                         column=info2:name, timestamp=1509316978243, value=tom    

發現wangwu已被改為了tom

6、刪除整行資料
刪除rowkey為1234整行資料

 /**
     * 刪除資料
     * @throws Exception
     */
    @Test
    public void testDeleteData() throws Exception{
        Delete delete = new Delete(Bytes.toBytes("1234"));
        table.delete(delete);
        table.flushCommits();
    }

7、單條查詢

  /**
     * 單條查詢
     * @throws Exception
     */
    @Test
    public void testGetSingle() throws Exception{
        //rowkey
        Get get = new Get(Bytes.toBytes("12345"));
        Result result = table.get(get);
        //列族,列名
        byte[] name = result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name"));
        byte[] age = result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"));
        System.out.println(Bytes.toString(name));
        System.out.println(Bytes.toString(age));
    }

執行後hbase檢視和控制檯檢視

hbase(main):001:0> scan 'user1'
 12345                                        column=info2:age, timestamp=1509315533683, value=18                                                                                 
 12345                                        column=info2:name, timestamp=1509315548481, value=lisi  
...
控制檯輸出
lisi
18

8、多條查詢
這裡叫做掃描更適合吧,先用全表掃描,和命令列的scan ‘表名’一樣

 /**
     * 多條查詢
     * 全表掃描
     * @throws Exception
     */
    @Test
    public void testGetMany() throws Exception{

        Scan scan = new Scan();
        //字典序   類似於分頁
        scan.setStartRow(Bytes.toBytes("jbm_20"));
        scan.setStopRow(Bytes.toBytes("jbm_30"));
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            //Single row result of a Get or Scan query. Result
            //Result 一次獲取一個rowkey對應的記錄
            //列族,列名
            byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
            System.out.print(Bytes.toString(name)+",");
            System.out.print(Bytes.toInt(age));
            System.out.println();
        }

    }

執行控制檯輸出結果

lucy20,20
lucy21,21
lucy22,22
lucy23,23
lucy24,24
lucy25,25
lucy26,26
lucy27,27
lucy28,28
lucy29,29

9、Hbase過濾器
1)、列值過濾器
SingleColumnValueFilter
過濾列值的相等、不等、範圍

   /**
     * 全表掃描過濾器
     * 列值過濾器
     * @throws Exception
     */
    @Test
    public void testFilter() throws Exception{

        Scan scan = new Scan();
        //列值過濾器
        SingleColumnValueFilter columnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("info1"), 
                Bytes.toBytes("name"), CompareOp.EQUAL, Bytes.toBytes("lisi"));
        //設定過濾器
        scan.setFilter(columnValueFilter);
        //獲取結果集
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            byte[] name = result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"));
            System.out.print(Bytes.toString(name)+",");
            System.out.print(Bytes.toString(age));
            System.out.println();
        }
    }

執行檢視輸出

hbase(main):001:0> scan 'user1'
ROW                                           COLUMN+CELL                                                                                                                         
 12345                                        column=info2:age, timestamp=1509315533683, value=18                                                                                 
 12345                                        column=info2:name, timestamp=1509315548481, value=lisi                                                                              
 jbm_20                                       column=info1:age, timestamp=1509316527223, value=\x00\x00\x00\x14   
 ...
控制檯輸出
lisi,18

2)、rowkey過濾器
RowFilter 通過正則,過濾rowKey值。

 /**
     * 全表掃描過濾器
     * rowkey過濾
     * @throws Exception
     */
    @Test
    public void testRowkeyFilter() throws Exception{

        Scan scan = new Scan();
        //rowkey過濾器
        //匹配以jbm開頭的
        RowFilter filter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("^jbm"));
        //設定過濾器
        scan.setFilter(filter);
        //獲取結果集
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
            System.out.print(Bytes.toString(name)+",");
            System.out.print(Bytes.toInt(age));
            System.out.println();
        }
    }

控制檯輸出

lucy20,20
lucy21,21
lucy22,22
lucy23,23
lucy24,24
lucy25,25
lucy26,26
lucy27,27
lucy28,28
lucy29,29
lucy30,30

3)、列名字首過濾器
ColumnPrefixFilter列名字首過濾

 /**
     * 全表掃描過濾器
     * 列名字首過濾
     * @throws Exception
     */
    @Test
    public void testColumnPrefixFilter() throws Exception{

        Scan scan = new Scan();
        //列名字首過濾器 列名字首為na(注:不是指值的字首)
        ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("na"));
        //設定過濾器
        scan.setFilter(filter);
        //獲取結果集
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
            if(name!=null){
                System.out.print(Bytes.toString(name)+" ");
            }
            if(age!=null){
                System.out.print(age);
            }
            System.out.println();
        }
    }

從輸出結果就能看到,只會拿到name列,age列是拿不到的

lucy20 
lucy21 
lucy22 
lucy23 
lucy24 
lucy25 
lucy26 
lucy27 
lucy28 
lucy29 
lucy30

4)、過濾器集合

/**
     * 全表掃描過濾器
     * 過濾器集合
     * @throws Exception
     */
    @Test
    public void testFilterList() throws Exception{
        Scan scan = new Scan();
        //過濾器集合:MUST_PASS_ALL(and),MUST_PASS_ONE(or)
        FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
        //ROWKEY過濾器
        RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("^jbm"));
        //列值過濾器     age大於25
        SingleColumnValueFilter columnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("info1"), 
                Bytes.toBytes("age"), CompareOp.GREATER, Bytes.toBytes(25));
        filterList.addFilter(columnValueFilter);
        filterList.addFilter(rowFilter);
        //設定過濾器
        scan.setFilter(filterList);
        //獲取結果集
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
            byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
            if(name!=null){
                System.out.print(Bytes.toString(name)+" ");
            }
            if(age!=null){
                System.out.print(Bytes.toInt(age)+" ");
            }
            System.out.println();
        }
    }

輸出

lucy26 26 
lucy27 27 
lucy28 28 
lucy29 29 
lucy30 30