1. 程式人生 > 程式設計 >Spark2.3讀寫Hbase2.0最新API實踐(CURD)

Spark2.3讀寫Hbase2.0最新API實踐(CURD)

前言

在一些大佬的部落格已查不到HBase2.x最新的實踐程式碼,從某書上貼上來的程式碼在新版本下執行不了,因此寫下本篇實踐,從HBase 1.4.2等老版本升級而來,想要使用Spark讀寫HBase2.0 API的可借鑑本文。ps:官網掛的示例也報錯!(因為沒找到依賴o(╯□╰)o)

程式碼環境

  • Spark 2.3.1 (2.2,2.3.x系列應該都能用)
  • HBase 2.0.0 (與Hbase 1.x系列不相容)
  • IDEA 2019.1 社群版

準備工作

HBase shell建立表

# hbase shell
> list //檢視錶
> create 'spark_hbase_src'
,'info' //建立一張資料來源表
複製程式碼

> create 'spark_hbase_res','info' //建立一張結果表,用來寫入計算結果
複製程式碼

以上兩張表就建立好了,簡單。

準備示例資料

資料模型:模擬路上車輛的經過記錄,為csv格式文字檔案(txt)

  • 欄位5個:車牌號、車牌顏色、拍照裝置編號、行駛方向、記錄時間
  • 對應英文:"number","color","device","direction","photo_time"
  • 示例資料:模擬資料僅供參考
車牌號 車牌顏色 裝置編號 行駛方向 記錄時間
豫A12345 藍色 D12C01 南北 2019/10/16 12:00:00
豫B12121 黃色 D13C06 南北 2019/10/10 12:11:00
豫C66666 藍色 D15C08 西東 2019/10/29 12:09:00
豫D11111 藍色 D18C07 北南 2019/10/18 12:15:00

自己模擬生成一些文字資料,上傳到hdfs,也可以在本機。

Maven依賴

HBase Server API

        <!-- Hbase server庫 提供Hbase讀寫API-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}
</version> </dependency> 複製程式碼

之前只需要這個HBase jar就可以了,實踐中有報錯:

錯誤1

  • Error 1:無法import org.apache.hadoop.hbase.mapreduce.TableInputFormat

解決辦法

匯入這個包:

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>${hbase.version}</version>
        </dependency>
複製程式碼

錯誤2

  • Error 2:找不到org.apache.htrace.SamplerBuilder類
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/htrace/SamplerBuilder
Caused by: java.lang.ClassNotFoundException: org.apache.htrace.SamplerBuilder
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 17 more
複製程式碼

解決辦法

匯入這個包:

        <!-- https://mvnrepository.com/artifact/org.apache.htrace/htrace-core -->
        <dependency>
            <groupId>org.apache.htrace</groupId>
            <artifactId>htrace-core</artifactId>
            <version>3.1.0-incubating</version>
        </dependency>
複製程式碼

Spark等依賴

其他spark-core等依賴自行新增:

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        
        <!-- Spark核心庫 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--Spark sql庫 提供DF類API -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
複製程式碼

Spark寫入HBase

程式碼實踐

ctrl+c自取:

import java.util.UUID

import org.apache.hadoop.hbase.{HBaseConfiguration,TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory,Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

import scala.util.Try

object SparkWriteHBase {
  val hbaseConfig = HBaseConfiguration.create()
  hbaseConfig.set("hbase.zookeeper.quorum","zk地址1,zk地址2,zk地址3")
  hbaseConfig.set("hbase.zookeeper.property.clientPort","2181")
  //根據自己叢集設定如下一行配置值
  config.set("zookeeper.znode.parent","/hbase-unsecure")
  //在IDE中設定此項為true,避免出現"hbase-default.xml"版本不匹配的執行時異常
  hbaseConfig.set("hbase.defaults.for.version.skip","true")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark-HBase")
      .master("local[2]")
      .getOrCreate()
    //讀取的示例資料
    val data = spark.read.csv("hdfs://your-hdfs-host:8020/traffic.txt")
      .toDF("number","color","device","direction","photo_time")

    println("資料條數是:" + data.count())

    val SRC_FAMILYCOLUMN = "info"

    data.foreachPartition(p => {
      //獲取HBase連線
      val hbaseConn = ConnectionFactory.createConnection(hbaseConfig)
      val resultTable = TableName.valueOf("spark_hbase_src")
      //獲取表連線
      val table = hbaseConn.getTable(resultTable)
      p.foreach(r => {
        val put = new Put(Bytes.toBytes(UUID.randomUUID().toString))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN),Bytes.toBytes("number"),Bytes.toBytes(r.getString(0)))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN),Bytes.toBytes("color"),Bytes.toBytes(r.getString(1)))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN),Bytes.toBytes("device"),Bytes.toBytes(r.getString(2)))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN),Bytes.toBytes("direction"),Bytes.toBytes(r.getString(3)))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN),Bytes.toBytes("photo_time"),Bytes.toBytes(r.getString(4)))

        Try(table.put(put)).getOrElse(table.close()) //將資料寫入HBase,若出錯關閉table
      })
      table.close()
      hbaseConn.close()
    })
  }
}
複製程式碼

寫操作結果檢視

