1. 程式人生 > 實用技巧 >HBase操作工具類

HBase操作工具類

宣告

本文轉自:https://www.cnblogs.com/jonban/p/10805971.html

正文

新增依賴

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.5</version>
        </dependency>

工具類

package javax.utils;

import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes;
/** * Hbase 操作工具類 * * @author Logan * @version 1.0.0 * @createDate 2019-05-03 * */ public class HbaseUtils { // ===============Common===================================== /** * 根據表名獲取Table物件 * * @param name 表名,必要時可指定名稱空間,比如:“default:user” * @return Hbase Table 物件 * @throws IOException 有異常丟擲,由呼叫者捕獲處理 */ public static Table getTable(String name) throws IOException { TableName tableName = TableName.valueOf(name); Connection connection = ConnectionFactory.createConnection(); return connection.getTable(tableName); } // =============== Put ===================================== /** * 根據rowKey生成一個Put物件 * * @param rowKey rowKey * @return Put物件 */ public static Put createPut(String rowKey) { return new Put(Bytes.toBytes(rowKey)); } /** * 在Put物件上增加Cell * * @param put Put物件 * @param cell cell物件 * @throws IOException 有異常丟擲,由呼叫者捕獲處理 */ public static void addCellOnPut(Put put, Cell cell) throws IOException { put.add(cell); } /** * 在Put物件上增加值 * * @param put Put物件 * @param family 列簇 * @param qualifier 列 * @param value 字串型別的值 */ public static void addValueOnPut(Put put, String family, String qualifier, String value) { addValueOnPut(put, family, qualifier, Bytes.toBytes(value)); } /** * 在Put物件上增加值 * * @param put Put物件 * @param family 列簇 * @param qualifier 列 * @param value 位元組陣列型別的值,可以是任意物件序列化而成 */ public static void addValueOnPut(Put put, String family, String qualifier, byte[] value) { put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), value); } /** * 在Put物件上增加值 * * @param put Put物件 * @param family 列簇 * @param qualifier 列 * @param ts Timestamp時間戳 * @param value 字串型別的值 */ public static void addValueOnPut(Put put, String family, String qualifier, long ts, String value) { addValueOnPut(put, family, qualifier, ts, Bytes.toBytes(value)); } /** * 在Put物件上增加值 * * @param put Put物件 * @param family 列簇 * @param qualifier 列 * @param ts Timestamp時間戳 * @param value 位元組陣列型別的值,可以是任意物件序列化而成 */ public static void addValueOnPut(Put put, String family, String qualifier, long ts, byte[] value) { put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), ts, value); } /** * 按表名插入一個Put物件包含的資料 * * @param tableName 表名,必要時可指定名稱空間,比如:“default:user” * @param put 要插入的資料物件 * @throws IOException 有異常丟擲,由呼叫者捕獲處理 */ public static void put(String tableName, Put put) throws IOException { try ( Table table = getTable(tableName); ) { table.put(put); } } /** * 按表名批量插入Put物件包含的資料 * * @param tableName 表名,必要時可指定名稱空間,比如:“default:user” * @param puts 要插入的資料物件集合 * @throws IOException 有異常丟擲,由呼叫者捕獲處理 */ public static void put(String tableName, List<Put> puts) throws IOException { try ( Table table = getTable(tableName); ) { table.put(puts); } } // =============== Get ===================================== /** * 根據rowKey生成一個查詢的Get物件 * * @param rowKey rowKey * @return Get 物件 */ public static Get createGet(String rowKey) { return new Get(Bytes.toBytes(rowKey)); } /** * 對查詢的Get物件增加指定列簇 * * @param get * @param family */ public static void addFamilyOnGet(Get get, String family) { get.addFamily(Bytes.toBytes(family)); } /** * 對查詢的Get物件增加指定列簇和列 * * @param get * @param family * @param qualifier */ public static void addColumnOnGet(Get get, String family, String qualifier) { get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); } /** * 根據表名和rowKey查詢結果(包含全部列簇和列) * * @param tableName 表名,必要時可指定名稱空間,比如:“default:user” * @param rowKey 查詢rowKey * @return 查詢結果Result * @throws IOException 有異常丟擲,由呼叫者捕獲處理 */ public static Result get(String tableName, String rowKey) throws IOException { Get get = createGet(rowKey); return get(tableName, get); } /** * 根據表名和rowKey陣列批量查詢結果(包含全部列簇和列) * * @param tableName 表名,必要時可指定名稱空間,比如:“default:user” * @param rowKeys 查詢rowKey陣列 * @return 查詢結果Result陣列 * @throws IOException 有異常丟擲,由呼叫者捕獲處理 */ public static Result[] get(String tableName, String[] rowKeys) throws IOException { List<Get> gets = new ArrayList<Get>(); for (String rowKey : rowKeys) { gets.add(createGet(rowKey)); } return get(tableName, gets); } /** * 根據表名和Get物件查詢結果 * * @param tableName 表名,必要時可指定名稱空間,比如:“default:user” * @param get Hbase查詢物件 * @return 查詢結果Result * @throws IOException 有異常丟擲,由呼叫者捕獲處理 */ public static Result get(String tableName, Get get) throws IOException { try ( Table table = getTable(tableName); ) { return table.get(get); } } /** * 根據表名和Get物件陣列查詢結果 * * @param tableName 表名,必要時可指定名稱空間,比如:“default:user” * @param gets 多個Hbase查詢物件組成的陣列 * @return 查詢結果Result陣列 * @throws IOException 有異常丟擲,由呼叫者捕獲處理 */ public static Result[] get(String tableName, List<Get> gets) throws IOException { try ( Table table = getTable(tableName); ) { return table.get(gets); } } // =============== Scan ===================================== /** * 根據startRow和stopRow建立掃描物件 * * @param startRow 掃描開始行,結果包含該行 * @param stopRow 掃描結束行,結果不包含該行 * @return Scan物件 */ public static Scan createScan(String startRow, String stopRow) { Scan scan = new Scan(); scan.withStartRow(Bytes.toBytes(startRow)); scan.withStopRow(Bytes.toBytes(stopRow)); return scan; } /** * 對掃描物件設定列簇 * * @param scan 掃描物件 * @param family 列簇 */ public static void addFamilyOnScan(Scan scan, String family) { scan.addFamily(Bytes.toBytes(family)); } /** * 對掃描物件設定列 * * @param scan 掃描物件 * @param family 列簇 * @param qualifier 列簇下對應的列 */ public static void addColumnOnScan(Scan scan, String family, String qualifier) { scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); } /** * 根據表名和掃描物件掃描資料 * * @param tableName 表名,必要時可指定名稱空間,比如:“default:user” * @param scan 掃描物件 * @return 掃描結果集物件ResultScanner * @throws IOException 有異常丟擲,由呼叫者捕獲處理 */ public static ResultScanner scan(String tableName, Scan scan) throws IOException { try ( Table table = getTable(tableName); ) { return table.getScanner(scan); } } /** * 根據表名、開始行和結束行掃描資料(結果包含開始行,不包含結束行,半開半閉區間[startRow, stopRow)) * * @param tableName 表名,必要時可指定名稱空間,比如:“default:user” * @param startRow 掃描開始行 * @param stopRow 掃描結束行 * @return 掃描結果集物件ResultScanner * @throws IOException 有異常丟擲,由呼叫者捕獲處理 */ public static ResultScanner scan(String tableName, String startRow, String stopRow) throws IOException { return scan(tableName, createScan(startRow, stopRow)); } // =============== Delete ===================================== /** * 根據rowKey生成一個查詢的Delete物件 * * @param rowKey rowKey * @return Delete物件 */ public static Delete createDelete(String rowKey) { return new Delete(Bytes.toBytes(rowKey)); } /** * 在Delete物件上增加Cell * * @param delete Delete物件 * @param cell cell物件 * @throws IOException 有異常丟擲,由呼叫者捕獲處理 */ public static void addCellOnDelete(Delete delete, Cell cell) throws IOException { delete.add(cell); } /** * 對刪除物件增加指定列簇 * * @param delete Delete物件 * @param family 列簇 */ public static void addFamilyOnDelete(Delete delete, String family) { delete.addFamily(Bytes.toBytes(family)); } /** * 對刪除物件增加指定列簇和列 * * @param delete Delete物件 * @param family 列簇 * @param qualifier 列 */ public static void addColumnOnDelete(Delete delete, String family, String qualifier) { delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); } /** * 按表名刪除一個Delete物件指定的資料 * * @param tableName 表名,必要時可指定名稱空間,比如:“default:user” * @param delete Delete物件 * @throws IOException 有異常丟擲,由呼叫者捕獲處理 */ public static void delete(String tableName, Delete delete) throws IOException { try ( Table table = getTable(tableName); ) { table.delete(delete); } } /** * 按表名批量刪除Delete物件集合包含的指定資料 * * @param tableName 表名,必要時可指定名稱空間,比如:“default:user” * @param deletes Delete物件集合 * @throws IOException 有異常丟擲,由呼叫者捕獲處理 */ public static void delete(String tableName, List<Delete> deletes) throws IOException { try ( Table table = getTable(tableName); ) { table.delete(deletes); } } }

測試類

package com.java.demo;

import java.util.ArrayList;
import java.util.List;

import javax.utils.HbaseUtils;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

/**
 * Hbase 客戶端測試
 * 
 * @author Logan
 * @version 1.0.0
 * @createDate 2019-05-03
 *
 */
public class HbaseClientDemo {

    /**
     * 向user表中插入資料
     */
    @Test
    public void put() {

        String tableName = "default:user";
        try {

            List<Put> puts = new ArrayList<Put>();
            Put put = HbaseUtils.createPut("key1005");
            HbaseUtils.addValueOnPut(put, "info", "name", "孫悟空");
            HbaseUtils.addValueOnPut(put, "info", "age", "500");
            HbaseUtils.addValueOnPut(put, "info", "address", "花果山");
            // HbaseUtils.put(tableName, put);
            puts.add(put);

            put = HbaseUtils.createPut("key1006");
            HbaseUtils.addValueOnPut(put, "info", "name", "沙悟淨");
            HbaseUtils.addValueOnPut(put, "info", "age", "1000");
            HbaseUtils.addValueOnPut(put, "info", "address", "流沙河");
            puts.add(put);

            HbaseUtils.put(tableName, puts);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 按rowKey批量查詢user表中全部列簇全部列的值
     */
    @Test
    public void getAllFamily() {
        try {
            String tableName = "default:user";
            String[] rowKeys = { "key1001", "key1002", "key1003", "key1005", "key1006" };

            // 按表名和rowKey查詢所有列
            Result[] results = HbaseUtils.get(tableName, rowKeys);
            for (Result result : results) {

                // 列印查詢結果
                printResult(result);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 按rowKey查詢user表中指定列簇指定列的值
     */
    @Test
    public void get() {
        try {
            String tableName = "default:user";
            String rowKey = "key1002";

            Get get = HbaseUtils.createGet(rowKey);

            HbaseUtils.addColumnOnGet(get, "info", "name");
            HbaseUtils.addColumnOnGet(get, "info", "age");

            // 不存在的列,查詢結果不顯示
            HbaseUtils.addColumnOnGet(get, "info", "address");

            // 如果在增加列後增加已有的列簇,會返回該列簇的全部列資料,覆蓋前邊的增加列
            // HbaseUtils.addFamilyOnGet(get, "info");

            Result result = HbaseUtils.get(tableName, get);
            printResult(result);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void scan() {
        try {
            String tableName = "default:user";
            String startRow = "key1001";
            String stopRow = "key1006";
            ResultScanner resultScanner = HbaseUtils.scan(tableName, startRow, stopRow);
            for (Result result : resultScanner) {
                printResult(result);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 列印查詢結果
     * 
     * @param result 查詢結果物件
     */
    private void printResult(Result result) {
        Cell[] cells = result.rawCells();

        // 從Result中讀取 rowKey
        System.out.println(Bytes.toString(result.getRow()));

        String print = "%s\t %s:%s \t %s";
        for (Cell cell : cells) {

            // 從Cell中取rowKey
            String row = Bytes.toString(CellUtil.cloneRow(cell));
            String family = Bytes.toString(CellUtil.cloneFamily(cell));
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
            String value = Bytes.toString(CellUtil.cloneValue(cell));

            System.out.println(String.format(print, row, family, qualifier, value));

        }
    }

    /**
     * 刪除指定列
     */
    @Test
    public void deleteColumn() {
        try {
            String tableName = "default:user";
            List<Delete> deletes = new ArrayList<Delete>();
            Delete delete = HbaseUtils.createDelete("key1005");
            HbaseUtils.addColumnOnDelete(delete, "info", "age");
            HbaseUtils.addColumnOnDelete(delete, "info", "address");
            // HbaseUtils.delete(tableName, delete);
            deletes.add(delete);

            delete = HbaseUtils.createDelete("key1006");
            HbaseUtils.addColumnOnDelete(delete, "info", "address");
            deletes.add(delete);

            HbaseUtils.delete(tableName, deletes);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 刪除指定列簇
     */
    @Test
    public void deleteFamily() {
        try {
            String tableName = "default:user";
            List<Delete> deletes = new ArrayList<Delete>();
            Delete delete = HbaseUtils.createDelete("key1005");
            HbaseUtils.addFamilyOnDelete(delete, "info");
            // HbaseUtils.delete(tableName, delete);
            deletes.add(delete);

            delete = HbaseUtils.createDelete("key1006");
            HbaseUtils.addFamilyOnDelete(delete, "info");
            deletes.add(delete);

            HbaseUtils.delete(tableName, deletes);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}