流式大資料計算實踐(5)----HBase使用
一、前言
1、上文中我們搭建好了一套HBase叢集環境,這一文我們學習一下HBase的基本操作和客戶端API的使用
二、shell操作
先通過命令進入HBase的命令列操作
/work/soft/hbase-1.2.2/bin/hbase shell
1、建表
create 'test', 'cf'
(1)以上命令是建立一個test表,裡面有一個列族cf
(2)與RDS不同,HBase的列不是必須的,當向列族中插入一個單元格資料時,才有了列
2、檢視所有表
list
3、查看錶屬性
describe 'test'
4、增加列族
alter 'test', 'cf2'
5、插入資料
put 'test', 'row1', 'cf:name', 'jack'
(1)命令解釋:向test表中的row1行插入列族cf,列名name的資料jack
6、查詢行資料
scan 'test', {STARTROW => 'row3'}
scan 'test', {ENDROW => 'row4'}
(1)命令解釋:查詢test表中rowkey大於等於row3的資料
(2)命令解釋:查詢test表中rowkey小於row4的資料(不包括row4)
7、查詢單元格資料
get 'test', 'row7', 'cf:name'
8、刪除資料
delete 'test', 'row4', 'cf:name'
(1)命令解釋:刪除test表中row4行的cf:name列的單元格資料
三、客戶端API
正常開發中操作HBase多數情況下通過客戶端API操作,我們這裡使用Java來操作,jdk要求至少1.7以上,編譯器我這裡用的是IntelliJ IDEA
(1)新建一個maven工程
(2)開啟pom檔案,引入HBase的依賴
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.1.1</version> </dependency>
(3)將HBase的相關配置檔案引入到我們的maven專案中,拷貝HBase目錄下的hbase-site.xml和Hadoop目錄下的core-site.xml,將兩個檔案複製到src/main/resources目錄下
(4)記得將前文中虛擬機器的IP和hostname對映配置到寫程式碼這臺機器的hosts檔案中(比如win7的hosts目錄為C:\Windows\System32\drivers\etc)
(5)新建一個類,編寫CRUD的示例程式碼,下面程式碼用了jdk1.7的一個語法糖:try-with-resources,在try()裡面宣告的物件,會自動幫你呼叫物件的close方法來關閉物件,不用手動呼叫close(),非常方便
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.net.URISyntaxException; public class HelloHBase { public static void main(String[] args) throws URISyntaxException { // 載入HBase的配置 Configuration configuration = HBaseConfiguration.create(); // 讀取配置檔案 configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI())); configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI())); try (// 建立一個HBase連線 Connection connection = ConnectionFactory.createConnection(configuration); // 獲得執行操作的管理介面 Admin admin = connection.getAdmin();) { // 新建一個表名為mytable的表 TableName tableName = TableName.valueOf("mytable"); HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); // 新建一個列族名為mycf的列族 HColumnDescriptor mycf = new HColumnDescriptor("mycf"); // 將列族新增到表中 tableDescriptor.addFamily(mycf); // 執行建表操作 createOrOverwrite(admin, tableDescriptor); // 設定列族的壓縮方式為GZ mycf.setCompactionCompressionType(Compression.Algorithm.GZ); // 設定最大版本數量(ALL_VERSIONS實際上就是Integer.MAX_VALUE) mycf.setMaxVersions(HConstants.ALL_VERSIONS); // 列族更新到表中 tableDescriptor.modifyFamily(mycf); // 執行更新操作 admin.modifyTable(tableName, tableDescriptor); // 新增一個列族 HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("newcf"); hColumnDescriptor.setCompactionCompressionType(Compression.Algorithm.GZ); hColumnDescriptor.setMaxVersions(HConstants.ALL_VERSIONS); // 執行新增操作 admin.addColumnFamily(tableName, hColumnDescriptor); // 獲取表物件 Table table = connection.getTable(tableName); // 建立一個put請求,用於新增資料或者更新資料 Put put = new Put(Bytes.toBytes("row1")); put.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("name"), Bytes.toBytes("jack")); table.put(put); // 建立一個append請求,用於在資料後面新增內容 Append append = new Append(Bytes.toBytes("row1")); append.add(Bytes.toBytes("mycf"), Bytes.toBytes("name"), Bytes.toBytes("son")); table.append(append); // 建立一個long型別的列 Put put1 = new Put(Bytes.toBytes("row2")); put1.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("age"), Bytes.toBytes(6L)); // 建立一個增值請求,將值增加10L Increment increment = new Increment(Bytes.toBytes("row2")); increment.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("age"), 10L); table.increment(increment); // 建立一個查詢請求,查詢一行資料 Get get = new Get(Bytes.toBytes("row1")); // 由於HBase的一行可能非常大,所以限定要取出的列族 get.addFamily(Bytes.toBytes("mycf")); // 建立一個結果請求 Result result = table.get(get); // 從查詢結果中取出name列,然後列印(這裡預設取最新版本的值,如果要取其他版本要使用Cell物件) byte[] name = result.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("name")); System.out.println(Bytes.toString(name)); // 建立一個掃描請求,查詢多行資料 Scan scan = new Scan(Bytes.toBytes("row1")); // 設定掃描器的快取數量,遍歷資料時不用發多次請求,預設100,適當的快取可以提高效能 scan.setCaching(150); // 建立掃描結果,這個時候不會真正從HBase查詢資料,下面的遍歷才是去查詢 ResultScanner resultScanner = table.getScanner(scan); for (Result r : resultScanner) { String data = Bytes.toString(r.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("name"))); System.out.println(data); } // 使用完畢關閉 resultScanner.close(); // 建立一個刪除請求 Delete delete = new Delete(Bytes.toBytes("row2")); // 可以自定義一些篩選條件 delete.addFamily(Bytes.toBytes("age")); table.delete(delete); // 停用表 admin.disableTable(tableName); // 刪除列族 admin.deleteColumnFamily(tableName, "mycf".getBytes()); // 刪除表 admin.deleteTable(tableName); } catch (Exception e) { e.printStackTrace(); } System.out.println("ok"); } public static void createOrOverwrite(Admin admin, HTableDescriptor table) throws IOException { // 獲取table名 TableName tableName = table.getTableName(); // 判斷table是否存在,如果存在則先停用並刪除 if (admin.tableExists(tableName)) { // 停用表 admin.disableTable(tableName); // 刪除表 admin.deleteTable(tableName); } // 建立表 admin.createTable(table); } }
四、API的高階用法
上一章介紹了API的基本使用方法,這一章總結一些高階用法
1、過濾器:通過get或者scan查詢資料時,經常需要加入一些條件來查詢
(1)值過濾器:相當於傳統sql的where column like '%jack%',但是會對所有的列都做過濾,如果需要對單個列過濾,可以使用SingleColumnValueFilter,如果需要查詢值相等的過濾器,可以使用BinaryComparator
CompareFilter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("jack")); scan.setFilter(filter);
(2)分頁過濾器:相當於傳統sql的limit,但是不能指定起始頁碼,所以需要自己記錄最後一個row key,並通過scan.setStartRow()設定,在做分頁時有個小技巧,如果你通過scan.setStartRow()設定最後一個row key時,下一頁的資料依然會包含上一頁的最後一個數據,所以你可以將最後一個row key的末尾加一個0,就可以不包含最後一個數據了,因為row key是按照字典順序排序的
Filter filter1 = new PageFilter(10L); scan.setFilter(filter1);
(3)過濾器列表:用於組合多個過濾器,實現複雜一些的查詢場景,注意這個過濾器列表是有順序的,FilterList的第一個引數用來指定多個條件的連線方式(and、or),MUST_PASS_ALL相當於and連線,MUST_PASS_ONE相當於or連線
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); filterList.addFilter(filter); filterList.addFilter(filter1); scan.setFilter(filterList);
(4)還有一些其他的過濾器,使用方法大同小異,比如行鍵過濾器、列過濾器、單元格過濾器,甚至可以自定義過濾器,其他高階用法可以等用到再看