Spark DataFrame寫入HBASE常用方式
Spark是目前最流行的分散式計算框架,而HBase則是在HDFS之上的列式分散式儲存引擎,基於Spark做離線或者實時計算,資料結果儲存在HBase中是目前很流行的做法。例如使用者畫像、單品畫像、推薦系統等都可以用HBase作為儲存媒介,供客戶端使用。
因此Spark如何向HBase中寫資料就成為很重要的一個環節了。本文將會介紹三種寫入的方式。程式碼在spark 2.2.0版本親測。
1. 基於HBase API批量寫入
第一種是最簡單的使用方式了,就是基於RDD的分割槽,由於在spark中一個partition總是儲存在一個excutor上,因此可以建立一個HBase連線,提交整個partition的內容。
程式碼如下:
rdd.foreachPartition { records => val config = HBaseConfiguration.create config.set("hbase.zookeeper.property.clientPort", "2181") config.set("hbase.zookeeper.quorum", "a1,a2,a3") val connection = ConnectionFactory.createConnection(config) val table = connection.getTable(TableName.valueOf("rec:user_rec")) // 舉個例子而已,真實的程式碼根據records來 val list = new java.util.ArrayList[Put] for(i <- 0 until 10){ val put = new Put(Bytes.toBytes(i.toString)) put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("aaaa"), Bytes.toBytes("1111")) list.add(put) } // 批量提交 table.put(list) // 分割槽資料寫入HBase後關閉連線 table.close() }
這樣每次寫的程式碼很多,顯得不夠友好,如果能跟dataframe儲存parquet、csv之類的就好了。下面就看看怎麼實現dataframe直接寫入hbase吧!
2. Hortonworks的SHC寫入
由於這個外掛是hortonworks提供的,maven的中央倉庫並沒有直接可下載的版本。需要使用者下載原始碼自己編譯打包,如果有maven私庫,可以上傳到自己的maven私庫裡面。具體的步驟可以參考如下:
2.1 下載原始碼、編譯、上傳
下載完成後,如果有自己的私庫,可以修改shc中的distributionManagement。然後點選旁邊的maven外掛deploy釋出工程,如果只想打成jar包,那就直接install就可以了。
2.2 引入
在pom.xml中引入:
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>shc-core</artifactId>
<version>1.1.2-2.2-s_2.11-SNAPSHOT</version>
</dependency>
2.3 編寫應用程式
首先建立應用程式,Application.scala
object Application {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local").appName("normal").getOrCreate()
spark.sparkContext.setLogLevel("warn")
val data = (0 to 255).map { i => HBaseRecord(i, "extra")}
val df:DataFrame = spark.createDataFrame(data)
df.write
.mode(SaveMode.Overwrite)
.options(Map(HBaseTableCatalog.tableCatalog -> catalog))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
}
def catalog = s"""{
|"table":{"namespace":"rec", "name":"user_rec"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"t", "col":"col1", "type":"boolean"},
|"col2":{"cf":"t", "col":"col2", "type":"double"},
|"col3":{"cf":"t", "col":"col3", "type":"float"},
|"col4":{"cf":"t", "col":"col4", "type":"int"},
|"col5":{"cf":"t", "col":"col5", "type":"bigint"},
|"col6":{"cf":"t", "col":"col6", "type":"smallint"},
|"col7":{"cf":"t", "col":"col7", "type":"string"},
|"col8":{"cf":"t", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
}
case class HBaseRecord(
col0: String,
col1: Boolean,
col2: Double,
col3: Float,
col4: Int,
col5: Long,
col6: Short,
col7: String,
col8: Byte)
object HBaseRecord
{
def apply(i: Int, t: String): HBaseRecord = {
val s = s"""row${"%03d".format(i)}"""
HBaseRecord(s,
i % 2 == 0,
i.toDouble,
i.toFloat,
i,
i.toLong,
i.toShort,
s"String$i</span>: <span class="hljs-variable">$t",
i.toByte)
}
}
然後再resources目錄下,新增hbase-site.xml、hdfs-site.xml、core-site.xml等配置檔案。主要是獲取Hbase中的一些連線地址。
3. HBase 2.x+即將釋出的hbase-spark
如果有瀏覽官網習慣的同學,一定會發現,HBase官網的版本已經到了3.0.0-SNAPSHOT,並且早就在2.0版本就增加了一個hbase-spark模組,使用的方法跟上面hortonworks一樣,只是format的包名不同而已,猜想就是把hortonworks給拷貝過來了。
另外Hbase-spark 2.0.0-alpha4目前已經公開在maven倉庫中了。