1. 程式人生 > >Spark MLlib 1.6 -- 統計基礎篇

Spark MLlib 1.6 -- 統計基礎篇

2.1 統計概覽

Statistics類中提供基本列統計RDD[Vector]功能

colStats()返回MultivariateStatisticalSummary 的例項,這個例項可以按列計算最大,最小,均值,方差,非0個數統計,列的1範數。

importorg.apache.spark.mllib.linalg.Vector

importorg.apache.spark.mllib.stat.{MultivariateStatisticalSummary,Statistics}

val observations:RDD[Vector]=...// an RDD of Vectors

// Compute column summary statistics.

val summary:MultivariateStatisticalSummary=Statistics.colStats(observations)

println(summary.mean)// a dense vector containing the mean value for each column

println(summary.variance)// column-wise variance

println(summary.numNonzeros)// number of nonzeros in each column

2.2 相關統計

計算兩個資料序列可以使向量或矩陣)的相關係數。在spark.mllib

中,我們提供成對計算相關係數,實現了Pearson’s相關和Spearman’s相關相關統計的結果依賴於計算物件如果是兩個RDD[Double]的計算,結果是Double型別,如果是兩個RDD[Vector]計算,結果是一個Matrix矩陣。

importorg.apache.spark.SparkContext

importorg.apache.spark.mllib.linalg._

importorg.apache.spark.mllib.stat.Statistics

val sc:SparkContext=...

val seriesX:RDD[Double]=...// a series

val seriesY:RDD[Double]=...// must have the same number of partitions and cardinality as seriesX

// compute the correlation using Pearson's method. Enter "spearman" for Spearman's method. If a 

// method is not specified, Pearson's method will be used by default. 

val correlation:Double=Statistics.corr(seriesX, seriesY,"pearson")

val data:RDD[Vector]=...// note that each Vector is a row and not a column

// calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.

// If a method is not specified, Pearson's method will be used by default. 

val correlMatrix:Matrix=Statistics.corr(data,"pearson")

2.3 分層取樣(Stratified sampling) 

spark.mllib中提供計算原始RDD 鍵值對的分層取樣方法:sampleByKey 和 sampleByKeyExact 。在分層取樣中,鍵可以看做標籤類,相應的值可以看做屬性。如,鍵可以使男人或女人,文件ID,相應的值可以使人的年齡或文件的單次。 sampleByKey 方法隨機取樣一系列觀測值,過程就像逐個遍歷所有樣本點,通過拋銀幣決定取捨,因此只需要確定取樣點個數。sampleByKeyExact 比分層隨機取樣方法sampleByKey需要更多地樣本,才能保證取樣點個數有99.99%的置信度,sampleByKeyExact暫不支援python.

