1. 程式人生 > >流式大資料計算實踐(5)----HBase使用

流式大資料計算實踐(5)----HBase使用

一、前言

1、上文中我們搭建好了一套HBase叢集環境,這一文我們學習一下HBase的基本操作和客戶端API的使用

二、shell操作

先通過命令進入HBase的命令列操作

/work/soft/hbase-1.2.2/bin/hbase shell

1、建表

create 'test', 'cf'

(1)以上命令是建立一個test表,裡面有一個列族cf

(2)與RDS不同,HBase的列不是必須的,當向列族中插入一個單元格資料時,才有了列

2、檢視所有表

list

3、查看錶屬性

describe 'test'

4、增加列族

alter '
test', 'cf2'

5、插入資料

put 'test', 'row1', 'cf:name', 'jack'

(1)命令解釋:向test表中的row1行插入列族cf,列名name的資料jack

6、查詢行資料

scan 'test', {STARTROW => 'row3'}
scan 'test', {ENDROW => 'row4'}

(1)命令解釋:查詢test表中rowkey大於等於row3的資料

(2)命令解釋:查詢test表中rowkey小於row4的資料(不包括row4)

7、查詢單元格資料

get 'test', 'row7
', 'cf:name'

8、刪除資料

delete 'test', 'row4', 'cf:name'

(1)命令解釋:刪除test表中row4行的cf:name列的單元格資料

三、客戶端API

正常開發中操作HBase多數情況下通過客戶端API操作,我們這裡使用Java來操作,jdk要求至少1.7以上,編譯器我這裡用的是IntelliJ IDEA

(1)新建一個maven工程

(2)開啟pom檔案,引入HBase的依賴

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.1
.1</version> </dependency>

(3)將HBase的相關配置檔案引入到我們的maven專案中,拷貝HBase目錄下的hbase-site.xml和Hadoop目錄下的core-site.xml,將兩個檔案複製到src/main/resources目錄下

(4)記得將前文中虛擬機器的IP和hostname對映配置到寫程式碼這臺機器的hosts檔案中(比如win7的hosts目錄為C:\Windows\System32\drivers\etc)

(5)新建一個類,編寫CRUD的示例程式碼,下面程式碼用了jdk1.7的一個語法糖:try-with-resources,在try()裡面宣告的物件,會自動幫你呼叫物件的close方法來關閉物件,不用手動呼叫close(),非常方便

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.net.URISyntaxException;

