1. 程式人生 > >ScalaHbase 使用scala 操作hbase

ScalaHbase 使用scala 操作hbase

package DAO

import java.io.IOException
import java.util.List
import java.util.UUID
import java.util.Map
import java.util.concurrent.ConcurrentHashMap
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import 
org.apache.hadoop.hbase.filter.SubstringComparator import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase._ import org.slf4j.Logger import org.slf4j.LoggerFactory object ScalaHbase { //private val LOG: Logger = LoggerFactory.getLogger(classOf[ScalaHbase])
def LOG = LoggerFactory.getLogger(getClass) def getHbaseConf: Configuration = { val conf: Configuration = HBaseConfiguration.create conf.set("hbase.zookeeper.property.clientPort", "2181") conf.set("spark.executor.memory", "3000m") conf.set("hbase.zookeeper.quorum", Constant.HBASE_ZOOKEEPER
) conf.set("hbase.master", Constant.HBASE_MASTER) conf.set("hbase.rootdir", Constant.HBASE_ROOTDIR) conf } @throws(classOf[MasterNotRunningException]) @throws(classOf[ZooKeeperConnectionException]) @throws(classOf[IOException]) def createTable(conf: Configuration,hbaseconn: Connection, tablename: String, columnFamily: Array[String]) { val admin: HBaseAdmin = hbaseconn.getAdmin.asInstanceOf[HBaseAdmin] if (admin.tableExists(tablename)) { LOG.info(tablename+" Table exists!") // val tableDesc: HTableDescriptor = new HTableDescriptor(TableName.valueOf(tablename)) // tableDesc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation") } else { val tableDesc: HTableDescriptor = new HTableDescriptor(TableName.valueOf(tablename)) tableDesc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation") for ( i <- 0 to (columnFamily.length - 1)) { val columnDesc: HColumnDescriptor = new HColumnDescriptor(columnFamily(i)); tableDesc.addFamily(columnDesc); } admin.createTable(tableDesc) LOG.info(tablename+" create table success!") } admin.close } @throws(classOf[IOException]) def addRow(table: Table, rowKey: String, columnFamily: String, key: String, value: String) { // LOG.info("put '" + rowKey + "', '" + columnFamily + ":" + key + "', '" + value + "'") val rowPut: Put = new Put(Bytes.toBytes(rowKey)) if (value == null) { rowPut.addColumn(columnFamily.getBytes, key.getBytes, "".getBytes) } else { rowPut.addColumn(columnFamily.getBytes, key.getBytes, value.getBytes) } table.put(rowPut) } @throws(classOf[IOException]) def putRow(rowPut: Put,table: HTable, rowKey: String, columnFamily: String, key: String, value: String) { // val rowPut: Put = new Put(Bytes.toBytes(rowKey)) // println("put '" + rowKey + "', '" + columnFamily + ":" + key + "', '" + value + "'") if (value == null) { rowPut.add(columnFamily.getBytes, key.getBytes, "".getBytes) } else { rowPut.add(columnFamily.getBytes, key.getBytes, value.getBytes) } table.put(rowPut) } @throws(classOf[IOException]) def getRow(table: HTable, rowKey: String): Result = { val get: Get = new Get(Bytes.toBytes(rowKey)) val result: Result = table.get(get) return result } /** * 鎵歸噺娣誨姞鏁版? * * @param list * @throws IOException */ def addDataBatch(table: HTable, list: List[Put]) { try { table.put(list) } catch { case e: RetriesExhaustedWithDetailsException => { LOG.error(e.getMessage) } case e: IOException => { LOG.error(e.getMessage) } } } /** * 鏌ヨ鍏ㄩ儴 */ def queryAll(table: HTable) { val scan: Scan = new Scan try { val s = new Scan() val results = table.getScanner(s) } catch { case e: IOException => { LOG.error(e.toString) } } } def queryBySingleColumn(table: HTable, queryColumn: String, value: String, columns: Array[String]): ResultScanner = { if (columns == null || queryColumn == null || value == null) { return null } try { val filter: SingleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(queryColumn), Bytes.toBytes(queryColumn), CompareOp.EQUAL, new SubstringComparator(value)) val scan: Scan = new Scan for (columnName <- columns) { scan.addColumn(Bytes.toBytes(columnName), Bytes.toBytes(columnName)) } scan.setFilter(filter) return table.getScanner(scan) } catch { case e: Exception => { LOG.error(e.toString) } } return null } // @throws(classOf[Exception]) // def main(args: Array[String]) { // val conf: Configuration = ScalaConn.getHbaseConf // val table: HTable = new HTable(conf, "test") // var rowPut:Put = null // try { // val familyColumn: Array[String] = Array[String]("test1", "test2") // createTable(conf, "test", familyColumn) // val uuid: UUID = UUID.randomUUID // val s_uuid: String = uuid.toString // if(rowPut == null){ // rowPut = new Put(Bytes.toBytes(s_uuid)) // } // ScalaHbase.putRow(rowPut,table, s_uuid, "uuid", "col1", s_uuid) // rowPut = null; // ScalaHbase.getRow(table, s_uuid) // } // catch { // case e: Exception => { // if (e.getClass == classOf[MasterNotRunningException]) { // System.out.println("MasterNotRunningException") // } // if (e.getClass == classOf[ZooKeeperConnectionException]) { // System.out.println("ZooKeeperConnectionException") // } // if (e.getClass == classOf[IOException]) { // System.out.println("IOException") // } // e.printStackTrace // } // } finally { // if (null != table) { // table.close // } // } // } def dropTable(conf: Configuration, tableName: String) { try { val admin: HBaseAdmin = new HBaseAdmin(conf) admin.disableTable(tableName) admin.deleteTable(tableName) } catch { case e: MasterNotRunningException => { LOG.error(e.toString) } case e: ZooKeeperConnectionException => { LOG.error(e.toString) } case e: IOException => { LOG.error(e.toString) } } } }