寫入前後資料量對比:0 -> 1199:

Spark讀取HBase

程式碼實踐

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

object SparkReadHbase {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Spark-HBase")
      .master("local")
      .getOrCreate()

    val hbaseConfig = HBaseConfiguration.create()
    hbaseConfig.set("hbase.zookeeper.quorum",zk地址3")
    hbaseConfig.set("hbase.zookeeper.property.clientPort","2181")
    //在IDE中設定此項為true,避免出現"hbase-default.xml"版本不匹配的執行時異常
    hbaseConfig.set("hbase.defaults.for.version.skip","true")
    hbaseConfig.set(TableInputFormat.INPUT_TABLE,"spark_hbase_src")

    val SRC_FAMILYCOLUMN = "info"

    //從hbase中讀取RDD
    val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(hbaseConfig,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])

    import spark.implicits._

    hbaseRDD.map({ case (_,result) =>
      //      val key = Bytes.toString(result.getRow)
      val number = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes,"number".getBytes))
      val color = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes,"color".getBytes))
      val device = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes,"device".getBytes))
      val direction = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes,"direction".getBytes))
      val photo_time = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes,"photo_time".getBytes))
      (number,color,device,direction,photo_time)
    }).toDF("number","photo_time").show(false)
  }
}
複製程式碼

執行結果

show()的列印截圖~成功讀取到HBase中的資料:


官網示例踩坑

官網的 Example 36. HBaseContext Usage Example 如下:

val sc = new SparkContext("local","test")
val config = new HBaseConfiguration()
...
val hbaseContext = new HBaseContext(sc,config)
複製程式碼
  • 不知道HBaseContext是哪一個jar包引入的,官網沒指名用的什麼Maven依賴!(後文看到可從一個專案可編譯,mvn也提供了一個1.0版本jar包)
  • new SparkContext("local","test") 這種寫法是這個包獨有的。

2019-10-10編譯了一下這個原始碼得到這個jar,mvn官網也提供了一個1.0版本的 ↓↓↓

編譯Hbase Spark Connector指南

提供spark讀寫hbase的api,可作為hbase-server庫之外的另一種選擇↑↑↑
複製程式碼

使用Spark RDD寫HBase

由以下兩種,主要區別是使用的配置檔案物件不同

saveAsHadoopDataset

使用Hadoop JobConf配置,初始化JobConf用的TableOutputFormat類 是 org.apache.hadoop.hbase.mapred 包下的。

saveAsNewAPIHadoopDataset

使用Hadoop Configuration配置,使用的 TableInputFormat 類是 org.apache.hadoop.hbase.mapreduce 包下的

這兩個API的使用方法類似,示例如下:

程式碼實現

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

object SparkWriteHBaseByHadoopDataset {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("SparkWriteHBase2").master("local").getOrCreate()
    val sc = spark.sparkContext
    val tableName = "test_student"

    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum","manager.bigdata,master.bigdata,worker.bigdata")
    config.set("hbase.zookeeper.property.clientPort","2181")
    config.set("hbase.defaults.for.version.skip","true")

    val inputDataRDD = sc.parallelize(Array("1,Jack,M,26","2,Rose,17")) //模擬構建兩行記錄的RDD
    val rdd = inputDataRDD.map(_.split(',')).map { arr => {
      val put = new Put(Bytes.toBytes(arr(0))) //行健的值
      put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1))) //info:name列的值
      put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2))) //info:gender列的值
      put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt)) //info:age列的值
      (new ImmutableBytesWritable,put)
    }
    }

    // 初始化JobConf,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的
    val jobConf = new JobConf(config)
    jobConf.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])
    rdd.saveAsHadoopDataset(jobConf)

    //TableInputFormat 是 org.apache.hadoop.hbase.mapreduce 包下的
    config.set(TableOutputFormat.OUTPUT_TABLE,tableName)
    val job = Job.getInstance(config)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
複製程式碼

兩種API方法底層均呼叫SparkHadoopWriter物件的write方法,無效能差異。

Spark建立HBase表

核心API程式碼示例

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration,TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder,ConnectionFactory,TableDescriptorBuilder}

  def main(args: Array[String]): Unit = {

    val hbaseConn = ConnectionFactory.createConnection(hbaseConfig)
    val admin = hbaseConn.getAdmin

    //如果不存在就建立表
    if (!admin.tableExists(TableName.valueOf("test_hb_new_api"))) {
      val desc = TableDescriptorBuilder.newBuilder(TableName.valueOf("test_hb_new_api"))
      //指定列簇 不需要建立列,列式儲存不需要建立列
      val cf1 = ColumnFamilyDescriptorBuilder.newBuilder("cf1".getBytes()).build()
      desc.setColumnFamily(cf1)
      admin.createTable(desc.build())
    }

  }
複製程式碼

僅僅建立表,不需要spark的參與,寫在Spark程式碼裡當然也可以執行!

已過時的API

val desc = new HTableDescriptor(TableName.valueOf("hb_test"))

//這些API已經被標記為Deprecated,將會在HBase3.0移除!

val hcd = new HColumnDescriptor("cf")

Spark刪除HBase表 (待續)

SparkSQL操作HBase (待續)