public class HelloHBase
{
    public static void main(String[] args) throws URISyntaxException
    {
        // 載入HBase的配置
        Configuration configuration = HBaseConfiguration.create();

        // 讀取配置檔案
        configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
        configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));

        try (// 建立一個HBase連線
             Connection connection = ConnectionFactory.createConnection(configuration);
             // 獲得執行操作的管理介面
             Admin admin = connection.getAdmin();)
        {
            // 新建一個表名為mytable的表
            TableName tableName = TableName.valueOf("mytable");
            HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);

            // 新建一個列族名為mycf的列族
            HColumnDescriptor mycf = new HColumnDescriptor("mycf");
            // 將列族新增到表中
            tableDescriptor.addFamily(mycf);
            // 執行建表操作
            createOrOverwrite(admin, tableDescriptor);

            // 設定列族的壓縮方式為GZ
            mycf.setCompactionCompressionType(Compression.Algorithm.GZ);
            // 設定最大版本數量(ALL_VERSIONS實際上就是Integer.MAX_VALUE)
            mycf.setMaxVersions(HConstants.ALL_VERSIONS);
            // 列族更新到表中
            tableDescriptor.modifyFamily(mycf);
            // 執行更新操作
            admin.modifyTable(tableName, tableDescriptor);

            // 新增一個列族
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("newcf");
            hColumnDescriptor.setCompactionCompressionType(Compression.Algorithm.GZ);
            hColumnDescriptor.setMaxVersions(HConstants.ALL_VERSIONS);
            // 執行新增操作
            admin.addColumnFamily(tableName, hColumnDescriptor);

            // 獲取表物件
            Table table = connection.getTable(tableName);

            // 建立一個put請求,用於新增資料或者更新資料
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("name"), Bytes.toBytes("jack"));
            table.put(put);

            // 建立一個append請求,用於在資料後面新增內容
            Append append = new Append(Bytes.toBytes("row1"));
            append.add(Bytes.toBytes("mycf"), Bytes.toBytes("name"), Bytes.toBytes("son"));
            table.append(append);

            // 建立一個long型別的列
            Put put1 = new Put(Bytes.toBytes("row2"));
            put1.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("age"), Bytes.toBytes(6L));
            // 建立一個增值請求,將值增加10L
            Increment increment = new Increment(Bytes.toBytes("row2"));
            increment.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("age"), 10L);
            table.increment(increment);

            // 建立一個查詢請求,查詢一行資料
            Get get = new Get(Bytes.toBytes("row1"));
            // 由於HBase的一行可能非常大,所以限定要取出的列族
            get.addFamily(Bytes.toBytes("mycf"));
            // 建立一個結果請求
            Result result = table.get(get);
            // 從查詢結果中取出name列,然後列印(這裡預設取最新版本的值,如果要取其他版本要使用Cell物件)
            byte[] name = result.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("name"));
            System.out.println(Bytes.toString(name));

            // 建立一個掃描請求,查詢多行資料
            Scan scan = new Scan(Bytes.toBytes("row1"));
            // 設定掃描器的快取數量,遍歷資料時不用發多次請求,預設100,適當的快取可以提高效能
            scan.setCaching(150);
            // 建立掃描結果,這個時候不會真正從HBase查詢資料,下面的遍歷才是去查詢
            ResultScanner resultScanner = table.getScanner(scan);
            for (Result r : resultScanner)
            {
                String data = Bytes.toString(r.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("name")));
                System.out.println(data);
            }
            // 使用完畢關閉
            resultScanner.close();

            // 建立一個刪除請求
            Delete delete = new Delete(Bytes.toBytes("row2"));
            // 可以自定義一些篩選條件
            delete.addFamily(Bytes.toBytes("age"));
            table.delete(delete);

            // 停用表
            admin.disableTable(tableName);
            // 刪除列族
            admin.deleteColumnFamily(tableName, "mycf".getBytes());
            // 刪除表
            admin.deleteTable(tableName);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        System.out.println("ok");
    }

    public static void createOrOverwrite(Admin admin, HTableDescriptor table) throws IOException
    {
        // 獲取table名
        TableName tableName = table.getTableName();
        // 判斷table是否存在,如果存在則先停用並刪除
        if (admin.tableExists(tableName))
        {
            // 停用表
            admin.disableTable(tableName);
            // 刪除表
            admin.deleteTable(tableName);
        }
        // 建立表
        admin.createTable(table);
    }
}

 四、API的高階用法

上一章介紹了API的基本使用方法,這一章總結一些高階用法

1、過濾器:通過get或者scan查詢資料時,經常需要加入一些條件來查詢

(1)值過濾器:相當於傳統sql的where column like '%jack%',但是會對所有的列都做過濾,如果需要對單個列過濾,可以使用SingleColumnValueFilter,如果需要查詢值相等的過濾器,可以使用BinaryComparator

CompareFilter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("jack"));
scan.setFilter(filter);

(2)分頁過濾器:相當於傳統sql的limit,但是不能指定起始頁碼,所以需要自己記錄最後一個row key,並通過scan.setStartRow()設定,在做分頁時有個小技巧,如果你通過scan.setStartRow()設定最後一個row key時,下一頁的資料依然會包含上一頁的最後一個數據,所以你可以將最後一個row key的末尾加一個0,就可以不包含最後一個數據了,因為row key是按照字典順序排序的

Filter filter1 = new PageFilter(10L);
scan.setFilter(filter1);

(3)過濾器列表:用於組合多個過濾器,實現複雜一些的查詢場景,注意這個過濾器列表是有順序的,FilterList的第一個引數用來指定多個條件的連線方式(and、or),MUST_PASS_ALL相當於and連線,MUST_PASS_ONE相當於or連線

FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(filter);
filterList.addFilter(filter1);
scan.setFilter(filterList);

(4)還有一些其他的過濾器,使用方法大同小異,比如行鍵過濾器、列過濾器、單元格過濾器,甚至可以自定義過濾器,其他高階用法可以等用到再看