1. 程式人生 > >Spark DataFrame vector 類型存儲到Hive表

Spark DataFrame vector 類型存儲到Hive表

ont afr analysis iss context 有一個 wdf com col

1. 軟件版本

軟件版本
Spark 1.6.0
Hive 1.2.1

2. 場景描述

在使用Spark時,有時需要存儲DataFrame數據到Hive表中,一般的存儲方式如下:

 // 註冊臨時表
 myDf.registerTempTable("t1")
 // 使用SQLContext從臨時表創建Hive表
 sqlContext.sql("create table h1 as select * from t1")

在DataFrame中存儲一般的數據類型,比如Double、Float、String等到Hive表是沒有問題的,但是在DataFrame中還有一個數據類型:vector , 如果存儲這種類型到Hive表那麽會報錯,類似:

org.apache.spark.sql.AnalysisException: cannot resolve cast(norF as struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) 
due to data type mismatch: cannot cast org.apache.spark.mllib.linalg.VectorUDT@f71b0bce to StructType(StructField(type,ByteType,true), StructField(size,IntegerType,true
), StructField(indices,ArrayType(IntegerType,true),true), StructField(values,ArrayType(DoubleType,true),true));

這個錯誤如果搜索的話,可以找到類似這種結果: Failed to insert VectorUDT to hive table with DataFrameWriter.insertInto(tableName: String)

也即是說暫時使用Spark是不能夠直接存儲vector類型的DataFrame到Hive表的,那麽有沒有一種方法可以存儲呢?
想到這裏,那麽在Spark中是有一個工具類VectorAssembler 可以達到相反的目的,即把多個列(也需要要求這些列的類型是一致的)合並成一個vector列。但是並沒有相反的工具類,也就是我們的需求。

3. 問題的迂回解決方法

這裏提出一個解決方法如下:
假設:
1. DataFrame中數據類型是vector的列中的數據類型都是已知的,比如Double,數值類型;
2. vector列中的具體子列個數也是已知的;
有了上面兩個假設就可以通過構造RDD[Row]以及schema的方式來生成新的DataFrame,並且這個新的DataFrame的類型是基本類型,如Double。這樣就可以保存到Hive中了。

4. 示例

本例流程如下:

技術分享圖片

代碼如下:

// 1.讀取數據
val data = sqlContext.sql("select * from normalize")

讀取數據如下:

技術分享圖片

// 2.構造vector數據
import org.apache.spark.ml.feature.VectorAssembler
val cols = data.schema.fieldNames
val newFeature = "fea"
val asb = new VectorAssembler().setInputCols(cols).setOutputCol(newFeature)
val newDf = asb.transform(data)
newDf.show(1)

技術分享圖片

// 3.做歸一化
import org.apache.spark.ml.feature.Normalizer
val norFeature ="norF"
val normalizer = new Normalizer().setInputCol(newFeature).setOutputCol(norFeature).setP(1.0)
val l1NormData = normalizer.transform(newDf)
l1NormData.show(1)
// 存儲DataFrame vector類型報錯
// l1NormData.select(norFeature).registerTempTable("t1")
// sqlContext.sql("create table h2 as select * from t1")

技術分享圖片

// 4.扁平轉換vector到row
import org.apache.spark.sql.Row
val finalRdd= l1NormData.select(norFeature).rdd.map(row => Row.fromSeq(row.getAs[org.apache.spark.mllib.linalg.DenseVector](0).toArray))
val finalDf = sqlContext.createDataFrame(finalRdd,data.schema)
finalDf.show(1)

技術分享圖片

// 5. 存儲到Hive中
finalDf.registerTempTable("t1")
sqlContext.sql("create table h1 as select * from t1")

技術分享圖片

Spark DataFrame vector 類型存儲到Hive表