使用Java API連線和操作HBase資料庫
阿新 • • 發佈:2019-01-26
建立的資料庫儲存如下資料
表結構
java程式碼
public class HbaseTest {
/**
* 配置ss
*/
static Configuration config = null;
private Connection connection = null;
private Table table = null;
@Before
public void init() throws Exception {
config = HBaseConfiguration.create();// 配置
config.set("hbase.zookeeper.quorum", "192.168.33.61");// zookeeper地址
config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper埠
connection = ConnectionFactory.createConnection(config);
table = connection.getTable(TableName.valueOf("dept"));
}
/**
* 建立資料庫表dept,並增加列族info和subdept
*
* @throws Exception
*/
@Test
public void createTable() throws Exception {
// 建立表管理類
HBaseAdmin admin = new HBaseAdmin(config); // hbase表管理
// 建立表描述類
TableName tableName = TableName.valueOf("dept"); // 表名稱
HTableDescriptor desc = new HTableDescriptor(tableName);
// 建立列族的描述類
HColumnDescriptor family = new HColumnDescriptor("info"); // 列族
// 將列族新增到表中
desc.addFamily(family);
HColumnDescriptor family2 = new HColumnDescriptor("subdept"); // 列族
// 將列族新增到表中
desc.addFamily(family2);
// 建立表
admin.createTable(desc); // 建立表
System.out.println("建立表成功!");
}
/**
* 向hbase中插入前三行網路部、開發部、測試部的相關資料,
* 即加入表中的前三條資料
*
* @throws Exception
*/
@SuppressWarnings({ "deprecation", "resource" })
@Test
public void insertData() throws Exception {
table.setAutoFlushTo(false);
table.setWriteBufferSize(534534534);
ArrayList<Put> arrayList = new ArrayList<Put>();
Put put = new Put(Bytes.toBytes("0_1"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("網路部"));
put.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept1"), Bytes.toBytes("1_1"));
put.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept2"), Bytes.toBytes("1_2"));
arrayList.add(put);
Put put1 = new Put(Bytes.toBytes("1_1"));
put1.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("開發部"));
put1.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1"));
Put put2 = new Put(Bytes.toBytes("1_2"));
put2.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("測試部"));
put2.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1"));
for (int i = 1; i <= 100; i++) {
put1.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept"+i), Bytes.toBytes("2_"+i));
put2.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept"+i), Bytes.toBytes("3_"+i));
}
arrayList.add(put1);
arrayList.add(put2);
//插入資料
table.put(arrayList);
//提交
table.flushCommits();
System.out.println("資料插入成功!");
}
/**
* 向hbase中插入開發部、測試部下的所有子部門資料
* @throws Exception
*/
@Test
public void insertOtherData() throws Exception {
table.setAutoFlushTo(false);
table.setWriteBufferSize(534534534);
ArrayList<Put> arrayList = new ArrayList<Put>();
for (int i = 1; i <= 100; i++) {
Put put_development = new Put(Bytes.toBytes("2_"+i));
put_development.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("開發"+i+"組"));
put_development.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("1_1"));
arrayList.add(put_development);
Put put_test = new Put(Bytes.toBytes("3_"+i));
put_test.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("測試"+i+"組"));
put_test.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("1_2"));
arrayList.add(put_test);
}
//插入資料
table.put(arrayList);
//提交
table.flushCommits();
System.out.println("插入其他資料成功!");
}
/**
* 查詢所有一級部門(沒有上級部門的部門)
* @throws Exception
*/
@Test
public void scanDataStep1() throws Exception {
// 建立全表掃描的scan
Scan scan = new Scan();
System.out.println("查詢到的所有一級部門如下:");
// 列印結果集
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("f_pid")) == null) {
for (KeyValue kv : result.raw()) {
System.out.print(new String(kv.getRow()) + " ");
System.out.print(new String(kv.getFamily()) + ":");
System.out.print(new String(kv.getQualifier()) + " = ");
System.out.print(new String(kv.getValue()));
System.out.print(" timestamp = " + kv.getTimestamp() + "\n");
}
}
}
}
/**
* 已知rowkey,查詢該部門的所有(直接)子部門資訊 rowkey=1_1
* @throws Exception
*/
@Test
public void scanDataStep2() throws Exception {
Get g = new Get("1_1".getBytes());
g.addFamily("subdept".getBytes());
// 列印結果集
Result result = table.get(g);
for (KeyValue kv : result.raw()) {
Get g1 = new Get(kv.getValue());
Result result1 = table.get(g1);
for (KeyValue kv1 : result1.raw()) {
System.out.print(new String(kv1.getRow()) + " ");
System.out.print(new String(kv1.getFamily()) + ":");
System.out.print(new String(kv1.getQualifier()) + " = ");
System.out.print(new String(kv1.getValue()));
System.out.print(" timestamp = " + kv1.getTimestamp() + "\n");
}
}
}
/**
* 已知rowkey,向該部門增加一個子部門
* rowkey:0_1
* 增加的部門名:我增加的部門
* @throws Exception
*/
@Test
public void scanDataStep3() throws Exception {
//新增一個部門
Put put = new Put(Bytes.toBytes("4_1"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("我增加的部門"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1"));
//插入資料
table.put(put);
//提交
table.flushCommits();
//更新網路部
Put put1 = new Put(Bytes.toBytes("0_1"));
put1.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept3"), Bytes.toBytes("4_1"));
//插入資料
table.put(put1);
//提交
table.flushCommits();
}
/**
* 已知rowkey(且該部門存在子部門),刪除該部門資訊,該部門所有(直接)子部門被調整到其他部門中
* @throws Exception
*/
@Test
public void scanDataStep4() throws Exception {
/**
* 向部門"我增加的部門"新增兩個子部門"
*/
table.setAutoFlushTo(false);
table.setWriteBufferSize(534534534);
ArrayList<Put> arrayList = new ArrayList<Put>();
Put put1 = new Put(Bytes.toBytes("5_1"));
put1.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("新增子部門1"));
put1.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("4_1"));
Put put2 = new Put(Bytes.toBytes("5_2"));
put2.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("新增子部門2"));
put2.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("4_1"));
arrayList.add(put1);
arrayList.add(put2);
//插入資料
table.put(arrayList);
//提交
table.flushCommits();
/**
* 目的:刪除"我增加的部門"的部門資訊,該部門所有(直接)子部門被調整到其他部門中
* 使用策略:更新部門名就可以了,也就是說一個部門可能有多個rowkey
*/
Put put = new Put(Bytes.toBytes("4_1"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("開發部"));
//插入資料
table.put(put);
//提交
table.flushCommits();
}
@After
public void close() throws Exception {
table.close();
connection.close();
}
}