Common HBase Java APIs: getting a client, creating a table, querying, and deleting
1. Preparation
(1) This article uses an HBase cluster built on three servers; ZooKeeper runs as a cluster on the same servers. The cluster IPs are 192.168.183.101, 192.168.183.102, and 192.168.183.103. 102 is the master node (HMaster); 101 and 103 are HRegionServers.
(2) The HBase version installed for this test is hbase-0.99.2-bin.tar.
(3) The Maven dependency for the Java API is as follows:
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.0.2</version>
</dependency>
(4) Configure the local hosts file (map the cluster IPs to their hostnames on your local machine).
The ZooKeeper cluster sometimes addresses its members internally by hostname. If the hosts file is not configured, the program hangs in createConnection when obtaining the Connection below; in my test it was still stuck after waiting three minutes.
The configuration in C:\Windows\System32\drivers\etc\hosts is as follows:
192.168.183.101 mini01
192.168.183.102 mini02
192.168.183.103 mini03
2. Obtaining a Connection object
After adding the Maven dependency, first obtain the Connection object that the Java API uses to operate HBase. Pass in the ZooKeeper quorum address and the ZooKeeper client port; ConnectionFactory then returns the HBase Connection.
/**
 * Obtain the Connection object.
 */
static Configuration config = null;
private Connection connection = null;
private Table table = null;
Logger LOG = LoggerFactory.getLogger(HbaseGetImpl.class);

@Before
public void init() throws Exception {
    config = HBaseConfiguration.create();                          // configuration
    config.set("hbase.zookeeper.quorum", "mini01,mini02,mini03");  // ZooKeeper quorum
    config.set("hbase.zookeeper.property.clientPort", "2181");     // ZooKeeper client port
    connection = ConnectionFactory.createConnection(config);
}
Here is the Javadoc of ConnectionFactory.createConnection(config):
/**
 * Create a new Connection instance using the passed <code>conf</code> instance. Connection
 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
 * created from returned connection share zookeeper connection, meta cache, and connections
 * to region servers and masters.
 * <br>
 * The caller is responsible for calling {@link Connection#close()} on the returned
 * connection instance.
 *
 * Typical usage:
 * <pre>
 * Connection connection = ConnectionFactory.createConnection(conf);
 * Table table = connection.getTable(TableName.valueOf("table1"));
 * try {
 *   table.get(...);
 *   ...
 * } finally {
 *   table.close();
 *   connection.close();
 * }
 * </pre>
 *
 * @param conf configuration
 * @param user the user the connection is for
 * @param pool the thread pool to use for batch operations
 * @return Connection object for <code>conf</code>
 */
public static Connection createConnection(Configuration conf, ExecutorService pool, User user)
        throws IOException {
    if (user == null) {
        UserProvider provider = UserProvider.instantiate(conf);
        user = provider.getCurrent();
    }
    return createConnection(conf, false, pool, user);
}
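Since the caller is responsible for calling Connection.close(), it is convenient to pair the init() method above with a teardown. A minimal sketch, assuming JUnit's @After annotation (import org.junit.After) and the connection field created in init():

@After
public void cleanup() {
    if (connection != null) {
        try {
            // Releases the shared ZooKeeper connection, meta cache and RPC connections.
            connection.close();
        } catch (IOException e) {
            LOG.error("Close connection failed ", e);
        }
    }
}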
3. Creating an HBase table
Using the Connection object, you can specify the table name and the column families of the HBase table, then call admin.createTable(htd, splits) to create it. A few points worth noting:
(1) hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) sets the data block encoding of the table; the available encodings include DIFF, FAST_DIFF and PREFIX (the code comment below also lists PREFIX_TREE).
(2) hcd.setCompressionType sets the compression algorithm, either GZ or SNAPPY. Note that on my local cluster, when I chose SNAPPY, testCreateTable hung and the table was never created; once I commented that line out, the table was created without problems. I suspect a mismatch between my local HBase version and the Snappy configuration.
(3) When creating the table, admin.createTable(htd, splits) accepts a splits parameter that pre-splits the table into regions; here four split keys are defined (note that four split keys actually produce five regions).
(4) After creating the HBase table, the connection needs to be closed.
@Test
public void testCreateTable() {
    LOG.info("Entering testCreateTable.");
    // Specify the table descriptor.
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("student2"));
    // Set the column family name to info.
    HColumnDescriptor hcd = new HColumnDescriptor("info");
    // Set the data block encoding; HBase provides DIFF, FAST_DIFF, PREFIX
    // and PREFIX_TREE.
    hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
    // Set the compression method; HBase provides two default compression
    // methods: GZ and SNAPPY.
    // GZ has the highest compression rate but low compression and
    // decompression efficiency; it fits cold data.
    // SNAPPY has a lower compression rate but high compression and
    // decompression efficiency; it fits hot data.
    // It is advised to use SNAPPY.
    // hcd.setCompressionType(Compression.Algorithm.SNAPPY);
    htd.addFamily(hcd);
    Admin admin = null;
    try {
        // Instantiate an Admin object.
        admin = connection.getAdmin();
        if (!admin.tableExists(TableName.valueOf("student2"))) {
            LOG.info("Creating table...");
            // admin.createTable(htd);
            // Create a table that is pre-split into regions.
            byte[][] splits = new byte[4][];
            splits[0] = Bytes.toBytes("A");
            splits[1] = Bytes.toBytes("H");
            splits[2] = Bytes.toBytes("O");
            splits[3] = Bytes.toBytes("U");
            admin.createTable(htd, splits);
            LOG.info(String.valueOf(admin.getClusterStatus()));
            LOG.info(String.valueOf(admin.listNamespaceDescriptors()));
            LOG.info("Table created successfully.");
        } else {
            LOG.warn("table already exists");
        }
    } catch (IOException e) {
        LOG.error("Create table failed ", e);
    } finally {
        if (admin != null) {
            try {
                // Close the Admin object.
                admin.close();
            } catch (IOException e) {
                LOG.error("Failed to close admin ", e);
            }
        }
    }
    LOG.info("Exiting testCreateTable.");
}
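To confirm the pre-split described in point (3), the region boundaries of the new table can be listed. A small sketch, assuming the HBase 1.x Admin.getTableRegions API (not part of the original example):

// List the start/end keys of each region of "student2".
Admin admin = connection.getAdmin();
try {
    for (HRegionInfo region : admin.getTableRegions(TableName.valueOf("student2"))) {
        LOG.info("region: start=" + Bytes.toStringBinary(region.getStartKey())
                + " end=" + Bytes.toStringBinary(region.getEndKey()));
    }
} finally {
    admin.close();
}

With the four split keys above, this should list five regions: (-∞, A), [A, H), [H, O), [O, U) and [U, +∞).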
4. Putting data into an HBase table
(1) First obtain a Table object from the connection and the tableName.
(2) Then construct the Put object: Put put = new Put(rowkey), where the constructor argument is the table's unique key, the rowkey (comparable to an id in MySQL). You also need to supply two more values: familyName, the column family name, and qualifiers, the column names. Note that the tableName and familyName are fixed when the table is created, whereas HBase qualifiers differ from MySQL column names: MySQL columns are defined at table creation time, but qualifiers are not predefined and can be named freely at insert time.
(3) HBase stores data without any data type; everything is bytes. So before inserting into the HBase table, call Bytes.toBytes() to convert each value to bytes.
@Test
public void testPut() {
    // Specify the column family name.
    byte[] familyName = Bytes.toBytes("info");
    // byte[] familyName1 = Bytes.toBytes("");
    // Specify the column name.
    byte[][] qualifiers = {
        Bytes.toBytes("colu1")
        // Bytes.toBytes("l2"),
        // Bytes.toBytes("l3")
    };
    Table table = null;
    try {
        // Instantiate an HTable object.
        table = connection.getTable(TableName.valueOf("student2"));
        List<Put> puts = new ArrayList<Put>();
        for (int i = 1; i < 10; i = i + 1) {
            // Instantiate a Put object.
            String rowkey = UUID.randomUUID().toString();
            Put put = new Put(Bytes.toBytes(rowkey));
            put.addColumn(familyName, qualifiers[0], Bytes.toBytes("tommy1"));
            // put.addColumn(familyName, qualifiers[0], Bytes.toBytes(80));
            // put.addColumn(familyName, qualifiers[0], Bytes.toBytes(UUID.randomUUID().toString()));
            puts.add(put);
        }
        // Submit a put request.
        table.put(puts);
        LOG.info("Put successfully.");
    } catch (IOException e) {
        LOG.error("Put failed ", e);
    } finally {
        if (table != null) {
            try {
                // Close the HTable object.
                table.close();
            } catch (IOException e) {
                LOG.error("Close table failed ", e);
            }
        }
    }
    LOG.info("Exiting testPut.");
}
5. Getting data from HBase
There are two ways to read data:
(1) Use the HBase unique key, the rowkey, and fetch the row with the get method.
Get get = new Get(rowkey) constructs a Get object from the given rowkey; you can also restrict which columns are fetched with get.addColumn(familyName, qualifier).
The thing to watch out for in this method: after obtaining the Cell objects (for (Cell cell : result.rawCells())), the bytes that HBase stores must be converted back to the original data type when reading the value out of the cell. CellUtil.cloneValue(cell) returns raw bytes, which need to be converted with the appropriate method, such as Bytes.toString() or Bytes.toInt().
For example, if the inserted value was an int, you must call Bytes.toInt() to get the int back; if the original value was a long, you must call Bytes.toLong() to restore the long. In short, the type you read out must match the type you wrote in; otherwise the value you get back is garbage.
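As a minimal illustration of this round-trip (not part of the original test code; the values are made up), the pairing of Bytes.toBytes with the matching decoder looks like this:

// Write side: encode typed values to bytes before putting them into HBase.
byte[] ageBytes  = Bytes.toBytes(80);            // int    -> 4 bytes
byte[] tsBytes   = Bytes.toBytes(1234567890L);   // long   -> 8 bytes
byte[] nameBytes = Bytes.toBytes("tommy1");      // String -> UTF-8 bytes

// Read side: decode with the matching method, otherwise the result is garbage.
int age     = Bytes.toInt(ageBytes);      // 80
long ts     = Bytes.toLong(tsBytes);      // 1234567890L
String name = Bytes.toString(nameBytes);  // "tommy1"

The full testGet example follows.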
@Test
public void testGet() {
    LOG.info("Entering testGet.");
    // Note: the table name, family, qualifier and rowkey below must match data
    // that actually exists in your cluster (they differ from the student2 table created above).
    // Specify the column family name.
    byte[] familyName = Bytes.toBytes("f1");
    // Specify the column name.
    byte[][] qualifier = { Bytes.toBytes("l1") };
    // Specify the RowKey.
    byte[] rowKey = Bytes.toBytes("105f1fd2-7048-4fd3-8c7a-65cf04542be2");
    Table table = null;
    try {
        // Create the Table instance.
        table = connection.getTable(TableName.valueOf("table"));
        // Instantiate a Get object.
        Get get = new Get(rowKey);
        // Set the column family name and column name.
        get.addColumn(familyName, qualifier[0]);
        // Submit a get request.
        Result result = table.get(get);
        // Print query results.
        for (Cell cell : result.rawCells()) {
            LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"
                    + Bytes.toString(CellUtil.cloneFamily(cell)) + ","
                    + Bytes.toString(CellUtil.cloneQualifier(cell)) + ","
                    + Bytes.toString(CellUtil.cloneValue(cell)));
            // + Bytes.toLong(CellUtil.cloneValue(cell));
            // + Bytes.toInt(CellUtil.cloneValue(cell));
            // + Bytes.toBoolean(CellUtil.cloneValue(cell));
        }
        LOG.info("Get data successfully.");
    } catch (IOException e) {
        LOG.error("Get data failed ", e);
    } finally {
        if (table != null) {
            try {
                // Close the HTable object.
                table.close();
            } catch (IOException e) {
                LOG.error("Close table failed ", e);
            }
        }
    }
    LOG.info("Exiting testGet.");
}
(2) Build a simple Scan object and read data by scanning the table.
You can also build a simple Scan object and fetch data with a table scan; the values returned are again raw bytes. On a Scan you can additionally set the start rowkey and the stop rowkey, the caching size, rowkey filters, column filters and other filters. Those options will be covered in the next post; a small preview sketch follows the example below.
@Test
public void testScanData() {
    logger.info("Entering testScanData.");
    Table table = null;
    // Instantiate a ResultScanner object.
    ResultScanner rScanner = null;
    try {
        // Create the Table instance.
        table = connection.getTable(TableName.valueOf(TABLENAME));
        // Instantiate a Scan object.
        Scan scan = new Scan();
        // scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("l1"));
        // scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("l2"));
        // Set the cache size.
        scan.setCaching(1000);
        // Submit a scan request.
        rScanner = table.getScanner(scan);
        // Print query results.
        for (Result r = rScanner.next(); r != null; r = rScanner.next()) {
            for (Cell cell : r.rawCells()) {
                logger.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"
                        + "\"" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\":"
                        + "\"" + Bytes.toString(CellUtil.cloneValue(cell)) + "\"");
            }
        }
        logger.info("Scan data successfully.");
    } catch (IOException e) {
        logger.error("Scan data failed ", e);
    } finally {
        if (rScanner != null) {
            // Close the scanner object.
            rScanner.close();
        }
        if (table != null) {
            try {
                // Close the HTable object.
                table.close();
            } catch (IOException e) {
                logger.error("Close table failed ", e);
            }
        }
    }
    logger.info("Exiting testScanData.");
}
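As a preview of the options mentioned above (details in the next post), here is a hedged sketch of restricting the rowkey range and adding a simple filter. It assumes the HBase 1.x Scan and PrefixFilter APIs (import org.apache.hadoop.hbase.filter.PrefixFilter); the table variable and key values are illustrative only:

// Scan only rowkeys in the range [A, H) and keep rows whose key starts with "A1".
Scan rangeScan = new Scan();
rangeScan.setStartRow(Bytes.toBytes("A"));            // inclusive start rowkey
rangeScan.setStopRow(Bytes.toBytes("H"));             // exclusive stop rowkey
rangeScan.setCaching(500);                            // rows fetched per RPC
rangeScan.setFilter(new PrefixFilter(Bytes.toBytes("A1")));

ResultScanner scanner = table.getScanner(rangeScan);
try {
    for (Result r : scanner) {
        // Process each matching row here.
    }
} finally {
    scanner.close();
}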