HBase 系列(六)——HBase Java API 的基本使用
一、簡述
截至到目前 (2019.04),HBase 有兩個主要的版本,分別是 1.x 和 2.x ,兩個版本的 Java API 有所不同,1.x 中某些方法在 2.x 中被標識為 @deprecated
過時。所以下面關於 API 的樣例,我會分別給出 1.x 和 2.x 兩個版本。完整的程式碼見本倉庫:
Java API 1.x Examples
Java API 2.x Examples
同時你使用的客戶端的版本必須與服務端版本保持一致,如果用 2.x 版本的客戶端程式碼去連線 1.x 版本的服務端,會丟擲 NoSuchColumnFamilyException
等異常。
二、Java API 1.x 基本使用
2.1 新建Maven工程,匯入專案依賴
要使用 Java API 操作 HBase,需要引入 hbase-client
。這裡選取的 HBase Client
的版本為 1.2.0
。
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0</version>
</dependency>
2.2 API 基本使用
public class HBaseUtils { private static Connection connection; static { Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.property.clientPort", "2181"); // 如果是叢集 則主機名用逗號分隔 configuration.set("hbase.zookeeper.quorum", "hadoop001"); try { connection = ConnectionFactory.createConnection(configuration); } catch (IOException e) { e.printStackTrace(); } } /** * 建立 HBase 表 * * @param tableName 表名 * @param columnFamilies 列族的陣列 */ public static boolean createTable(String tableName, List<String> columnFamilies) { try { HBaseAdmin admin = (HBaseAdmin) connection.getAdmin(); if (admin.tableExists(tableName)) { return false; } HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName)); columnFamilies.forEach(columnFamily -> { HColumnDescriptor columnDescriptor = new HColumnDescriptor(columnFamily); columnDescriptor.setMaxVersions(1); tableDescriptor.addFamily(columnDescriptor); }); admin.createTable(tableDescriptor); } catch (IOException e) { e.printStackTrace(); } return true; } /** * 刪除 hBase 表 * * @param tableName 表名 */ public static boolean deleteTable(String tableName) { try { HBaseAdmin admin = (HBaseAdmin) connection.getAdmin(); // 刪除表前需要先禁用表 admin.disableTable(tableName); admin.deleteTable(tableName); } catch (Exception e) { e.printStackTrace(); } return true; } /** * 插入資料 * * @param tableName 表名 * @param rowKey 唯一標識 * @param columnFamilyName 列族名 * @param qualifier 列標識 * @param value 資料 */ public static boolean putRow(String tableName, String rowKey, String columnFamilyName, String qualifier, String value) { try { Table table = connection.getTable(TableName.valueOf(tableName)); Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value)); table.put(put); table.close(); } catch (IOException e) { e.printStackTrace(); } return true; } /** * 插入資料 * * @param tableName 表名 * @param rowKey 唯一標識 * @param columnFamilyName 列族名 * @param pairList 列標識和值的集合 */ public static boolean putRow(String tableName, String rowKey, String columnFamilyName, List<Pair<String, String>> pairList) { try { Table table = connection.getTable(TableName.valueOf(tableName)); Put put = new Put(Bytes.toBytes(rowKey)); pairList.forEach(pair -> put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getKey()), Bytes.toBytes(pair.getValue()))); table.put(put); table.close(); } catch (IOException e) { e.printStackTrace(); } return true; } /** * 根據 rowKey 獲取指定行的資料 * * @param tableName 表名 * @param rowKey 唯一標識 */ public static Result getRow(String tableName, String rowKey) { try { Table table = connection.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); return table.get(get); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 獲取指定行指定列 (cell) 的最新版本的資料 * * @param tableName 表名 * @param rowKey 唯一標識 * @param columnFamily 列族 * @param qualifier 列標識 */ public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) { try { Table table = connection.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); if (!get.isCheckExistenceOnly()) { get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); Result result = table.get(get); byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); return Bytes.toString(resultValue); } else { return null; } } catch (IOException e) { e.printStackTrace(); } return null; } /** * 檢索全表 * * @param tableName 表名 */ public static ResultScanner getScanner(String tableName) { try { Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); return table.getScanner(scan); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 檢索表中指定資料 * * @param tableName 表名 * @param filterList 過濾器 */ public static ResultScanner getScanner(String tableName, FilterList filterList) { try { Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); scan.setFilter(filterList); return table.getScanner(scan); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 檢索表中指定資料 * * @param tableName 表名 * @param startRowKey 起始 RowKey * @param endRowKey 終止 RowKey * @param filterList 過濾器 */ public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey, FilterList filterList) { try { Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes(startRowKey)); scan.setStopRow(Bytes.toBytes(endRowKey)); scan.setFilter(filterList); return table.getScanner(scan); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 刪除指定行記錄 * * @param tableName 表名 * @param rowKey 唯一標識 */ public static boolean deleteRow(String tableName, String rowKey) { try { Table table = connection.getTable(TableName.valueOf(tableName)); Delete delete = new Delete(Bytes.toBytes(rowKey)); table.delete(delete); } catch (IOException e) { e.printStackTrace(); } return true; } /** * 刪除指定行的指定列 * * @param tableName 表名 * @param rowKey 唯一標識 * @param familyName 列族 * @param qualifier 列標識 */ public static boolean deleteColumn(String tableName, String rowKey, String familyName, String qualifier) { try { Table table = connection.getTable(TableName.valueOf(tableName)); Delete delete = new Delete(Bytes.toBytes(rowKey)); delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier)); table.delete(delete); table.close(); } catch (IOException e) { e.printStackTrace(); } return true; } }
2.3 單元測試
以單元測試的方式對上面封裝的 API 進行測試。
public class HBaseUtilsTest { private static final String TABLE_NAME = "class"; private static final String TEACHER = "teacher"; private static final String STUDENT = "student"; @Test public void createTable() { // 新建表 List<String> columnFamilies = Arrays.asList(TEACHER, STUDENT); boolean table = HBaseUtils.createTable(TABLE_NAME, columnFamilies); System.out.println("表建立結果:" + table); } @Test public void insertData() { List<Pair<String, String>> pairs1 = Arrays.asList(new Pair<>("name", "Tom"), new Pair<>("age", "22"), new Pair<>("gender", "1")); HBaseUtils.putRow(TABLE_NAME, "rowKey1", STUDENT, pairs1); List<Pair<String, String>> pairs2 = Arrays.asList(new Pair<>("name", "Jack"), new Pair<>("age", "33"), new Pair<>("gender", "2")); HBaseUtils.putRow(TABLE_NAME, "rowKey2", STUDENT, pairs2); List<Pair<String, String>> pairs3 = Arrays.asList(new Pair<>("name", "Mike"), new Pair<>("age", "44"), new Pair<>("gender", "1")); HBaseUtils.putRow(TABLE_NAME, "rowKey3", STUDENT, pairs3); } @Test public void getRow() { Result result = HBaseUtils.getRow(TABLE_NAME, "rowKey1"); if (result != null) { System.out.println(Bytes .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name")))); } } @Test public void getCell() { String cell = HBaseUtils.getCell(TABLE_NAME, "rowKey2", STUDENT, "age"); System.out.println("cell age :" + cell); } @Test public void getScanner() { ResultScanner scanner = HBaseUtils.getScanner(TABLE_NAME); if (scanner != null) { scanner.forEach(result -> System.out.println(Bytes.toString(result.getRow()) + "->" + Bytes .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name"))))); scanner.close(); } } @Test public void getScannerWithFilter() { FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); SingleColumnValueFilter nameFilter = new SingleColumnValueFilter(Bytes.toBytes(STUDENT), Bytes.toBytes("name"), CompareOperator.EQUAL, Bytes.toBytes("Jack")); filterList.addFilter(nameFilter); ResultScanner scanner = HBaseUtils.getScanner(TABLE_NAME, filterList); if (scanner != null) { scanner.forEach(result -> System.out.println(Bytes.toString(result.getRow()) + "->" + Bytes .toString(result.getValue(Bytes.toBytes(STUDENT), Bytes.toBytes("name"))))); scanner.close(); } } @Test public void deleteColumn() { boolean b = HBaseUtils.deleteColumn(TABLE_NAME, "rowKey2", STUDENT, "age"); System.out.println("刪除結果: " + b); } @Test public void deleteRow() { boolean b = HBaseUtils.deleteRow(TABLE_NAME, "rowKey2"); System.out.println("刪除結果: " + b); } @Test public void deleteTable() { boolean b = HBaseUtils.deleteTable(TABLE_NAME); System.out.println("刪除結果: " + b); } }
三、Java API 2.x 基本使用
3.1 新建Maven工程,匯入專案依賴
這裡選取的 HBase Client
的版本為最新的 2.1.4
。
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.4</version>
</dependency>
3.2 API 的基本使用
2.x 版本相比於 1.x 廢棄了一部分方法,關於廢棄的方法在原始碼中都會指明新的替代方法,比如,在 2.x 中建立表時:HTableDescriptor
和 HColumnDescriptor
等類都標識為廢棄,取而代之的是使用 TableDescriptorBuilder
和 ColumnFamilyDescriptorBuilder
來定義表和列族。
以下為 HBase 2.x 版本 Java API 的使用示例:
public class HBaseUtils {
private static Connection connection;
static {
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.property.clientPort", "2181");
// 如果是叢集 則主機名用逗號分隔
configuration.set("hbase.zookeeper.quorum", "hadoop001");
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 建立 HBase 表
*
* @param tableName 表名
* @param columnFamilies 列族的陣列
*/
public static boolean createTable(String tableName, List<String> columnFamilies) {
try {
HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
if (admin.tableExists(TableName.valueOf(tableName))) {
return false;
}
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
columnFamilies.forEach(columnFamily -> {
ColumnFamilyDescriptorBuilder cfDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
cfDescriptorBuilder.setMaxVersions(1);
ColumnFamilyDescriptor familyDescriptor = cfDescriptorBuilder.build();
tableDescriptor.setColumnFamily(familyDescriptor);
});
admin.createTable(tableDescriptor.build());
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
/**
* 刪除 hBase 表
*
* @param tableName 表名
*/
public static boolean deleteTable(String tableName) {
try {
HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
// 刪除表前需要先禁用表
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
/**
* 插入資料
*
* @param tableName 表名
* @param rowKey 唯一標識
* @param columnFamilyName 列族名
* @param qualifier 列標識
* @param value 資料
*/
public static boolean putRow(String tableName, String rowKey, String columnFamilyName, String qualifier,
String value) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
table.put(put);
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
/**
* 插入資料
*
* @param tableName 表名
* @param rowKey 唯一標識
* @param columnFamilyName 列族名
* @param pairList 列標識和值的集合
*/
public static boolean putRow(String tableName, String rowKey, String columnFamilyName, List<Pair<String, String>> pairList) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
pairList.forEach(pair -> put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getKey()), Bytes.toBytes(pair.getValue())));
table.put(put);
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
/**
* 根據 rowKey 獲取指定行的資料
*
* @param tableName 表名
* @param rowKey 唯一標識
*/
public static Result getRow(String tableName, String rowKey) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
return table.get(get);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* 獲取指定行指定列 (cell) 的最新版本的資料
*
* @param tableName 表名
* @param rowKey 唯一標識
* @param columnFamily 列族
* @param qualifier 列標識
*/
public static String getCell(String tableName, String rowKey, String columnFamily, String qualifier) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
if (!get.isCheckExistenceOnly()) {
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
Result result = table.get(get);
byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
return Bytes.toString(resultValue);
} else {
return null;
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* 檢索全表
*
* @param tableName 表名
*/
public static ResultScanner getScanner(String tableName) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
return table.getScanner(scan);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* 檢索表中指定資料
*
* @param tableName 表名
* @param filterList 過濾器
*/
public static ResultScanner getScanner(String tableName, FilterList filterList) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.setFilter(filterList);
return table.getScanner(scan);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* 檢索表中指定資料
*
* @param tableName 表名
* @param startRowKey 起始 RowKey
* @param endRowKey 終止 RowKey
* @param filterList 過濾器
*/
public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey,
FilterList filterList) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(startRowKey));
scan.withStopRow(Bytes.toBytes(endRowKey));
scan.setFilter(filterList);
return table.getScanner(scan);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* 刪除指定行記錄
*
* @param tableName 表名
* @param rowKey 唯一標識
*/
public static boolean deleteRow(String tableName, String rowKey) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
table.delete(delete);
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
/**
* 刪除指定行指定列
*
* @param tableName 表名
* @param rowKey 唯一標識
* @param familyName 列族
* @param qualifier 列標識
*/
public static boolean deleteColumn(String tableName, String rowKey, String familyName,
String qualifier) {
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
table.delete(delete);
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
}
四、正確連線Hbase
在上面的程式碼中,在類載入時就初始化了 Connection 連線,並且之後的方法都是複用這個 Connection,這時我們可能會考慮是否可以使用自定義連線池來獲取更好的效能表現?實際上這是沒有必要的。
首先官方對於 Connection
的使用說明如下:
Connection Pooling For applications which require high-end multithreaded
access (e.g., web-servers or application servers that may serve many
application threads in a single JVM), you can pre-create a Connection,
as shown in the following example:
對於高併發多執行緒訪問的應用程式(例如,在單個 JVM 中存在的為多個執行緒服務的 Web 伺服器或應用程式伺服器),
您只需要預先建立一個 Connection。例子如下:
// Create a connection to the cluster.
Configuration conf = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf(tablename))) {
// use table as needed, the table returned is lightweight
}
之所以能這樣使用,這是因為 Connection 並不是一個簡單的 socket 連線,介面文件 中對 Connection 的表述是:
A cluster connection encapsulating lower level individual connections to actual servers and a
connection to zookeeper. Connections are instantiated through the ConnectionFactory class.
The lifecycle of the connection is managed by the caller, who has to close() the connection
to release the resources.
Connection 是一個叢集連線,封裝了與多臺伺服器(Matser/Region Server)的底層連線以及與 zookeeper 的連線。
連線通過 ConnectionFactory 類例項化。連線的生命週期由呼叫者管理,呼叫者必須使用 close() 關閉連線以釋放資源。
之所以封裝這些連線,是因為 HBase 客戶端需要連線三個不同的服務角色:
- Zookeeper :主要用於獲取
meta
表的位置資訊,Master 的資訊; - HBase Master :主要用於執行 HBaseAdmin 介面的一些操作,例如建表等;
- HBase RegionServer :用於讀、寫資料。
Connection 物件和實際的 Socket 連線之間的對應關係如下圖:
上面兩張圖片引用自部落格:連線 HBase 的正確姿勢
在 HBase 客戶端程式碼中,真正對應 Socket 連線的是 RpcConnection
物件。HBase 使用 PoolMap
這種資料結構來儲存客戶端到 HBase 伺服器之間的連線。PoolMap
的內部有一個 ConcurrentHashMap
例項,其 key 是 ConnectionId
(封裝了伺服器地址和使用者 ticket),value 是一個 RpcConnection
物件的資源池。當 HBase 需要連線一個伺服器時,首先會根據 ConnectionId
找到對應的連線池,然後從連線池中取出一個連線物件。
@InterfaceAudience.Private
public class PoolMap<K, V> implements Map<K, V> {
private PoolType poolType;
private int poolMaxSize;
private Map<K, Pool<V>> pools = new ConcurrentHashMap<>();
public PoolMap(PoolType poolType) {
this.poolType = poolType;
}
.....
HBase 中提供了三種資源池的實現,分別是 Reusable
,RoundRobin
和 ThreadLocal
。具體實現可以通 hbase.client.ipc.pool.type
配置項指定,預設為 Reusable
。連線池的大小也可以通過 hbase.client.ipc.pool.size
配置項指定,預設為 1,即每個 Server 1 個連線。也可以通過修改配置實現:
config.set("hbase.client.ipc.pool.type",...);
config.set("hbase.client.ipc.pool.size",...);
connection = ConnectionFactory.createConnection(config);
由此可以看出 HBase 中 Connection 類已經實現了對連線的管理功能,所以我們不必在 Connection 上在做額外的管理。
另外,Connection 是執行緒安全的,但 Table 和 Admin 卻不是執行緒安全的,因此正確的做法是一個程序共用一個 Connection 物件,而在不同的執行緒中使用單獨的 Table 和 Admin 物件。Table 和 Admin 的獲取操作 getTable()
和 getAdmin()
都是輕量級,所以不必擔心效能的消耗,同時建議在使用完成後顯示的呼叫 close()
方法來關閉它們。
參考資料
- 連線 HBase 的正確姿勢
- Apache HBase ™ Reference Guide
更多大資料系列文章可以參見 GitHub 開源專案: 大資料入門指南