hbase命令實踐與java api程式碼實踐
名稱空間(namespace): 預設為default名稱空間
list_namespace
create_namespace 'my_ns'
create 'my_ns:my_table','fam'
exists 'my_ns:my_table'
list
list_namespace
disable 'my_ns:my_table'
drop 'my_ns:my_table'
drop_namespace 'my_ns'
rowkey與family: rowkey行鍵唯一,family列簇,每一個數據項都有時間戳
put 'my_ns:my_table','rowkey1','fam:f1','r1f1'
put 'my_ns:my_table','rowkey1','fam:f2','r1f2'
put 'my_ns:my_table','rowkey1','fam:f3','r1f3'
put 'my_ns:my_table','rowkey2','fam:f1','r2f1'
put 'my_ns:my_table','rowkey2','fam:f2','r2f2'
put 'my_ns:my_table','rowkey2','fam:f3','r2f3'
put 'my_ns:my_table','rowkey3','fam:f1','r3f1'
put 'my_ns:my_table','rowkey3','fam:f2','r3f2'
put 'my_ns:my_table','rowkey3','fam:f3','r3f3'
list 'my_ns:my_table'
scan 'my_ns:my_table'
get 'my_ns:my_table','rowkey1'
get 'my_ns:my_table','rowkey1','fam'
get 'my_ns:my_table','rowkey1','fam:f1'
delete 'my_ns:my_table','rowkey1','fam:f1'
delete 'my_ns:my_table','rowkey2','fam:f2'
delete 'my_ns:my_table','rowkey3','fam:f3'
scan 'my_ns:my_table'
hbase java api:
步驟一:關閉hbase機器上的防火牆,確保在本地能與遠端hbase建立tcp連線:service iptables stop
步驟二: maven專案中新增依賴
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.0</version>
</dependency>
注意:有個小問題,jdk1.6的tools.jar自動下載不到,得自己手動去下個包把依賴寫上,hbase-client裡面依賴了這個包,如下
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.6</version>
<scope>system</scope>
<systemPath>E:/downloads/jar/tools.jar</systemPath>
</dependency>
步驟三:本地機器需要加上hbase伺服器hostname的ip對映,我在hosts檔案裡添加了如下一行:
192.168.137.10 centos001
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
public class HBaseClient {
private Configuration configuration;
private Connection connection;
private Admin admin;
private static HBaseClient instance;
private HBaseClient() throws IOException{
configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum",
"192.168.137.10:2181,192.168.137.10:2182,192.168.137.10:2183");
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
};
public static HBaseClient getInstance() throws IOException{
if(instance==null){
synchronized (HBaseClient.class) {
if(instance==null){
instance = new HBaseClient();
}
}
}
return instance;
}
public void close() throws IOException{
if(admin!=null){
admin.close();
}
if(connection!=null){
connection.close();
}
}
/**
* 判斷名稱空間是否存在
* @param strNamespace 名稱空間
* @return true-存在,false-不存在
* @throws IOException
*/
public boolean isExistsNamespace(String strNamespace) throws IOException{
NamespaceDescriptor[] namespaces = admin.listNamespaceDescriptors();
for(int i=0; i<namespaces.length; i++){
if(strNamespace.equals(namespaces[i].getName())){
return true;
}
}
return false;
}
/**
* 建立名稱空間
* @param strNamespace 名稱空間
* @return true-建立成功,false-存在該namespace
* @throws IOException
*/
public boolean createNamespace(String strNamespace) throws IOException{
if(isExistsNamespace(strNamespace)){
return false;
}else{
admin.createNamespace(NamespaceDescriptor.create(strNamespace).build());
return true;
}
}
/**
* 判斷表是否存在
* @param strTableName 表名
* @return true-存在,false-不存在
* @throws IOException
*/
public boolean isExistsTable(String strTableName) throws IOException{
TableName tableName = TableName.valueOf(strTableName);
return admin.tableExists(tableName);
}
/**
* 建立表,存在同名表時不刪除同名表也不新建表
* @param strTableName 表名
* @param strFamily 列簇名
* @return true-成功建立,false-已存在同名表,未新建表
* @throws IOException
*/
public boolean createTable(String strTableName, String strFamily) throws IOException {
TableName tableName = TableName.valueOf(strTableName);
if (admin.tableExists(tableName)) {
return false;
}
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor family = new HColumnDescriptor(strFamily);
hTableDescriptor.addFamily(family);
admin.createTable(hTableDescriptor);
return true;
}
/**
* 建立表,存在同名表時刪除同名表然後新建表
* @param strTableName 表名
* @param strFamily 列簇名
* @return 表建立完成後返回true
* @throws IOException
*/
public boolean createTableForced(String strTableName, String strFamily) throws IOException {
TableName tableName = TableName.valueOf(strTableName);
if (admin.tableExists(tableName)) {
deleteTable(strTableName);
}
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor family = new HColumnDescriptor(strFamily);
hTableDescriptor.addFamily(family);
admin.createTable(hTableDescriptor);
return true;
}
/**
* 插入資料
* @param strTableName 表名
* @param put 插入資料物件
* @throws IOException
*/
public void insertData(String strTableName, Put put) throws IOException {
Table table = connection.getTable(TableName.valueOf(strTableName));
table.put(put);
}
/**
* 插入資料
* @param strTableName 表名
* @param putList 插入資料物件集合
* @throws IOException
*/
public void insertData(String strTableName, List<Put> putList) throws IOException {
Table table = connection.getTable(TableName.valueOf(strTableName));
table.put(putList);
}
/**
* 掃描表
* @param strTableName 表名
* @return ResultScanner 掃描結果物件
* @throws IOException
*/
public ResultScanner queryTable(String strTableName) throws IOException {
Table table = connection.getTable(TableName.valueOf(strTableName));
ResultScanner scanner = table.getScanner(new Scan());
return scanner;
}
/**
* 獲取指定行的資料
* @param strTableName 表名
* @param get 查詢條件物件
* @return 查詢結果物件
* @throws IOException
*/
public Result queryTableByRowKey(String strTableName, Get get) throws IOException {
Table table = connection.getTable(TableName.valueOf(strTableName));
Result result = table.get(get);
return result;
}
/**
* 根據Filter條件物件掃描表
* @param strTableName 表名
* @param filter Filter條件物件
* @return 掃描結果物件
* @throws IOException
*/
public ResultScanner queryTableByFilter(String strTableName, Filter filter) throws IOException {
Table table = connection.getTable(TableName.valueOf(strTableName));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
return scanner;
}
/**
* 根據Filter條件物件列表掃描表
* @param strTableName 表名
* @param filters Filter條件物件集合
* @return 掃描結果集合
* @throws IOException
*/
public ResultScanner queryTableByFilters(String strTableName, List<Filter> filters) throws IOException {
Table table = connection.getTable(TableName.valueOf(strTableName));
FilterList filterList = new FilterList(filters);
Scan scan = new Scan();
scan.setFilter(filterList);
ResultScanner scanner = table.getScanner(scan);
return scanner;
}
/**
* 新增列
* @param strTableName 表名
* @param strColumn 列簇名
* @throws IOException
*/
public void addColumn(String strTableName, String strColumn) throws IOException {
TableName tableName = TableName.valueOf(strTableName);
HColumnDescriptor columnDescriptor = new HColumnDescriptor(strColumn);
admin.addColumn(tableName, columnDescriptor);
}
/**
* 刪除列
* @param strTableName 表名
* @param strColumn 列簇名
* @throws IOException
*/
public void deleteColumn(String strTableName, String strColumn) throws IOException {
TableName tableName = TableName.valueOf(strTableName);
admin.deleteColumn(tableName, strColumn.getBytes());
}
/**
* 根據rowkey刪除行
* @param strTableName 表名
* @param rowkey 行名
* @throws IOException
*/
public void deleteByRowKey(String strTableName, String rowkey) throws IOException {
Table table = connection.getTable(TableName.valueOf(strTableName));
Delete delete = new Delete(Bytes.toBytes(rowkey));
table.delete(delete);
}
/**
* 刪除行
* @param strTableName 表名
* @param list 刪除資料集合
* @throws IOException
*/
public void deleteRow(String strTableName, List<Delete> list) throws IOException {
Table table = connection.getTable(TableName.valueOf(strTableName));
table.delete(list);
}
/**
* 根據Filter條件物件刪除行
* @param strTableName 表名
* @param filter Filter條件物件
* @throws IOException
*/
public void deleteByFilter(String strTableName, Filter filter) throws IOException {
ResultScanner scanner = queryTableByFilter(strTableName, filter);
List<Delete> list = new ArrayList<Delete>();
for (Result result : scanner) {
Delete delete = new Delete(result.getRow());
list.add(delete);
}
deleteRow(strTableName, list);
scanner.close();
}
/**
* 截斷表
* @param strTableName 表名
* @throws IOException
*/
public void truncateTable(String strTableName) throws IOException {
TableName tableName = TableName.valueOf(strTableName);
admin.disableTable(tableName);
admin.truncateTable(tableName, true);
}
/**
* 刪除表
* @param strTableName 表名
* @throws IOException
*/
public void deleteTable(String strTableName) throws IOException {
TableName tableName = TableName.valueOf(strTableName);
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
public static void main(String[] args) throws IOException {
HBaseClient test = HBaseClient.getInstance();
test.testCreateNamespace();
test.testCreateTable();
test.testInsertData();
test.testQuery();
test.testQueryByRow();
test.testQueryByFilter();
test.testQueryByFilters();
test.testAddColumn();
test.testDeleteColumn();
test.testDeleteRow();
test.testDeleteTable();
test.close();
}
public void testCreateNamespace() throws IOException{
createNamespace("ns1");
}
public void testCreateTable() throws IOException{
createTableForced("ns1:t1", "cf");
}
public void testInsertData() throws IOException {
List<Put> putList = new ArrayList<Put>();
Put put = null;
for (int i = 0; i < 10; i++) {
put = new Put(Bytes.toBytes("row" + i));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("r" + i + "c1"));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("c2"), Bytes.toBytes("r" + i + "c2"));
putList.add(put);
}
insertData("ns1:t1", putList);
}
public void testQuery() throws IOException{
ResultScanner scanner = queryTable("ns1:t1");
for (Result result : scanner) {
byte[] row = result.getRow();
System.out.println("row key is:" + new String(row));
List<Cell> listCells = result.listCells();
for (Cell cell : listCells) {
byte[] familyArray = cell.getFamilyArray();
byte[] qualifierArray = cell.getQualifierArray();
byte[] valueArray = cell.getValueArray();
System.out.println("row value is:" + new String(familyArray)
+ new String(qualifierArray) + new String(valueArray));
}
}
scanner.close();
}
public void testQueryByRow() throws IOException{
Get get = new Get("row5".getBytes());
Result result = queryTableByRowKey("ns1:t1", get);
byte[] row = result.getRow();
System.out.println("row key is:" + new String(row));
List<Cell> listCells = result.listCells();
for (Cell cell : listCells) {
byte[] familyArray = cell.getFamilyArray();
byte[] qualifierArray = cell.getQualifierArray();
byte[] valueArray = cell.getValueArray();
System.out.println("row value is:" + new String(familyArray)
+ new String(qualifierArray) + new String(valueArray));
}
}
public void testQueryByFilter() throws IOException{
Filter filter = new SingleColumnValueFilter(
Bytes.toBytes("cf"), Bytes.toBytes("c1"),
CompareOp.EQUAL, Bytes.toBytes("r3c1"));
ResultScanner scanner = queryTableByFilter("ns1:t1", filter);
for (Result result : scanner) {
byte[] row = result.getRow();
System.out.println("row key is:" + new String(row));
List<Cell> listCells = result.listCells();
for (Cell cell : listCells) {
byte[] familyArray = cell.getFamilyArray();
byte[] qualifierArray = cell.getQualifierArray();
byte[] valueArray = cell.getValueArray();
System.out.println("row value is:" + new String(familyArray)
+ new String(qualifierArray) + new String(valueArray));
}
}
scanner.close();
}
public void testQueryByFilters() throws IOException{
List<Filter> filters = new ArrayList<Filter>();
Filter filter1 = new SingleColumnValueFilter(
Bytes.toBytes("cf"), Bytes.toBytes("c1"),
CompareOp.EQUAL, Bytes.toBytes("r5c1"));
Filter filter2 = new SingleColumnValueFilter(
Bytes.toBytes("cf"), Bytes.toBytes("c2"),
CompareOp.EQUAL, Bytes.toBytes("r5c2"));
filters.add(filter1);
filters.add(filter2);
ResultScanner scanner = queryTableByFilters("ns1:t1", filters);
for (Result result : scanner) {
byte[] row = result.getRow();
System.out.println("row key is:" + new String(row));
List<Cell> listCells = result.listCells();
for (Cell cell : listCells) {
byte[] familyArray = cell.getFamilyArray();
byte[] qualifierArray = cell.getQualifierArray();
byte[] valueArray = cell.getValueArray();
System.out.println("row value is:" + new String(familyArray)
+ new String(qualifierArray, "UTF-8") + new String(valueArray, "UTF-8"));
}
}
scanner.close();
}
public void testAddColumn() throws IOException{
addColumn("ns1:t1", "fam");
testQueryByRow();
}
public void testDeleteColumn() throws IOException{
deleteColumn("ns1:t1", "fam");
testQueryByRow();
}
public void testDeleteRow() throws IOException{
List<Delete> list = new ArrayList<Delete>();
Delete delete = new Delete(Bytes.toBytes("row4"));
list.add(delete);
deleteRow("ns1:t1",list);
testQuery();
}
public void testDeleteTable() throws IOException{
truncateTable("ns1:t1");
testQuery();
deleteTable("ns1:t1");
System.out.println(admin.tableExists(TableName.valueOf("ns1:t1")));
}
}