sampleByKeyExact() 取樣由[ f_k , n_k ] 完全決定, 對任意一個鍵屬於 K 鍵集合f_k是預期鍵對應取樣點值得佔比分數),n_k 是這個鍵k在整個集合中值的個數。無放回取樣(即取樣的資料取走,不會出現重複) 方法需要一個引數(withReplacement預設是false , 而又放回取樣方法需要兩個引數

importorg.apache.spark.SparkContext

importorg.apache.spark.SparkContext._

importorg.apache.spark.rdd.PairRDDFunctions

valsc:SparkContext=...

valdata=...// an RDD[(K, V)] of any key value pairs

valfractions:Map[KDouble]=...// specify the exact fraction desired from each key

// Get an exact sample from each stratum

valapproxSample=data.sampleByKey(withReplacement=false,fractions)

valexactSample=data.sampleByKeyExact(withReplacement=false,fractions)

2.4 假設檢驗

假設檢驗在統計上用於判定統計結果又多大統計意義及統計結果有多大置信度Spark.mllib 暫支援Pearson’s chi-squared 檢驗檢驗結果的適用性和獨立性輸入資料需要驗證適用性和獨立性適用性檢驗需要輸入Vector , 獨立性需要資料Matrix 

Spark.mllib 支援輸入RDD[LabledPoint] ,使用chi-squared獨立性來決定特徵的選擇。

Statistics 提供方法執行Pearson’s chi-squared 檢驗下例用於假設檢驗

importorg.apache.spark.SparkContext

importorg.apache.spark.mllib.linalg._

importorg.apache.spark.mllib.regression.LabeledPoint

importorg.apache.spark.mllib.stat.Statistics._

val sc:SparkContext=...

val vec:Vector=...// a vector composed of the frequencies of events

// compute the goodness of fit. If a second vector to test against is not supplied as a parameter, 

// the test runs against a uniform distribution.  

val goodnessOfFitTestResult =Statistics.chiSqTest(vec)

println(goodnessOfFitTestResult)// summary of the test including the p-value, degrees of freedom, 

// test statistic, the method used, and the null hypothesis.

val mat:Matrix=...// a contingency matrix

// conduct Pearson's independence test on the input contingency matrix

val independenceTestResult =Statistics.chiSqTest(mat)

println(independenceTestResult)// summary of the test including the p-value, degrees of freedom...

val obs:RDD[LabeledPoint]=...// (feature, label) pairs.

// The contingency table is constructed from the raw (feature, label) pairs and used to conduct

// the independence test. Returns an array containing the ChiSquaredTestResult for every feature 

// against the label.

val featureTestResults:Array[ChiSqTestResult]=Statistics.chiSqTest(obs)

var i =1

featureTestResults.foreach { result =>

    println(s"Column $i:\n$result")

    i +=1

}// summary of the test

Statistics 提供1-sample, 2-sided Kolmogorov-Smirnov檢驗概率分佈是否相等。提供理論分佈名稱和理論分佈引數,或者根據已知理論分佈計算累計分佈函式,使用者可以檢驗樣本點是否出自來驗證概率分佈。在特殊例子中,如正態分佈,不用沒有提供正態分佈引數,則檢驗會使用標準正態分佈引數。

importorg.apache.spark.mllib.stat.Statistics

val data:RDD[Double]=...// an RDD of sample data

// run a KS test for the sample versus a standard normal distribution

val testResult =Statistics.kolmogorovSmirnovTest(data,"norm",0,1)

println(testResult)// summary of the test including the p-value, test statistic,

// and null hypothesis

// if our p-value indicates significance, we can reject the null hypothesis

// perform a KS test using a cumulative distribution function of our making

val myCDF:Double=>Double=...

val testResult2 =Statistics.kolmogorovSmirnovTest(data, myCDF)

2.4.1 流式顯著性測試

Spark.mllib 提供線上測試實現A/B線上測試。此測試需要在spark streaming DStream[(Boolean, Double)] 上使用每個流單元的第一個元素是邏輯真假假代表對照false),而真代表實驗組(true) , 第二個元素是觀測值

流式顯著性檢驗支援這兩個引數

peacePeriod  平穩週期), 預設最初啟動後可以忽略的資料組數。

windowSize (窗尺寸, 每次假設檢驗使用的資料批次數,若設為, 則累計處理之前所有批次。

StreamingTest 支援流式假設檢驗

val data = ssc.textFileStream(dataDir).map(line => line.split(",")match{

caseArray(label, value)=>BinarySample(label.toBoolean, value.toDouble)

})

val streamingTest =newStreamingTest()

.setPeacePeriod(0)

.setWindowSize(0)

.setTestMethod("welch")

val out = streamingTest.registerStream(data)

out.print()

完整例子程式碼見examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala

2.5 隨機數發生器

隨機數發生器在隨機演算法隨機模板和效能測試中很有用Spark.mllib 的隨機發生器RDD i.i.d. 隨機資料來自給定分佈:均勻分佈, 標準正態, Possion (泊松分佈)。

RandomRDDs 提供工廠方法來生成隨機雙精度浮點RDD 和 隨機向量RDD。下例生辰隨機雙精度浮點RDD, 這些隨機值來自標準正態分佈N(0,1), 做平移和伸縮後對映到N(1,4)

importorg.apache.spark.SparkContext

importorg.apache.spark.mllib.random.RandomRDDs._

val sc:SparkContext=...

// Generate a random double RDD that contains 1 million i.i.d. values drawn from the

// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.

val u = normalRDD(sc,1000000L,10)

// Apply a transform to get a random double RDD following `N(1, 4)`.

val v = u.map(=>1.0+2.0* x)

2.6 核密度估計

核密度估計在經驗概率分佈圖中用處很大這種分佈圖不需要假設觀測值來自特定的某個分佈通過給定點集來計算隨機變數的概率密度函式。通過計算經驗分佈在特定點的PDF(偏導數),作為標準正態分佈在每個取樣點附近的PDF

KernelDensity 提供方法計算RDD取樣點集的核密度估計,見下例:

importorg.apache.spark.mllib.stat.KernelDensity

importorg.apache.spark.rdd.RDD

val data:RDD[Double]=...// an RDD of sample data

// Construct the density estimator with the sample data and a standard deviation for the Gaussian

// kernels

val kd =newKernelDensity()

.setSample(data)

.setBandwidth(3.0)

// Find density estimates for the given values

val densities = kd.estimate(Array(-1.0,2.0,5.0))