1. 程式人生 > >Spark DataFrame寫入HBase的常用方式

Spark DataFrame寫入HBase的常用方式

Spark是目前最流行的分散式計算框架,而HBase則是在HDFS之上的列式分散式儲存引擎,基於Spark做離線或者實時計算,資料結果儲存在HBase中是目前很流行的做法。例如使用者畫像、單品畫像、推薦系統等都可以用HBase作為儲存媒介,供客戶端使用。

因此Spark如何向HBase中寫資料就成為很重要的一個環節了。本文將會介紹三種寫入的方式,其中一種還在期待中,暫且官網即可...

程式碼在spark 2.2.0版本親測

第一種是最簡單的使用方式了,就是基於RDD的分割槽,由於在spark中一個partition總是儲存在一個excutor上,因此可以建立一個HBase連線,提交整個partition的內容。

大致的程式碼是:

rdd.foreachPartition { records =>
    val config = HBaseConfiguration.create
    config.set("hbase.zookeeper.property.clientPort", "2181")
    config.set("hbase.zookeeper.quorum", "a1,a2,a3")
    val connection = ConnectionFactory.createConnection(config)
    val table = connection.getTable(TableName.valueOf("rec:user_rec"
))            val list = new java.util.ArrayList[Put]    for(i <- 0 until 10){        val put = new Put(Bytes.toBytes(i.toString))        put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("aaaa"), Bytes.toBytes("1111"))        list.add(put)    }        table.put(list)        table.close() }

這樣每次寫的程式碼很多,顯得不夠友好,如果能跟dataframe儲存parquet、csv之類的就好了。下面就看看怎麼實現dataframe直接寫入hbase吧!

由於這個外掛是hortonworks提供的,maven的中央倉庫並沒有直接可下載的版本。需要使用者下載原始碼自己編譯打包,如果有maven私庫,可以上傳到自己的maven私庫裡面。具體的步驟可以參考如下:

2.1 下載原始碼、編譯、上傳

去官網github下載即可:https://github.com/hortonworks-spark/shc
可以直接按照下面的readme說明來,也可以跟著我的筆記走。

下載完成後,如果有自己的私庫,可以修改shc中的distributionManagement。然後點選旁邊的maven外掛deploy釋出工程,如果只想打成jar包,那就直接install就可以了。



2.2 引入

在pom.xml中引入:

<dependency>
    <groupId>com.hortonworks</groupId>
    <artifactId>shc-core</artifactId>
    <version>1.1.2-2.2-s_2.11-SNAPSHOT</version></dependency>

2.3

首先建立應用程式,Application.scala

object Application {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local").appName("normal").getOrCreate()
        spark.sparkContext.setLogLevel("warn")
        val data = (0 to 255).map { i =>  HBaseRecord(i, "extra")}

        val df:DataFrame = spark.createDataFrame(data)
        df.write
          .mode(SaveMode.Overwrite)
          .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
          .format("org.apache.spark.sql.execution.datasources.hbase")
          .save()
    }
    def catalog = s"""{
                   |"table":{"namespace":"rec", "name":"user_rec"},
                   |"rowkey":"key",
                   |"columns":{
                   |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                   |"col1":{"cf":"t", "col":"col1", "type":"boolean"},
                   |"col2":{"cf":"t", "col":"col2", "type":"double"},
                   |"col3":{"cf":"t", "col":"col3", "type":"float"},
                   |"col4":{"cf":"t", "col":"col4", "type":"int"},
                   |"col5":{"cf":"t", "col":"col5", "type":"bigint"},
                   |"col6":{"cf":"t", "col":"col6", "type":"smallint"},
                   |"col7":{"cf":"t", "col":"col7", "type":"string"},
                   |"col8":{"cf":"t", "col":"col8", "type":"tinyint"}
                   |}
                   |}""".stripMargin
}case class HBaseRecord(
                  col0: String,
                  col1: Boolean,
                  col2: Double,
                  col3: Float,
                  col4: Int,
                  col5: Long,
                  col6: Short,
                  col7: String,
                  col8: Byte)

object HBaseRecord
{
  def apply(i: Int, t: String): HBaseRecord = {
    val s = s"""row${"%03d".format(i)}"""
    HBaseRecord(s,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,
      i,
      i.toLong,
      i.toShort,
      s"String$i: $t",
      i.toByte)
  }
}


然後再resources目錄下,新增hbase-site.xml、hdfs-site.xml、core-site.xml等配置檔案。主要是獲取Hbase中的一些連線地址。

如果有瀏覽官網習慣的同學,一定會發現,HBase官網的版本已經到了3.0.0-SNAPSHOT,並且早就在2.0版本就增加了一個hbase-spark模組,使用的方法跟上面hortonworks一樣,只是format的包名不同而已,猜想就是把hortonworks給拷貝過來了。

另外Hbase-spark 2.0.0-alpha4目前已經公開在maven倉庫中了。

http://mvnrepository.com/artifact/org.apache.hbase/hbase-spark

不過,內部的spark版本是1.6.0,太陳舊了!!!!真心等不起了...

期待hbase-spark官方能快點提供正式版吧。

  1. hortonworks-spark/shc github:https://github.com/hortonworks-spark/shc

  2. maven倉庫地址: http://mvnrepository.com/artifact/org.apache.hbase/hbase-spark

  3. Hbase spark sql/ dataframe官方文件:https://hbase.apache.org/book.html#_sparksql_dataframes

轉載自:https://mp.weixin.qq.com/s/oZ59E01wmU71Zvt01pmYYw

