1. 程式人生 > 程式設計 >Spark On Hbase的官方jar包編譯與使用

Spark On Hbase的官方jar包編譯與使用

前言

找了一番,Spark讀寫HBase已經有專門的 Maven 依賴包可用,HBase提供了一個HBase Spark Connector專案,hbase官網檔案提到這個專案可從原始碼編譯。這樣就有類似spark-kafka,spark-hive的hbase-spark依賴了。mvn庫現在提供了一個1.0版本https://mvnrepository.com/artifact/org.apache.hbase.connectors.spark/hbase-spark/1.0.0,其Spark為2.4.0,Scala為2.11.12,其他版本需要自行編譯。

官網1.0jar maven依賴:

<!-- https://mvnrepository.com/artifact/org.apache.hbase.connectors.spark/hbase-spark -->
<dependency>
    <groupId>org.apache.hbase.connectors.spark</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>1.0.0</version>
</dependency>

複製程式碼

非常容易和這個專案混淆:https://mvnrepository.com/artifact/org.apache.hbase/hbase-spark,這個不是HBase Connector Spark專案!

編譯 hbase-spark 原始碼

原始碼地址

Apache Hbase維護的專案,從此處下載原始碼壓縮包: Hbase Connectors-Spark原始碼

Apache HBase™ Spark Connector

  • Scala and Spark Versions

To generate an artifact for a different spark version and/or scala version,pass command-line options as follows (changing version numbers appropriately):

$ mvn -Dspark.version=2.3.1 -Dscala.version=2.11.8 -Dscala.binary.version=2.11 clean install
複製程式碼

準備編譯出Spark2.3.1&Scala2.11.8的hbase-spark依賴:

unzip hbase-connectors-master.zip
cd hbase-connectors-master/
mvn -Dspark.version=2.3.1 -Dscala.version=2.11.8 -Dscala.binary.version=2.11 clean install
複製程式碼

編譯報錯

  1. maven版本過低,安裝一下maven3.5.4,配置環境變數即可。
  2. TestJavaHBaseContext Failed,編譯時新增 -DskipTests即可
    最終編譯語句:

mvn -Dspark.version=2.3.1 -Dscala.version=2.11.8 -Dscala.binary.version=2.11 -DskipTests clean install

編譯成功

位置:~/hbase-connectors-master/spark/hbase-spark/target

就是hbase-spark-1.0.1-SNAPSHOT.jar,現在可以在專案中使用了。

使用hbase-spark-1.0.1-SNAPSHOT.jar

使用HBaseContext寫資料

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.{HBaseConfiguration,TableName}

//從編譯的hbase-spark-1.0.1-SNAPSHOT.jar中引入
import org.apache.hadoop.hbase.spark.HBaseContext

import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.util.UUID
/**
  * 引入了從Hbase官網編譯的hbase-spark jar,呼叫HBaseContext
  * Spark批量寫資料到HBase
  */
object SparkWithHBase {
  def main(args: Array[String]): Unit = {
    //Spark統一入口
    val spark = SparkSession.builder()
      .appName("Spark JDBC Test")
      .master("local")
      .getOrCreate()
    //列族名稱
    val SRC_FAMILYCOLUMN = "info"
    //Hbase配置
    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum","manager.bigdata")
    config.set("hbase.zookeeper.property.clientPort","2181")
    //Hbase上下文,是API的核心
    val hbaseContext = new HBaseContext(spark.sparkContext,config)
    //讀取資料來源,封裝成<RowKey,Values>這種格式
    val rdd: RDD[(String,Array[(String,String)])] = spark.read.csv("hdfs://manager.bigdata:8020/traffic.txt")
      .rdd
      .map(r => {
        (UUID.randomUUID().toString,Array((r.getString(0),"c1"),(r.getString(1),"c2"),(r.getString(2),"c3")))
      })
    //使用批量put方法寫入資料
    hbaseContext.bulkPut[(String,String)])](rdd,TableName.valueOf("spark_hbase_bulk_put"),row => {
        val put = new Put(Bytes.toBytes(row._1))
        row._2.foreach(putValue => put.addColumn(
          Bytes.toBytes(SRC_FAMILYCOLUMN),Bytes.toBytes(putValue._2),Bytes.toBytes(putValue._1)))
        put
      })

  }
}
複製程式碼

經查詢,資料成功寫入HBase。