1. 程式人生 > >HBase 常用java api獲得客戶端,創建表,查詢,刪除

HBase 常用java api獲得客戶端,創建表,查詢,刪除

size 列族 war pen 需要 java pool 數據類型 init

1,前期準備

(1) 本文采用的hbase是采用三臺服務器搭建的集群,zookeeper也是相同服務器搭建的集群,集群ip分別是192.168.183.101; 192.168.183.102; 192.168.183.103。其中102是主節點(HMaster),101以及103都是HRegionServer

(2) 這次測試安裝的hbase的版本是 hbase-0.99.2.-bin.tar

(3)java api引用的maven依賴路徑如下

 <dependency>
     <groupId>org.apache.hbase</groupId>
     <
artifactId>hbase-client</artifactId> <version>1.0.2</version> </dependency>

(4)配置本地的hosts文件(在本地配置集群的ip與主機名的映射關系)

zookeeper集群的內部有時候通過主機名來進行尋址。如果不在配置hosts文件,在獲得下面的Connection連接時,程序一直會在createConenction這個方法上,測試時等待了3分鐘程序一直卡著。

C:\Windows\System32\drivers\etc\hosts的配置如下:

192.168.183.101 mini01
192.168.183.102 mini02
192.168.183.103 mini03

2,獲取Connection對象

引入maven後,先獲取hbase的java操作HBase的Connection對象,傳入zookeeper的地址以及zookeeper的端口號zookeeper,通過ConnectionFactory可以獲取hbase的連接Connection.

    /**
     * 獲取Connection對象 
     */
    static Configuration config = null;
    private Connection connection = null;
    private Table table = null;

    Logger LOG = LoggerFactory.getLogger(HbaseGetImpl.class);
    @Before
    public void init() throws Exception {
        config = HBaseConfiguration.create();// 配置
        config.set("hbase.zookeeper.quorum", "mini01,mini02,mini03");// zookeeper地址
        config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口
        connection = ConnectionFactory.createConnection(config);
    }

下面看一下ConnectionFactory.createConnection(config)的方法註解

/**

 * Create a new Connection instance using the passed <code>conf</code> instance. Connection
 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
 * created from returned connection share zookeeper connection, meta cache, and connections
 * to region servers and masters.
 * <br>
 * The caller is responsible for calling {@link Connection#close()} on the returned
 * connection instance.
 *  調用方在調用拿到connection之後,有責任在隨後的代碼中調用Connection.close()方法來關閉連接
 * Typical usage:
   典型用法如下:
 * <pre>
 * Connection connection = ConnectionFactory.createConnection(conf);
 * Table table = connection.getTable(TableName.valueOf("table1"));
 * try {
 *   table.get(...);
 *   ...
 * } finally {
 *   table.close();
 *   connection.close();
 * }
 * </pre>
 *
 * @param conf configuration
 * @param user the user the connection is for
 * @param pool the thread pool to use for batch operations
 * @return Connection object for <code>conf</code>
 */
public static Connection createConnection(Configuration conf, ExecutorService pool, User user)
throws IOException {
  if (user == null) {
    UserProvider provider = UserProvider.instantiate(conf);
    user = provider.getCurrent();
  }

  return createConnection(conf, false, pool, user);
}

3,創建hbase的表

利用connection對象,可以指定HBase表的表名以及表的列族,可以調用admin.createTable(htd, splits);的方法來創建Hbase的表。下面說明以下三點註意事項:

(1)hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)這個方法可以指定hbase數據塊的編碼類型,可以選擇的是DIFF,FAST_DIFF,PREFIX這幾種的編碼類型。

(2)hcd.setCompressionType的方法,可以指定數據壓縮算法,分別是GZ和SNAPPY。值得註意的是在我的本機集群測試時,當我選擇SNAPPY作為壓縮算法時,在調用testCreateTable時會卡住,並且不能創建表。當我把這行代碼註釋掉後,創建hbase表順利。懷疑是我本機的hbase版本與Snappy的配置項不匹配。

(3)在創建表時,admin.createTable(htd, splits)方法可以指定指定splits的參數,預定義表的region分區,這邊預定了4個region分區。

