1. 程式人生 > >如何呼叫Spark中的資料標準化庫

如何呼叫Spark中的資料標準化庫

在大資料的學習過程中,總有很多小夥伴遇到不知如何呼叫Spark中的資料標準庫,本文的核心這不是在於介紹「資料標準化」,也不是在於實現「Spark呼叫」,畢竟這些概念大家應該耳濡目染了,至於呼叫方法一搜一大堆。今天這個問題也是科多大資料的一名學員提出來的,估計有很多人也遇到類似的問題,一併分享在此,希望可以幫到大家。

首先,我們先看一下Spark要做「標準化」的輸入資料樣式。

// 原始資料

+---+-----------------+

| id|  features  |

+---+-----------------+

| 0 |[1.0,0.5,-1.0]|

| 1 | [2.0,1.0,1.0]|

| 2 |[4.0,10.0,2.0]|

+---+-----------------+

看到這,我就不想去用了,除了簡單的DataFrame賦值,正常情況下的業務特徵都是一張寬表,或者是其他特徵工程的組合形式。

那有人會無聊去把資料的儲存形式儲存為向量型的呢?雖然也可以這樣做,但是我覺得不太方便去回顧資料。

其次,在無奈之下,我在使用DataFrame和「標準化庫」時,做了一個簡單的優化,具體如下所示:

// 原始資料

Userid,Feature1,Feature2,Feature3

import sqlContext.implicits._

//需要進行資料標準化的特徵(除Userid外)有:

val value = behavData.map(_.split(",")).map(record =>

{

var featureArray:Array[Double] = new Array[Double](3)

        val userid = record(0)

        val feature = ( for(i <- 1 until 3 ) yield record(i).toDouble ).toArray

        val featureVector = Vectors.dense(feature)

        (userid,featureVector)

}

).toDF("userid","featureSet")

這樣的話,我就可以直接將「原始資料」轉化為Spark標準化庫所要求的樣式了。

// 轉化資料

Userid,[Feature1,Feature2,Feature3]

提醒一下,其他向量型別還不行,必須是import org.apache.spark.mllib.linalg.Vectors;

令人反感的「資料輸入」解決了一半,我們再著手「資料輸出」,儘量讓後期的建模工作順暢起來。

// 這是其中一種標準化方法的資料輸出。

+------+----------------------+-------------------------------------------------------------------+

| id  |  features     |           scaledFeatures                  |

+------+-----------------------+------------------------------------------------------------------+

| 0  |  [1.0,0.5,-1.0]  |  [0.654653670707,0.09352195,-0.654653670]  |

| 1  |  [2.0,1.0,1.0]   |  [1.3093073414159544,0.18704390,0.65465]  |

| 2  |  [4.0,10.0,2.0]  |  [2.618614682831909,1.87043905,1.309307]  |

+-----+-------------------------+-----------------------------------------------------------------+

可能是我真的看不習慣,要說這結果輸出的靈活性太差也不為過,所以我又做了一個簡單的優化。

//將DataFrame轉換成RDD再儲存於HDFS上

val resultRDD = inputValue.rdd.map(record =>

{

val ouputResult = new StringBuilder()

        ouputResult.append(record(0).toString()).append(",")

//呼叫字串StrDealOne函式

StrDealOne(record(1).toString()).split(",").map(records =>

{

ouputResult.append(round(records.toDouble,4)).append(",")

}

)

        //呼叫字串StrDealTwo函式

        StrDealTwo(ouputResult.toString())

}

)

其中

/**

* 字串處理(替換特殊字元、去掉字串末尾一位)

*/

def StrDealOne(InputValue:String):String = {

InputValue.replaceAll("\\(","").replaceAll("\\)","").replaceAll("\\[","").replaceAll("\\]","")

}

  def StrDealTwo(InputValue:String):String = {

InputValue.substring(0, InputValue.toString().length()-1)

}

簡單來說,就是讓標準化後的資料恢復最初的Userid,Feature1,Feature2,Feature3格式,方便後期使用。

通過對資料「輸入」和「輸出」的簡單操作,我在後期想將數值型的特徵進行標準化時,就能很舒服去呼叫了。

通過以上的方法,就能輕鬆的呼叫Spark中的資料標準化庫了