Spark DataFrame vector 類型存儲到Hive表
阿新 • • 發佈:2018-07-18
ont afr analysis iss context 有一個 wdf com col
1. 軟件版本
軟件 | 版本 |
---|---|
Spark | 1.6.0 |
Hive | 1.2.1 |
2. 場景描述
在使用Spark時,有時需要存儲DataFrame數據到Hive表中,一般的存儲方式如下:
// 註冊臨時表
myDf.registerTempTable("t1")
// 使用SQLContext從臨時表創建Hive表
sqlContext.sql("create table h1 as select * from t1")
在DataFrame中存儲一般的數據類型,比如Double、Float、String等到Hive表是沒有問題的,但是在DataFrame中還有一個數據類型:vector , 如果存儲這種類型到Hive表那麽會報錯,類似:
org.apache.spark.sql.AnalysisException: cannot resolve ‘cast(norF as struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)‘
due to data type mismatch: cannot cast org.apache.spark.mllib.linalg.VectorUDT@f71b0bce to StructType(StructField(type,ByteType,true), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,true),true), StructField(values,ArrayType(DoubleType,true),true));
這個錯誤如果搜索的話,可以找到類似這種結果: Failed to insert VectorUDT to hive table with DataFrameWriter.insertInto(tableName: String)
也即是說暫時使用Spark是不能夠直接存儲vector類型的DataFrame到Hive表的,那麽有沒有一種方法可以存儲呢?
想到這裏,那麽在Spark中是有一個工具類VectorAssembler 可以達到相反的目的,即把多個列(也需要要求這些列的類型是一致的)合並成一個vector列。但是並沒有相反的工具類,也就是我們的需求。
3. 問題的迂回解決方法
這裏提出一個解決方法如下:
假設:
1. DataFrame中數據類型是vector的列中的數據類型都是已知的,比如Double,數值類型;
2. vector列中的具體子列個數也是已知的;
有了上面兩個假設就可以通過構造RDD[Row]以及schema的方式來生成新的DataFrame,並且這個新的DataFrame的類型是基本類型,如Double。這樣就可以保存到Hive中了。
4. 示例
本例流程如下:
代碼如下:
// 1.讀取數據
val data = sqlContext.sql("select * from normalize")
讀取數據如下:
// 2.構造vector數據
import org.apache.spark.ml.feature.VectorAssembler
val cols = data.schema.fieldNames
val newFeature = "fea"
val asb = new VectorAssembler().setInputCols(cols).setOutputCol(newFeature)
val newDf = asb.transform(data)
newDf.show(1)
// 3.做歸一化
import org.apache.spark.ml.feature.Normalizer
val norFeature ="norF"
val normalizer = new Normalizer().setInputCol(newFeature).setOutputCol(norFeature).setP(1.0)
val l1NormData = normalizer.transform(newDf)
l1NormData.show(1)
// 存儲DataFrame vector類型報錯
// l1NormData.select(norFeature).registerTempTable("t1")
// sqlContext.sql("create table h2 as select * from t1")
// 4.扁平轉換vector到row
import org.apache.spark.sql.Row
val finalRdd= l1NormData.select(norFeature).rdd.map(row => Row.fromSeq(row.getAs[org.apache.spark.mllib.linalg.DenseVector](0).toArray))
val finalDf = sqlContext.createDataFrame(finalRdd,data.schema)
finalDf.show(1)
// 5. 存儲到Hive中
finalDf.registerTempTable("t1")
sqlContext.sql("create table h1 as select * from t1")
Spark DataFrame vector 類型存儲到Hive表