如何呼叫Spark中的資料標準化庫
在大資料的學習過程中,總有很多小夥伴遇到不知如何呼叫Spark中的資料標準庫,本文的核心這不是在於介紹「資料標準化」,也不是在於實現「Spark呼叫」,畢竟這些概念大家應該耳濡目染了,至於呼叫方法一搜一大堆。今天這個問題也是科多大資料的一名學員提出來的,估計有很多人也遇到類似的問題,一併分享在此,希望可以幫到大家。
首先,我們先看一下Spark要做「標準化」的輸入資料樣式。
// 原始資料
+---+-----------------+
| id| features |
+---+-----------------+
| 0 |[1.0,0.5,-1.0]|
| 1 | [2.0,1.0,1.0]|
| 2 |[4.0,10.0,2.0]|
+---+-----------------+
看到這,我就不想去用了,除了簡單的DataFrame賦值,正常情況下的業務特徵都是一張寬表,或者是其他特徵工程的組合形式。
那有人會無聊去把資料的儲存形式儲存為向量型的呢?雖然也可以這樣做,但是我覺得不太方便去回顧資料。
其次,在無奈之下,我在使用DataFrame和「標準化庫」時,做了一個簡單的優化,具體如下所示:
// 原始資料
Userid,Feature1,Feature2,Feature3
import sqlContext.implicits._
//需要進行資料標準化的特徵(除Userid外)有:
val value = behavData.map(_.split(",")).map(record =>
{
var featureArray:Array[Double] = new Array[Double](3)
val userid = record(0)
val feature = ( for(i <- 1 until 3 ) yield record(i).toDouble ).toArray
val featureVector = Vectors.dense(feature)
(userid,featureVector)
}
).toDF("userid","featureSet")
這樣的話,我就可以直接將「原始資料」轉化為Spark標準化庫所要求的樣式了。
// 轉化資料
Userid,[Feature1,Feature2,Feature3]
提醒一下,其他向量型別還不行,必須是import org.apache.spark.mllib.linalg.Vectors;
令人反感的「資料輸入」解決了一半,我們再著手「資料輸出」,儘量讓後期的建模工作順暢起來。
// 這是其中一種標準化方法的資料輸出。
+------+----------------------+-------------------------------------------------------------------+
| id | features | scaledFeatures |
+------+-----------------------+------------------------------------------------------------------+
| 0 | [1.0,0.5,-1.0] | [0.654653670707,0.09352195,-0.654653670] |
| 1 | [2.0,1.0,1.0] | [1.3093073414159544,0.18704390,0.65465] |
| 2 | [4.0,10.0,2.0] | [2.618614682831909,1.87043905,1.309307] |
+-----+-------------------------+-----------------------------------------------------------------+
可能是我真的看不習慣,要說這結果輸出的靈活性太差也不為過,所以我又做了一個簡單的優化。
//將DataFrame轉換成RDD再儲存於HDFS上
val resultRDD = inputValue.rdd.map(record =>
{
val ouputResult = new StringBuilder()
ouputResult.append(record(0).toString()).append(",")
//呼叫字串StrDealOne函式
StrDealOne(record(1).toString()).split(",").map(records =>
{
ouputResult.append(round(records.toDouble,4)).append(",")
}
)
//呼叫字串StrDealTwo函式
StrDealTwo(ouputResult.toString())
}
)
其中
/**
* 字串處理(替換特殊字元、去掉字串末尾一位)
*/
def StrDealOne(InputValue:String):String = {
InputValue.replaceAll("\\(","").replaceAll("\\)","").replaceAll("\\[","").replaceAll("\\]","")
}
def StrDealTwo(InputValue:String):String = {
InputValue.substring(0, InputValue.toString().length()-1)
}
簡單來說,就是讓標準化後的資料恢復最初的Userid,Feature1,Feature2,Feature3格式,方便後期使用。
通過對資料「輸入」和「輸出」的簡單操作,我在後期想將數值型的特徵進行標準化時,就能很舒服去呼叫了。
通過以上的方法,就能輕鬆的呼叫Spark中的資料標準化庫了