(4)在創建hbase表結束後,需要將connection關閉。

@Test
    public void testCreateTable() {

        LOG.info("Entering testCreateTable.");
// Specify the table descriptor.
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("student2"));
// Set the column family name to info.
        HColumnDescriptor hcd = new HColumnDescriptor("info");
// Set data encoding methods, HBase provides DIFF,FAST_DIFF,PREFIX
// and PREFIX_TREE
        hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
// Set compression methods, HBase provides two default compression
// methods:GZ and SNAPPY
// GZ has the highest compression rate,but low compression and
// decompression effeciency,fit for cold data
// SNAPPY has low compression rate, but high compression and
// decompression effeciency,fit for hot data.
// it is advised to use SNAANPPY
//   hcd.setCompressionType(Compression.Algorithm.SNAPPY);
        htd.addFamily(hcd);
        Admin admin = null; //註[2]
        try {
// Instantiate an Admin object.
            admin = connection.getAdmin();
            if (!admin.tableExists(TableName.valueOf("student2"))) {
                LOG.info("Creating table...");
//                admin.createTable(htd);
                // 創建一個預劃分region的表
                byte[][] splits = new byte[4][];
                splits[0] = Bytes.toBytes("A");
                splits[1] = Bytes.toBytes("H");
                splits[2] = Bytes.toBytes("O");
                splits[3] = Bytes.toBytes("U");
                admin.createTable(htd, splits);
                LOG.info(String.valueOf(admin.getClusterStatus()));
                LOG.info(String.valueOf(admin.listNamespaceDescriptors()));
                LOG.info("Table created successfully.");
            } else {
                LOG.warn("table already exists");
            }
        } catch (IOException e) {
            LOG.error("Create table failed " ,e);
        } finally {
            if (admin != null) {
                try {
// Close the Admin object.
                    admin.close();
                } catch (IOException e) {
                    LOG.error("Failed to close admin " ,e);
                }
            }
        }
        LOG.info("Exiting testCreateTable.");
    }

2,測試往hbase表中put數據

(1)先通過connnection以及tableName獲得Table對象

(2)再構造Put對象時,Put put = new Put(rowkey),在構造方法裏面的是hbase表的唯一鍵rowkey(相當於mysql中的id)。 此外還要指定兩個參數,分別是familyName 列族的名字 ,qulifiers列名。這邊需要註意的是tableName以及familyName是在創建表時就已經確定好了的。而hbase中的列名qulifers不同於mysql中的列名,mysql中的列名是在創建mysql表時就已經確定的。qulfiers可以在插入輸入時隨意指定名稱,不是預定義的。

