1. 程式人生 > >Hbase 基礎API

Hbase 基礎API

rem else get 指定時間 新版 cal mem max 下一個

本文參考:https://www.cnblogs.com/skyl/p/4803738.html

package com.Hbase

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter

object HbaseDDL {

  def main(args: Array[String]): Unit = {

    // 確認表是否存在

    def ensureHbaseTableExist(tableName:TableName) = {
      // 配置 Hbase
      val hbaseconf = HBaseConfiguration.create()
      //val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183"
      //hbaseconf.set("hbase.zookeeper.quorum", zkConn)
      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
      val adminHbase = HbaseConn.getAdmin()

      val ifExist = adminHbase.tableExists(tableName)

      ifExist
    }

    // 確認表是否存在 測試
//    val result = ensureHbaseTableExist("user_info1")
//    if (result) {
//      println("表存在")
//    } else {
//      println("表不存在")
//    }


    /**
      * Hbase建表  兩個參數
      * @param tableName   形式為 ns:tb  或者  tb    API 創建 namespace 機會不多,一般通過 hbase shell 創建
      * @param columnFamilys  cf1,cf2,cf3
      */
    def createHbaseTable(tableName:String, columnFamilys:String) = {

      // 配置 Hbase
      val hbaseconf = HBaseConfiguration.create()
      //val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183"
      //hbaseconf.set("hbase.zookeeper.quorum", zkConn)
      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
      val adminHbase = HbaseConn.getAdmin()

      // 列簇逗號分隔
      val CFS = columnFamilys.split(",")

      // 表名,判斷是否帶了namespace,帶了則判斷是否存在 namespace, 不存在則創建
      val nameSpace = tableName.split(":")(0)


      if (nameSpace != tableName) {
        adminHbase.createNamespace(NamespaceDescriptor.create(tableName.split(":")(0)).build())
        println("NameSpace 創建成功!")
      }
      // 判斷表是否存在,不存在新建,存在則提示
      if (!ensureHbaseTableExist(TableName.valueOf(tableName))) {

        // 實例化 HTableDescriptor
        val htable = new HTableDescriptor(TableName.valueOf(tableName))

        // 循環添加所有列簇
        for ( columnFamily <- CFS) {

          // 實例化 HColumnDescriptor
          val htableColumnFamily1 = new HColumnDescriptor((columnFamily))
          // 調用 HColumnDescriptor 設置列簇屬性
          htableColumnFamily1.setMaxVersions(3)
          // 表增加列族
          htable.addFamily(new HColumnDescriptor(columnFamily))
        }
        // 創建表
        adminHbase.createTable(htable)
        println("表創建成功")
      } else {
        println("表已存在")
      }

       adminHbase.close()

    }

    // 測試建表
   // createHbaseTable("scTable3", "info,base")


    /**
      *  列出所有表
      */
    def listAllHbaseTable() ={
      // 配置 Hbase
      val hbaseconf = HBaseConfiguration.create()
      //val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183"
      //hbaseconf.set("hbase.zookeeper.quorum", zkConn)
      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
      val adminHbase = HbaseConn.getAdmin()

      val listTables =  adminHbase.listTableNames()

      for(table <- listTables){
        println(table)
      }
      adminHbase.close()
    }

    //listAllHbaseTable()

    /**
      * 刪除一張表,輸入表名
      * 判斷是否存在,是否失效,否則不能刪除
      *
      * @param tableName
      */
    def deleteHbaseTable(tableName: String) ={

      val hbaseconf = HBaseConfiguration.create()
      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
      val adminHbase = HbaseConn.getAdmin()

      val tbName = TableName.valueOf(tableName)

      if(ensureHbaseTableExist(tbName)){

        // 若表不失效,則使失效
        if(!adminHbase.isTableDisabled(tbName)){
          adminHbase.disableTable(tbName)
        }

        adminHbase.deleteTable(tbName)
        println("刪除成功")
      } else {
        println("表不存在")
      }

      adminHbase.close()
    }

    //deleteHbaseTable("scTable3")


    /**
      *
      * 刪除表的某個列族  ,得到 HTableDescriptor , 調用該類的 removeFamily 方法
      * @param tableName   表名  --->   ns:tb   or    tb   [String]
      * @param columnFamily   列族名  --->   String
      */
    def deleteHbaseColumnFamily(tableName:String, columnFamily:String) ={

      val hbaseconf = HBaseConfiguration.create()
      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
      val adminHbase = HbaseConn.getAdmin()

      val tbName = TableName.valueOf(tableName)
      // disable table
      adminHbase.disableTable(tbName)

      // get HTableDescriptor

      val htd = adminHbase.getTableDescriptor(tbName)

      // delete family
      htd.removeFamily(columnFamily.getBytes())

      // apply htd to table
      adminHbase.modifyTable(tbName, htd)

      // enable table
      adminHbase.enableTable(tbName)

      println("刪除成功")

      adminHbase.close()

    }

   // deleteHbaseColumnFamily("scTable3", "base")


    /**
      * 給表增加列族    先得到表的 HTableDescriptor, 然後使用 HColumnDescriptor 初始化 新增列,並設置屬性
      * 調用 HTableDescriptor 的 addFamily 方法,將初始化好的 HCD 添加到  HTableDescriptor ,然後使用admin 的 modifyTable 方法將修改應用
      * @param tableName
      * @param columnFamily
      */
    def addHbaseColumnFamily(tableName:String, columnFamily:String) ={

      val hbaseconf = HBaseConfiguration.create()
      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
      val adminHbase = HbaseConn.getAdmin()

      val tbName = TableName.valueOf(tableName)
      // disable table
      adminHbase.disableTable(tbName)

      // get HTableDescriptor
      val htd = adminHbase.getTableDescriptor(tbName)

      //
      val hcd = new HColumnDescriptor(columnFamily)
      hcd.setMaxVersions(3)

      // add family
      htd.addFamily(hcd)

      // apply htd to table
      adminHbase.modifyTable(tbName, htd)

      // enable table
      adminHbase.enableTable(tbName)

      println("添加成功")

      adminHbase.close()

    }

    //addHbaseColumnFamily("scTable3", "base")


    /**
      * 修改列簇功能  get 到  HTableDescriptor ,再 get 到 Family ,設置 Family ,admin modifyTable  應用
      * @param tableName
      * @param columnFamily
      */
    def modifyHbaseTableColumnFamily(tableName:String, columnFamily:String)  ={
      val hbaseconf = HBaseConfiguration.create()
      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
      val adminHbase = HbaseConn.getAdmin()

      val tbName = TableName.valueOf(tableName)

      adminHbase.disableTable(tbName)

      val htd = adminHbase.getTableDescriptor(tbName)

      val modifyCol  = htd.getFamily(columnFamily.getBytes())
      modifyCol.setMaxVersions(3)

      adminHbase.modifyTable(tbName,htd)

      adminHbase.enableTable(tbName)

      adminHbase.close()

      println("修改成功!")
    }

    //modifyHbaseTableColumnFamily("scTable3", "info")

    /**
      * 插入數據,五個參數
      * @param tableName
      * @param columnFamily
      * @param column
      * @param rowkey
      * @param value
      */
    def putDataHbaseTable(tableName:String, columnFamily:String, column:String,
                          rowkey:String, value:String) ={
      val hbaseconf = HBaseConfiguration.create()

      // table
      val hTable = new HTable(hbaseconf, tableName)
      // row key
      val putData = new Put(rowkey.getBytes())
      // put value
      putData.add(columnFamily.getBytes(), column.getBytes(), value.getBytes())

      /**
        * 插入方式
        * ASYNC_WAL : 當數據變動時,異步寫WAL日誌
        * SYNC_WAL : 當數據變動時,同步寫WAL日誌
        * FSYNC_WAL : 當數據變動時,同步寫WAL日誌,並且,強制將數據寫入磁盤
        * SKIP_WAL : 不寫WAL日誌
        * USE_DEFAULT : 使用HBase全局默認的WAL寫入級別,即 SYNC_WAL
        */
      putData.setDurability(Durability.SYNC_WAL)

      // put data to table
      hTable.put(putData)

      println("插入數據成功!")
      // close table
      hTable.close()
    }

    //putDataHbaseTable("scTable3", "info", "name", "rk0002", "lalala")

    def deleteDataHbaseTable(tableName: String, rowkey:String, columnFamily:String,
                             column:String = null)={

      val hbaseConf = HBaseConfiguration.create()

      val hTable = new HTable(hbaseConf, tableName)

      // 初始化 Delete ,表名後可接時間戳
      val deletaData = new Delete(rowkey.getBytes())

      /**
        *   1).刪除指定列的 最新版本 的數據:Delete addColumn (byte[] family, byte[] qualifier)
        *   2).刪除指定列的 指定版本 的數據:Delete addColumn (byte[] family, byte[] qualifier, long timestamp )
        *   3).刪除指定列的 所有版本 的數據:Delete addColumns (byte[] family, byte[] qualifier)
        *   4).刪除指定列的,時間戳 小於等於 給定時間戳的 所有版本 的數據:Delete addColumns (byte[] family, byte[] qualifier, long timestamp )
        *   5).刪除指定列族的所有列的 所有版本 數據:Delete addFamily (byte[] family)    默認使用當前時間的時間戳,時間戳大於當前時間的數據刪除不掉
        *   6).刪除指定列族的所有列中 時間戳 小於等於 指定時間戳 的所有數據:Delete addFamily (byte[] family, long timestamp)
        *   7).刪除指定列族中 所有列的時間戳 等於 指定時間戳的版本數據:Delete addFamilyVersion (byte[] family, long timestamp)
        */
      deletaData.addColumn(columnFamily.getBytes(),column.getBytes())
      //deletaData.addColumns(columnFamily.getBytes(),column.getBytes())
      //deletaData.addFamily(columnFamily.getBytes())
      hTable.delete(deletaData)

      println("刪除成功")
      hTable.close()

    }

   // deleteDataHbaseTable("scTable3", "rk0002", "info")

    def getDataHbaseTable(tableName:String, rowkey:String, columnFamily:String, column:String = null)={
      val hbaseCOnf =  HBaseConfiguration.create()

      val hTable = new HTable(hbaseCOnf, tableName)

      val getData = new Get(rowkey.getBytes())

      /**
        *    1). Get addFamily(byte[] family) 指定希望獲取的列族
        *    2). Get addColumn(byte[] family, byte[] qualifier) 指定希望獲取的列
        *    3). Get setTimeRange(long minStamp, long maxStamp) 設置獲取數據的 時間戳範圍
        *    4). Get setTimeStamp(long timestamp) 設置獲取數據的時間戳
        *    5). Get setMaxVersions(int maxVersions) 設定獲取數據的版本數
        *    6). Get setMaxVersions() 設定獲取數據的所有版本
        *    7). Get setFilter(Filter filter) 為Get對象添加過濾器
        *    8). void setCacheBlocks(boolean cacheBlocks) 設置該Get獲取的數據是否緩存在內存中
        */
      //getData.addFamily(columnFamily.getBytes())

      //getData.addColumn(columnFamily.getBytes(), column.getBytes())

      //getData.setTimeStamp("1535680422860".toLong)

      getData.setMaxVersions()
      val results = hTable.get(getData)

      for (result <- results.rawCells()){

        println( new String(CellUtil.cloneRow(result)) + "\t" +
          new String(CellUtil.cloneFamily(result)) + "\t" +
          new String(CellUtil.cloneQualifier(result)) + "\t" +
          new String(CellUtil.cloneValue(result)) + "\t" +
          result.getTimestamp)

      }
      hTable.close()

    }

    //getDataHbaseTable("scTable", "rk0002", "info", "age")

    def scanDataHbaseTable(tableName:String, startRow:String, stopRow:String,
                           columnFamily:String, column:String)={
      val hBaseConf = HBaseConfiguration.create()
      val hTable = new HTable(hBaseConf, tableName)

      /**
        *         1). 創建掃描所有行的Scan:Scan()
        *   2). 創建Scan,從指定行開始掃描:Scan(byte[] startRow)
        *   註意:如果指定行不存在,從下一個最近的行開始
        *   3). 創建Scan,指定起止行:Scan(byte[] startRow, byte[] stopRow)
        *   註意: startRow <= 結果集 < stopRow
        *   4). 創建Scan,指定起始行和過濾器:Scan(byte[] startRow, Filter filter)
        */
      val scanData = new Scan()

      val filter1 = new SingleColumnValueFilter(columnFamily.getBytes(), column.getBytes(), CompareOp.GREATER_OR_EQUAL, "60".getBytes() )

      /**
        * Scan setStartRow (byte[] startRow) 設置Scan的開始行,默認 結果集 包含該行。如果希望結果集不包含該行,可以在行鍵末尾加上0。
        * Scan setStopRow (byte[] stopRow) 設置Scan的結束行,默認 結果集 不包含該行。如果希望結果集包含該行,可以在行鍵末尾加上0。
        * Scan setBatch(int batch) 指定最多返回的Cell數目.用於防止一行中有過多的數據,導致OutofMemory錯誤
        * Scan setTimeRange (long minStamp, long maxStamp) 掃描指定 時間範圍 的數據
        * Scan setTimeStamp (long timestamp) 掃描 指定時間 的數據
        * Scan addColumn (byte[] family, byte[] qualifier) 指定掃描的列
        * Scan addFamily (byte[] family) 指定掃描的列族
        * Scan setFilter (Filter filter) 為Scan設置過濾器,詳見HBase API Filter過濾器
        * Scan setReversed (boolean reversed) 設置Scan的掃描順序,默認是正向掃描(false),可以設置為逆向掃描(true)。註意:該方法0.98版本以後才可用!!
        * Scan setMaxVersions () 獲取所有版本的數據
        * Scan setMaxVersions (int maxVersions) 設置獲取的最大版本數! 不調用上下兩個setMaxVersions() 方法,只返回最新版本數據
        * void setCaching (int caching) 設定緩存在內存中的行數,緩存得越多,以後查詢結果越快,同時也消耗更多內存
        * void setRaw (boolean raw) 激活或者禁用raw模式。如果raw模式被激活,Scan將返回 所有已經被打上刪除標記但尚未被真正刪除 的數據。該功能僅用於激活了 KEEP_DELETED_ROWS的列族,即列族開啟了 hcd.setKeepDeletedCells(true)
        * Scan激活raw模式後,只能瀏覽所有的列,而不能指定任意的列,否則會報錯
        */
      scanData.setFilter(filter1)

      val resultsScan:ResultScanner = hTable.getScanner(scanData)


      while (resultsScan.iterator().hasNext){
        val results = resultsScan.iterator().next()
        for (result:Cell <- results.rawCells()) {

                    println(new String(CellUtil.cloneRow(result)) + "\t" +
                      new String(CellUtil.cloneFamily(result)) + "\t" +
                      new String(CellUtil.cloneQualifier(result)) + "\t" +
                      new String(CellUtil.cloneValue(result)) + "\t" +
                      result.getTimestamp)
                  }

      }

      /**
        * for 循環無法直接遍歷 ResultScanner 暫無辦法
        */
//      for(results:Result <- resultsScan){
//
//        for (result:Cell <- results.rawCells()) {
//
//          println(new String(CellUtil.cloneRow(result)) + "\t" +
//            new String(CellUtil.cloneFamily(result)) + "\t" +
//            new String(CellUtil.cloneQualifier(result)) + "\t" +
//            new String(CellUtil.cloneValue(result)) + "\t" +
//            result.getTimestamp)
//        }
//
//      }
      hTable.close()
    }

    //scanDataHbaseTable("scTable", "rk0001", "rk0002", "info", "age")

  }
}

  

Hbase 基礎API