Hbase 基礎API
阿新 • • 發佈:2018-09-18
rem else get 指定時間 新版 cal mem max 下一個
本文參考:https://www.cnblogs.com/skyl/p/4803738.html
package com.Hbase import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp import org.apache.hadoop.hbase.filter.SingleColumnValueFilter object HbaseDDL { def main(args: Array[String]): Unit = { // 確認表是否存在 def ensureHbaseTableExist(tableName:TableName) = { // 配置 Hbase val hbaseconf = HBaseConfiguration.create() //val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183" //hbaseconf.set("hbase.zookeeper.quorum", zkConn) val HbaseConn = ConnectionFactory.createConnection(hbaseconf) val adminHbase = HbaseConn.getAdmin() val ifExist = adminHbase.tableExists(tableName) ifExist } // 確認表是否存在 測試 // val result = ensureHbaseTableExist("user_info1") // if (result) { // println("表存在") // } else { // println("表不存在") // } /** * Hbase建表 兩個參數 * @param tableName 形式為 ns:tb 或者 tb API 創建 namespace 機會不多,一般通過 hbase shell 創建 * @param columnFamilys cf1,cf2,cf3 */ def createHbaseTable(tableName:String, columnFamilys:String) = { // 配置 Hbase val hbaseconf = HBaseConfiguration.create() //val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183" //hbaseconf.set("hbase.zookeeper.quorum", zkConn) val HbaseConn = ConnectionFactory.createConnection(hbaseconf) val adminHbase = HbaseConn.getAdmin() // 列簇逗號分隔 val CFS = columnFamilys.split(",") // 表名,判斷是否帶了namespace,帶了則判斷是否存在 namespace, 不存在則創建 val nameSpace = tableName.split(":")(0) if (nameSpace != tableName) { adminHbase.createNamespace(NamespaceDescriptor.create(tableName.split(":")(0)).build()) println("NameSpace 創建成功!") } // 判斷表是否存在,不存在新建,存在則提示 if (!ensureHbaseTableExist(TableName.valueOf(tableName))) { // 實例化 HTableDescriptor val htable = new HTableDescriptor(TableName.valueOf(tableName)) // 循環添加所有列簇 for ( columnFamily <- CFS) { // 實例化 HColumnDescriptor val htableColumnFamily1 = new HColumnDescriptor((columnFamily)) // 調用 HColumnDescriptor 設置列簇屬性 htableColumnFamily1.setMaxVersions(3) // 表增加列族 htable.addFamily(new HColumnDescriptor(columnFamily)) } // 創建表 adminHbase.createTable(htable) println("表創建成功") } else { println("表已存在") } adminHbase.close() } // 測試建表 // createHbaseTable("scTable3", "info,base") /** * 列出所有表 */ def listAllHbaseTable() ={ // 配置 Hbase val hbaseconf = HBaseConfiguration.create() //val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183" //hbaseconf.set("hbase.zookeeper.quorum", zkConn) val HbaseConn = ConnectionFactory.createConnection(hbaseconf) val adminHbase = HbaseConn.getAdmin() val listTables = adminHbase.listTableNames() for(table <- listTables){ println(table) } adminHbase.close() } //listAllHbaseTable() /** * 刪除一張表,輸入表名 * 判斷是否存在,是否失效,否則不能刪除 * * @param tableName */ def deleteHbaseTable(tableName: String) ={ val hbaseconf = HBaseConfiguration.create() val HbaseConn = ConnectionFactory.createConnection(hbaseconf) val adminHbase = HbaseConn.getAdmin() val tbName = TableName.valueOf(tableName) if(ensureHbaseTableExist(tbName)){ // 若表不失效,則使失效 if(!adminHbase.isTableDisabled(tbName)){ adminHbase.disableTable(tbName) } adminHbase.deleteTable(tbName) println("刪除成功") } else { println("表不存在") } adminHbase.close() } //deleteHbaseTable("scTable3") /** * * 刪除表的某個列族 ,得到 HTableDescriptor , 調用該類的 removeFamily 方法 * @param tableName 表名 ---> ns:tb or tb [String] * @param columnFamily 列族名 ---> String */ def deleteHbaseColumnFamily(tableName:String, columnFamily:String) ={ val hbaseconf = HBaseConfiguration.create() val HbaseConn = ConnectionFactory.createConnection(hbaseconf) val adminHbase = HbaseConn.getAdmin() val tbName = TableName.valueOf(tableName) // disable table adminHbase.disableTable(tbName) // get HTableDescriptor val htd = adminHbase.getTableDescriptor(tbName) // delete family htd.removeFamily(columnFamily.getBytes()) // apply htd to table adminHbase.modifyTable(tbName, htd) // enable table adminHbase.enableTable(tbName) println("刪除成功") adminHbase.close() } // deleteHbaseColumnFamily("scTable3", "base") /** * 給表增加列族 先得到表的 HTableDescriptor, 然後使用 HColumnDescriptor 初始化 新增列,並設置屬性 * 調用 HTableDescriptor 的 addFamily 方法,將初始化好的 HCD 添加到 HTableDescriptor ,然後使用admin 的 modifyTable 方法將修改應用 * @param tableName * @param columnFamily */ def addHbaseColumnFamily(tableName:String, columnFamily:String) ={ val hbaseconf = HBaseConfiguration.create() val HbaseConn = ConnectionFactory.createConnection(hbaseconf) val adminHbase = HbaseConn.getAdmin() val tbName = TableName.valueOf(tableName) // disable table adminHbase.disableTable(tbName) // get HTableDescriptor val htd = adminHbase.getTableDescriptor(tbName) // val hcd = new HColumnDescriptor(columnFamily) hcd.setMaxVersions(3) // add family htd.addFamily(hcd) // apply htd to table adminHbase.modifyTable(tbName, htd) // enable table adminHbase.enableTable(tbName) println("添加成功") adminHbase.close() } //addHbaseColumnFamily("scTable3", "base") /** * 修改列簇功能 get 到 HTableDescriptor ,再 get 到 Family ,設置 Family ,admin modifyTable 應用 * @param tableName * @param columnFamily */ def modifyHbaseTableColumnFamily(tableName:String, columnFamily:String) ={ val hbaseconf = HBaseConfiguration.create() val HbaseConn = ConnectionFactory.createConnection(hbaseconf) val adminHbase = HbaseConn.getAdmin() val tbName = TableName.valueOf(tableName) adminHbase.disableTable(tbName) val htd = adminHbase.getTableDescriptor(tbName) val modifyCol = htd.getFamily(columnFamily.getBytes()) modifyCol.setMaxVersions(3) adminHbase.modifyTable(tbName,htd) adminHbase.enableTable(tbName) adminHbase.close() println("修改成功!") } //modifyHbaseTableColumnFamily("scTable3", "info") /** * 插入數據,五個參數 * @param tableName * @param columnFamily * @param column * @param rowkey * @param value */ def putDataHbaseTable(tableName:String, columnFamily:String, column:String, rowkey:String, value:String) ={ val hbaseconf = HBaseConfiguration.create() // table val hTable = new HTable(hbaseconf, tableName) // row key val putData = new Put(rowkey.getBytes()) // put value putData.add(columnFamily.getBytes(), column.getBytes(), value.getBytes()) /** * 插入方式 * ASYNC_WAL : 當數據變動時,異步寫WAL日誌 * SYNC_WAL : 當數據變動時,同步寫WAL日誌 * FSYNC_WAL : 當數據變動時,同步寫WAL日誌,並且,強制將數據寫入磁盤 * SKIP_WAL : 不寫WAL日誌 * USE_DEFAULT : 使用HBase全局默認的WAL寫入級別,即 SYNC_WAL */ putData.setDurability(Durability.SYNC_WAL) // put data to table hTable.put(putData) println("插入數據成功!") // close table hTable.close() } //putDataHbaseTable("scTable3", "info", "name", "rk0002", "lalala") def deleteDataHbaseTable(tableName: String, rowkey:String, columnFamily:String, column:String = null)={ val hbaseConf = HBaseConfiguration.create() val hTable = new HTable(hbaseConf, tableName) // 初始化 Delete ,表名後可接時間戳 val deletaData = new Delete(rowkey.getBytes()) /** * 1).刪除指定列的 最新版本 的數據:Delete addColumn (byte[] family, byte[] qualifier) * 2).刪除指定列的 指定版本 的數據:Delete addColumn (byte[] family, byte[] qualifier, long timestamp ) * 3).刪除指定列的 所有版本 的數據:Delete addColumns (byte[] family, byte[] qualifier) * 4).刪除指定列的,時間戳 小於等於 給定時間戳的 所有版本 的數據:Delete addColumns (byte[] family, byte[] qualifier, long timestamp ) * 5).刪除指定列族的所有列的 所有版本 數據:Delete addFamily (byte[] family) 默認使用當前時間的時間戳,時間戳大於當前時間的數據刪除不掉 * 6).刪除指定列族的所有列中 時間戳 小於等於 指定時間戳 的所有數據:Delete addFamily (byte[] family, long timestamp) * 7).刪除指定列族中 所有列的時間戳 等於 指定時間戳的版本數據:Delete addFamilyVersion (byte[] family, long timestamp) */ deletaData.addColumn(columnFamily.getBytes(),column.getBytes()) //deletaData.addColumns(columnFamily.getBytes(),column.getBytes()) //deletaData.addFamily(columnFamily.getBytes()) hTable.delete(deletaData) println("刪除成功") hTable.close() } // deleteDataHbaseTable("scTable3", "rk0002", "info") def getDataHbaseTable(tableName:String, rowkey:String, columnFamily:String, column:String = null)={ val hbaseCOnf = HBaseConfiguration.create() val hTable = new HTable(hbaseCOnf, tableName) val getData = new Get(rowkey.getBytes()) /** * 1). Get addFamily(byte[] family) 指定希望獲取的列族 * 2). Get addColumn(byte[] family, byte[] qualifier) 指定希望獲取的列 * 3). Get setTimeRange(long minStamp, long maxStamp) 設置獲取數據的 時間戳範圍 * 4). Get setTimeStamp(long timestamp) 設置獲取數據的時間戳 * 5). Get setMaxVersions(int maxVersions) 設定獲取數據的版本數 * 6). Get setMaxVersions() 設定獲取數據的所有版本 * 7). Get setFilter(Filter filter) 為Get對象添加過濾器 * 8). void setCacheBlocks(boolean cacheBlocks) 設置該Get獲取的數據是否緩存在內存中 */ //getData.addFamily(columnFamily.getBytes()) //getData.addColumn(columnFamily.getBytes(), column.getBytes()) //getData.setTimeStamp("1535680422860".toLong) getData.setMaxVersions() val results = hTable.get(getData) for (result <- results.rawCells()){ println( new String(CellUtil.cloneRow(result)) + "\t" + new String(CellUtil.cloneFamily(result)) + "\t" + new String(CellUtil.cloneQualifier(result)) + "\t" + new String(CellUtil.cloneValue(result)) + "\t" + result.getTimestamp) } hTable.close() } //getDataHbaseTable("scTable", "rk0002", "info", "age") def scanDataHbaseTable(tableName:String, startRow:String, stopRow:String, columnFamily:String, column:String)={ val hBaseConf = HBaseConfiguration.create() val hTable = new HTable(hBaseConf, tableName) /** * 1). 創建掃描所有行的Scan:Scan() * 2). 創建Scan,從指定行開始掃描:Scan(byte[] startRow) * 註意:如果指定行不存在,從下一個最近的行開始 * 3). 創建Scan,指定起止行:Scan(byte[] startRow, byte[] stopRow) * 註意: startRow <= 結果集 < stopRow * 4). 創建Scan,指定起始行和過濾器:Scan(byte[] startRow, Filter filter) */ val scanData = new Scan() val filter1 = new SingleColumnValueFilter(columnFamily.getBytes(), column.getBytes(), CompareOp.GREATER_OR_EQUAL, "60".getBytes() ) /** * Scan setStartRow (byte[] startRow) 設置Scan的開始行,默認 結果集 包含該行。如果希望結果集不包含該行,可以在行鍵末尾加上0。 * Scan setStopRow (byte[] stopRow) 設置Scan的結束行,默認 結果集 不包含該行。如果希望結果集包含該行,可以在行鍵末尾加上0。 * Scan setBatch(int batch) 指定最多返回的Cell數目.用於防止一行中有過多的數據,導致OutofMemory錯誤 * Scan setTimeRange (long minStamp, long maxStamp) 掃描指定 時間範圍 的數據 * Scan setTimeStamp (long timestamp) 掃描 指定時間 的數據 * Scan addColumn (byte[] family, byte[] qualifier) 指定掃描的列 * Scan addFamily (byte[] family) 指定掃描的列族 * Scan setFilter (Filter filter) 為Scan設置過濾器,詳見HBase API Filter過濾器 * Scan setReversed (boolean reversed) 設置Scan的掃描順序,默認是正向掃描(false),可以設置為逆向掃描(true)。註意:該方法0.98版本以後才可用!! * Scan setMaxVersions () 獲取所有版本的數據 * Scan setMaxVersions (int maxVersions) 設置獲取的最大版本數! 不調用上下兩個setMaxVersions() 方法,只返回最新版本數據 * void setCaching (int caching) 設定緩存在內存中的行數,緩存得越多,以後查詢結果越快,同時也消耗更多內存 * void setRaw (boolean raw) 激活或者禁用raw模式。如果raw模式被激活,Scan將返回 所有已經被打上刪除標記但尚未被真正刪除 的數據。該功能僅用於激活了 KEEP_DELETED_ROWS的列族,即列族開啟了 hcd.setKeepDeletedCells(true) * Scan激活raw模式後,只能瀏覽所有的列,而不能指定任意的列,否則會報錯 */ scanData.setFilter(filter1) val resultsScan:ResultScanner = hTable.getScanner(scanData) while (resultsScan.iterator().hasNext){ val results = resultsScan.iterator().next() for (result:Cell <- results.rawCells()) { println(new String(CellUtil.cloneRow(result)) + "\t" + new String(CellUtil.cloneFamily(result)) + "\t" + new String(CellUtil.cloneQualifier(result)) + "\t" + new String(CellUtil.cloneValue(result)) + "\t" + result.getTimestamp) } } /** * for 循環無法直接遍歷 ResultScanner 暫無辦法 */ // for(results:Result <- resultsScan){ // // for (result:Cell <- results.rawCells()) { // // println(new String(CellUtil.cloneRow(result)) + "\t" + // new String(CellUtil.cloneFamily(result)) + "\t" + // new String(CellUtil.cloneQualifier(result)) + "\t" + // new String(CellUtil.cloneValue(result)) + "\t" + // result.getTimestamp) // } // // } hTable.close() } //scanDataHbaseTable("scTable", "rk0001", "rk0002", "info", "age") } }
Hbase 基礎API