1. 程式人生 > >Spark2.0操作Hbase

Spark2.0操作Hbase

讀寫Hbase的方法,這裡是通過Spark的RDD來操作的方法,通過Hbase API的方式是另一種,這裡不涉及。

首先配置pom,新增hbase依賴,一般Spark平臺不包含hbase的jar包,所以這些依賴不新增<scope>provided</scope>

maven相關部分如下:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <hbase.version>0.98.8-hadoop2</hbase.version>
        <spark.artifactId.version>2.11</spark.artifactId.version>
    </properties>

<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.2</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>jdk.tools</artifactId>
                    <groupId>jdk.tools</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>curator-framework</artifactId>
                    <groupId>org.apache.curator</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>curator-recipes</artifactId>
                    <groupId>org.apache.curator</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>curator-client</artifactId>
                    <groupId>org.apache.curator</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.2</version>
            <scope>provided</scope>
        </dependency>

<dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <exclusions>                
                <exclusion>
                    <artifactId>jdk.tools</artifactId>
                    <groupId>jdk.tools</groupId>
                </exclusion>
            </exclusions>
        </dependency>

<dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>            
        </dependency>

<dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>0.98.8-hadoop2</version>
            <!-- <scope>provided</scope> -->
        </dependency>

 

然後看操作程式碼如下:

package mytest

import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

object hbaseSpark4 {
  val logger = LoggerFactory.getLogger(hbsaeSpark4.getClass)

  def main(args: Array[String]): Unit = {
    val tableName: String = "table1"
    val writeTableName: String = "table2"
    val pixFix: String = SevsConfig.getProp("hbase.table.family")

    val sparkconf = new SparkConf().setAppName("hbsaeSpark4")
      .set("HADOOP_USER_NAME", ("hbase.hadoop.username"))
      .set("HADOOP_GROUP_NAME", ("hbase.hadoop.groupname"))
      
    //.setMaster("local")

  
    val sc = new SparkContext(sparkconf)
    read_hbase(sc, tableName)
    logger.error("write to hbase #################")
    save_hase(sc, writeTableName)
    logger.error("ending ################")

  }

  def read_hbase(sc: SparkContext, tableName: String): Unit = {
    val configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.property.clientPort", "2015")
    configuration.set("zookeeper.znode.parent", "/hbase")
    configuration.set("hbase.zookeeper.quorum", "namenode1-sit.com,namenode2-sit.com,slave01-sit.com")   
    configuration.set(TableInputFormat.INPUT_TABLE, tableName)

    val startRowkey = "0600000003"
    val endRowkey = "8800000008"
    val scan = new Scan(Bytes.toBytes(startRowkey), Bytes.toBytes(endRowkey))
    //    scan.setCacheBlocks(false)
    //    scan.addFamily(Bytes.toBytes("ks"));
    //    scan.addColumn(Bytes.toBytes("ks"), Bytes.toBytes("data"))
    var proto = ProtobufUtil.toScan(scan)
    val scan_str = Base64.encodeBytes(proto.toByteArray())
    configuration.set(TableInputFormat.SCAN, scan_str)

    val rdd = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    val resultRDD = rdd.map(tup => resultToList(tup))
    resultRDD.take(2).foreach(println)

  }

  def resultToList(tableValues: Tuple2[ImmutableBytesWritable, Result]): String = {
    val tup = tableValues._2
    val result = tableValues._2
    val rowKey = result.getRow()
    val rowId = Bytes.toString(rowKey).reverse
    rowId
  }

  def save_hase(sc: SparkContext, writeTableName: String): Unit = {
    val configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.property.clientPort", "2015")
    configuration.set("zookeeper.znode.parent", "/hbase")
    configuration.set("hbase.zookeeper.quorum", "namenode1-sit.com,namenode2-sit.com,slave01-sit.com")    
    configuration.set(TableOutputFormat.OUTPUT_TABLE, writeTableName)

    val job = new Job(configuration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val indataRDD = sc.makeRDD(Array("1,jack,15", "2,Lily,16", "3,mike,16"))
    val rdd = indataRDD.map(_.split(',')).map { arr =>
      {
        val put = new Put(Bytes.toBytes(arr(0)))
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
        (new ImmutableBytesWritable, put)
      }
    }

    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
  }
}