相關推薦

Spark DataFrame寫入HBASE常用方式

Spark是目前最流行的分散式計算框架,而HBase則是在HDFS之上的列式分散式儲存引擎,基於Spark做離線或者實時計算,資料結果儲存在HBase中是目前很流行的做法。例如使用者畫像、單品畫像、推薦系統等都可以用HBase作為儲存媒介,供客戶端使用。 因

Spark DataFrame寫入HBase常用方式

Spark是目前最流行的分散式計算框架,而HBase則是在HDFS之上的列式分散式儲存引擎,基於Spark做離線或者實時計算,資料結果儲存在HBase中是目前很流行的做法。例如使用者畫像、單品畫像、推薦系統等都可以用HBase作為儲存媒介,供客戶端使用。因此Spark如何向H

spark踩坑——dataframe寫入hbase連接異常

查找 inux ron user ora nat 文件 cor 1.8 最近測試環境基於shc[https://github.com/hortonworks-spark/shc]的hbase-connector總是異常連接不到zookeeper,看下報錯日誌: 18/06/

spark.dataframe的一些常用操作(Scala)

前言 說起dataframe,大家一般會首先想起pandas.dataframe。隨著資料科學越來越火熱,大部分同學都使用過python去進行一些資料科學的實踐,也應該會對dataframe的簡單易用頗有好感。 然而pandas只能用於處理單機問題,面對工業級的海量資料處理和計算,就顯得

使用spark將資料以bulkload的方式寫入Hbase時報錯

Exception in thread "main" java.io.IOException: Trying to load more than 32 hfiles to one family of one region 從報錯日誌中可以很明顯看出因為Hfiles的個數超出了32預設的時32

Spark SQL初始化和創建DataFrame的幾種方式

hdf per () med 分析 exception vat 都是 tty 一、前述 1、SparkSQL介紹 Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL產生的根本原因是其完全脫離了Hive的限制。

Spark:將DataFrame寫入Mysql

normal avi sqlt getc height serve saveas ecif access Spark將DataFrame進行一些列處理後,需要將之寫入mysql,下面是實現過程 1.mysql的信息 mysql的信息我保存在了外部的配置文件,這樣方便後續的配

Spark中RDD轉換成DataFrame的兩種方式(分別用Java和scala實現)

 一:準備資料來源       在專案下新建一個student.txt檔案,裡面的內容為: print? <code class="language-java">1,zhangsan,20   2,lisi,21   3,wanger,1

Spark讀寫Hbase的二種方式對比

作者:Syn良子 出處:http://www.cnblogs.com/cssdongl 轉載請註明出處 一.傳統方式 這種方式就是常用的TableInputFormat和TableOutputFormat來讀寫hbase,如下程式碼所示 簡單解釋下,用sc.newA

blukload方式將資料寫入HBase

package wondersgroup_0628.com import java.util.{Base64, Date} import com.wonders.TXmltmp import org.apache.hadoop.fs.Path import org.apache.hadoop

使用spark將資料寫入Hbase

--------------組裝xml並捕獲異常------------------- package wondersgroup_0628.com import java.io.{IOException, PrintWriter, StringReader, StringWriter} imp

Spark 建立DataFrame的三種方式

1.從資料庫讀資料建立DF /**SQLComtext 建立 DataFrame 1**/ def createDataFrame(sqlCtx: SQLContext): Unit = { val prop = new Properties() p

spark讀取kafka資料寫入hbase

package com.prince.demo.test import java.util.UUID import com.typesafe.config.{Config, ConfigFactory} import org.apache.hadoop.hbase.HBa

spark運算結果寫入hbase及優化

在Spark中利用map-reduce或者spark sql分析了資料之後,我們需要將結果寫入外部檔案系統。 本文,以向Hbase中寫資料,為例,說一下,Spark怎麼向Hbase中寫資料。 首先,

spark rdd轉dataframe 寫入mysql的示例

   dataframe是在spark1.3.0中推出的新的api,這讓spark具備了處理大規模結構化資料的能力,在比原有的RDD轉化方式易用的前提下,據說計算效能更還快了兩倍。spark在離線批處理或者實時計算中都可以將rdd轉成dataframe進而通過簡

spark DataFrame 使用Java讀取mysql和寫入mysql的例子

例子 package com.hjh.demo.mysql; import java.util.Properties; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; impor

Spark將資料寫入Hbase以及從Hbase讀取資料

本文將介紹 1、spark如何利用saveAsHadoopDataset和saveAsNewAPIHadoopDataset將RDD寫入hbase 2、spark從hbase中讀取資料並轉化為RDD 操作方式為在eclipse本地執行spark連線到遠端的hbase。 ja

Spark RDD(DataFrame) 寫入到HIVE的程式碼實現

在實際工作中,經常會遇到這樣的場景,想將計算得到的結果儲存起來,而在Spark中,正常計算結果就是RDD。 而將RDD要實現注入到Hive表中,是需要進行轉化的。 關鍵的步驟,是將RDD轉化為一個SchemaRDD,正常實現方式是定義一個case class. 然後,

《深入理解Spark》之RDD轉換DataFrame的兩種方式的比較

package com.lyzx.day19 import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.{SparkConf, Spark

JS對象創建常用方式及原理分析

原型模式 這樣的 前言 values 一句話 開始 creat 動態原型 1-1 ====此文章是稍早前寫的,[email protected]/* */==== 前言 俗話說“在js語言中,一切都對象”,而且創建對象的方式也有很多種,所以今天我們做一下梳理 最