HBase客戶端API使用
篇幅中使用的HBase版本為1.1.2
Java API
HBase提供了一套Java API來支援Java程式對HBase資料庫的請求操作,在hbase shell中能夠使用的都可以通過這套API來實現
HBase有兩套API,分別是1.0和2.0,在較新版本的HBase中使用1.0的API時,很多類和方法都被標記為Deprecated,官方表示舊版本的API將會在3.0版本中刪除,所以推薦使用2.0
本篇中使用的API均為2.0
2.0官方API文件
需要匯入的jar包
- com.google.protobuf:rpc通訊依賴
- org.apache.zookeeper:連線zk依賴
- hbase-client:hbase客戶端
- hbase-common:hbase元件
使用maven可以簡單方便地管理jar包,pom檔案示例如下:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hbase.version>1.1.2</hbase.version>
<zookeeper.version>3.4.5</zookeeper.version >
</properties>
<dependencis>
<!--hadoop/hbase都要依賴(RPC通訊)-->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<!--hbase-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
</dependencis>
開發流程
HBase API的使用可以歸納為一下幾個步驟:
1.獲得Configuration例項:其中儲存了環境和配置資訊
2.在Configuration中設定zk和master的相關資訊,如果hbase的配置檔案在環境變數中則不需要配置
3.獲得Connection例項連線到zk
4.通過Connection例項獲得Admin和Table例項呼叫其方法進行操作
其中Admin和Table為HBase API中提供的一個統一操作介面,在1.0中對應的是HAdmin和HTable
Admin對應的是DDL操作
Table對應的是相關表的DML操作
程式碼示例
//1.獲得Configuration例項並進行相關設定
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "localhost:2181");
configuration.set("hbase.master", "localhost:16010");
//2.獲得Connection例項
Connection connection = ConnectionFactory.createConnection(configuration);
//3.1獲得Admin介面
Admin admin = connection.getAdmin();
//3.2獲得Table介面,需要傳入表名
Table table = connection.getTable(tableName)
為了程式的可維護性和方便呼叫,這裡將HBase API提供的介面劃分成了三個類
1.HBaseInfo:儲存了Configuration和Connection,並進行一些初始化的設定,如zk地址等
2.HBaseDDLUtil:繼承HBaseInfo,通過父類的Configuration和Connection獲得Admin例項並使用其提供的方式進行DDL操作
3.HBaseDMLUtil:繼承HBaseInfo,通過父類的Configuration和Connection獲得Table例項並使用其提供的方式進行DML操作
DDL操作
/**
* 建立表
* @param tableName 表名
* @param familyNames 列族名
* */
public static void createTable(String tableName, String... familyNames) throws IOException {
if (admin.tableExists(TableName.valueOf(tableName))) {
return;
}
//通過HTableDescriptor類來描述一個表,HColumnDescriptor描述一個列族
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
for (String familyName : familyNames) {
tableDescriptor.addFamily(new HColumnDescriptor(familyName));
}
admin.createTable(tableDescriptor);
}
/**
* 刪除表
* @param tableName 表名
* */
public static void dropTable(String tableName) throws IOException {
//刪除之前要將表disable
if (!admin.isTableDisabled(TableName.valueOf(tableName))) {
admin.disableTable(TableName.valueOf(tableName));
}
admin.deleteTable(TableName.valueOf(tableName));
}
DML操作
HBase中的CRUD都是通過對應的物件來操作的,例如:
Put為新增,如果記錄已經存在會用新值覆蓋,相當於修改
Delete為刪除
Get為查詢
/**
* 指定行/列中插入資料
* @param tableName 表名
* @param rowKey 主鍵rowkey
* @param family 列族
* @param column 列
* @param value 值
* TODO: 批量PUT
*/
public static void insert(String tableName, String rowKey, String family, String column, String value) throws IOException {
table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
table.put(put);
}
/**
* 刪除表中的指定行
* @param tableName 表名
* @param rowKey rowkey
* TODO: 批量刪除
*/
public static void delete(String tableName, String rowKey) throws IOException {
table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
table.delete(delete);
}
篇幅有限,HBase工具類的程式碼已上傳至Github
API文件中還有許多實用的函式沒有接觸到,希望在實踐中可以操作一下
MapReduceApi
要使用HBase的MapReduce API需要在pom檔案中新增以下依賴:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
HBase實現了TableInputFormat和TableOutputFormat用於讀寫HBase表
TableMapper類和TableReducer類,在使用MapReduce操作HBase的時候可以藉助這兩個類從HBase中讀資料和寫資料
TableInputFormat
用於讀取HBase表資料並生成鍵值對
將資料表按照Region分割成split,既有多少個Regions就有多個splits
然後將Region按行鍵分成
TableMapper
和普通的Mapper的區別在於
TableMapper將輸入的
TableReducer
該類將Reducer的輸出型別限制為Mutation,Mutation是HBase中Delete/Put/Get/Append類的父類,也就是說TableReducer將輸出型別限制在這幾個類之中
自定義的Reducer類繼承TableReducer,指定其輸入的
TableOutputFormat
該類負責將Reducer的輸出資料寫入到HBase中
程式碼示例
MyTableMapper
public class MyTableMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
ImmutableBytesWritable k = new ImmutableBytesWritable();
ImmutableBytesWritable v = new ImmutableBytesWritable();
/**
* 表中的每行都會呼叫一次map函式
* */
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
//遍歷改行中的結果集
for (Cell cell : value.rawCells()) {
//獲得rowkey
byte[] row = CellUtil.cloneRow(cell);
//獲得值
byte[] rowValue = CellUtil.cloneValue(cell);
k.set(row);
v.set(rowValue);
context.write(k, v);
}
}
}
MyTableReducer
public class MyTableReducer extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Context context) throws IOException, InterruptedException {
for (ImmutableBytesWritable value : values) {
Put put = new Put(key.get());
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c3"), value.get());
context.write(key, put);
}
}
}
驅動程式
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = HBaseConfiguration.create();
//設定讀取的表
configuration.set(TableInputFormat.INPUT_TABLE, "client");
Job job = Job.getInstance(configuration, "hbase-mr-api");
//可以和普通mr程式一樣進行設定
job.setJarByClass(Driver.class);
job.setInputFormatClass(TableInputFormat.class);
job.setMapperClass(MyTableMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(ImmutableBytesWritable.class);
//也可以使用hbase提供的工具類來設定job
TableMapReduceUtil.initTableReducerJob("t1", MyTableReducer.class, job);
job.waitForCompletion(true);
}
關於驅動程式中設定job的方法
在上面的程式碼中可以看到,設定job的方法有兩種:MapReduce常用方式和HBase提供的TableMapReduceUtil工具類
兩種方式的設定效果是相同的,通過普通的方式進行設定的時候需要配置TableInputFormat/TableOutputFormat的相關屬性,如上的
//設定讀取的表
configuration.set(TableInputFormat.INPUT_TABLE, "client");
而是使用TableMapReduceUtil的initTableMapperJob/initTableReducerJob則是通過引數傳遞這些配置,下面給出具體的配置項
TableInputFormat
TableOutputFormat
initTableMapperJob
initTableReducerJob
作者:@小黑