(3)hbase存儲的數據是不帶數據類型的,全是Bytes。所以在插入hbase表前,需要調用Byte.toBytes()方法,將數據轉成Bytes後再進行插入。

 @Test
    public void testPut() {
        // Specify the column family name.
        byte[] familyName = Bytes.toBytes("info");
//        byte[] familyName1 = Bytes.toBytes("");
        // Specify the column name.
        byte[][] qualifiers = {
                Bytes.toBytes("colu1")
//                Bytes.toBytes("l2"),
//                Bytes.toBytes("l3")
        };
        Table table = null;
        try {
            // Instantiate an HTable object.
            table = connection.getTable(TableName.valueOf("student2"));
            List<Put> puts = new ArrayList<Put>();
            for(int i=1;i<10;i=i+1){
                // Instantiate a Put object.
               String rowkey = UUID.randomUUID().toString(); 
Put put = new Put(Bytes.toBytes(rowkey)); //high_risk put.addColumn(familyName, qualifiers[0], Bytes.toBytes("tommy1")); // put.addColumn(familyName, qualifiers[0], Bytes.toBytes(80)); // put.addColumn(familyName, qualifiers[0], Bytes.toBytes(UUID.randomUUID().toString())); //temporary_plate puts.add(put); } // Submit a put request. table.put(puts); LOG.info("Put successfully."); } catch (IOException e) { LOG.error("Put failed ", e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed ", e); } } } LOG.info("Exiting testPut."); }

3,從hbase中獲取數據

獲取數據的方式一共有兩種:

(1)用hbase中的唯一鍵:rowkey,通過get方法獲取數據

Get get = new Get(rowkey) ,通過指定的rowkey來構造Get對象,另外在還可以通過Get對象指定你需要獲取的列名get.addColumn(familyName,qulifiers)

這個方法中需要註意的地方在: 獲得cell對象後:(for(Cell cell: results.rawCells()) ) ,從cell中取出數據時,需要將hbase的存儲的數據類型Bytes還原成原來的數據類型。 CellUtil.cloneValue(cell)拿到的是Bytes類型的數據,需要通過Bytes.toString() 或Bytes.toInt()等不同的方法轉為原來的類型。

例如:插入時原始數據是Int,那麽必須調用Bytes.toInt()的方法還原回int,初始數據如果是long類型,必須調用Bytes.toLong()的方法還原回Long, 總結來說就是插入hbase之前是什麽樣的,出來必須與插入前是相同的數據類型。(否則的話,取出來的數據是亂碼的)

@Test
    public void testGet() {
        LOG.info("Entering testGet.");
        // Specify the column family name.
        byte[] familyName = Bytes.toBytes("f1");
        // Specify the column name.
        byte[][] qualifier = { Bytes.toBytes("l1")};
        // Specify RowKey.
        byte[] rowKey = Bytes.toBytes("105f1fd2-7048-4fd3-8c7a-65cf04542be2");
        Table table = null;
        try {
            // Create the Configuration instance.
            table = connection.getTable(TableName.valueOf("table"));
            // Instantiate a Get object.
            Get get = new Get(rowKey);
            // Set the column family name and column name.
            get.addColumn(familyName, qualifier[0]);
            // Submit a get request.
            Result result = table.get(get);
            // Print query results.
            for (Cell cell : result.rawCells()) {
                LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"
                        + Bytes.toString(CellUtil.cloneFamily(cell)) + ","
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + ","
                        + Bytes.toString(CellUtil.cloneValue(cell)));
// + Bytes.toLong(CellUtil.cloneValue(cell));
// + Bytes.toInt(CellUtil.cloneValue(cell));
// + Bytes.toBoolean(CellUtil.cloneValue(cell));
} LOG.info(
"Get data successfully."); } catch (IOException e) { LOG.error("Get data failed ", e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed ", e); } } } LOG.info("Exiting testGet."); }

(2)構建簡單的scan對象掃描獲取數據

這邊也可以構建簡單的scan對象,通過表掃描的來獲取數據,獲得數據的數據類型也是Bytes類型的。關於scan對象我們還可以設置scan的開始rowkey, scan的結束rowkey,scan可以設置緩存大小,可以設置rowkey過濾器,column過濾器等等過濾器。這些會在下一篇章記錄。

@Test
    public void testScanData() {
        logger.info("Entering testScanData.");
        Table table = null;
        // Instantiate a ResultScanner object.
        ResultScanner rScanner = null;
        try {
            // Create the Configuration instance.
            table = connection.getTable(TableName.valueOf(TABLENAME));
            // Instantiate a Get object.
            Scan scan = new Scan();
//            scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("l1"));
//            scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("l2"));
            // Set the cache size.
            scan.setCaching(1000);
            // Submit a scan request.
            rScanner = table.getScanner(scan);
            // Print query results.
            for (Result r = rScanner.next(); r != null; r = rScanner.next()) {
                for (Cell cell : r.rawCells()) {
                    logger.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"
                            + "\"" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\":"+"\""
                            + Bytes.toString(CellUtil.cloneValue(cell))+"\"");
                }
            }
            logger.info("Scan data successfully.");
        } catch (IOException e) {
            logger.error("Scan data failed ", e);
        } finally {
            if (rScanner != null) {
                // Close the scanner object.
                rScanner.close();
            }
            if (table != null) {
                try {
                    // Close the HTable object.
                    table.close();
                } catch (IOException e) {
                    logger.error("Close table failed ", e);
                }
            }
        }
        logger.info("Exiting testScanData.");
    }








HBase 常用java api獲得客戶端,創建表,查詢,刪除