Spark MLlib 1.6 -- Statistics Basics
2.1 Summary Statistics
The Statistics class provides column summary statistics for an RDD[Vector].
colStats() returns an instance of MultivariateStatisticalSummary, which computes the column-wise maximum, minimum, mean, variance, and number of nonzeros, as well as the column L1 norm.
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.rdd.RDD

val observations: RDD[Vector] = ... // an RDD of Vectors

// Compute column summary statistics.
val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)
println(summary.mean) // a dense vector containing the mean value for each column
println(summary.variance) // column-wise variance
println(summary.numNonzeros) // number of nonzeros in each column
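For reference, here is a minimal self-contained sketch of the same call with concrete data; the local SparkContext setup and the sample vectors are illustrative, not part of the original snippet.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

val sc = new SparkContext(new SparkConf().setAppName("SummaryStats").setMaster("local[*]"))
// Three observations, each a row vector with three columns (illustrative values).
val observations = sc.parallelize(Seq(
  Vectors.dense(1.0, 10.0, 100.0),
  Vectors.dense(2.0, 20.0, 200.0),
  Vectors.dense(3.0, 30.0, 300.0)
))
val summary = Statistics.colStats(observations)
println(summary.mean) // [2.0, 20.0, 200.0]
println(summary.variance) // sample variance per column: [1.0, 100.0, 10000.0]
println(summary.max) // [3.0, 30.0, 300.0]
println(summary.normL1) // column L1 norms: [6.0, 60.0, 600.0]
sc.stop()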
2.2 Correlations
Calculating the correlation between two data series (as vectors or matrices) is a common statistical operation. spark.mllib currently supports two correlation methods, Pearson's and Spearman's, as shown below.
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

val sc: SparkContext = ...

val seriesX: RDD[Double] = ... // a series
val seriesY: RDD[Double] = ... // must have the same number of partitions and cardinality as seriesX

// compute the correlation using Pearson's method. Enter "spearman" for Spearman's method. If a
// method is not specified, Pearson's method will be used by default.
val correlation: Double = Statistics.corr(seriesX, seriesY, "pearson")

val data: RDD[Vector] = ... // note that each Vector is a row and not a column

// calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
// If a method is not specified, Pearson's method will be used by default.
val correlMatrix: Matrix = Statistics.corr(data, "pearson")
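A small concrete sketch of both overloads, assuming local mode; the series values are made up so that seriesX and seriesY are perfectly linearly related.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

val sc = new SparkContext(new SparkConf().setAppName("Correlations").setMaster("local[*]"))
val seriesX = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0, 5.0))
val seriesY = sc.parallelize(Seq(11.0, 22.0, 33.0, 44.0, 55.0)) // 11 * seriesX
println(Statistics.corr(seriesX, seriesY, "pearson")) // 1.0, a perfect linear relationship
println(Statistics.corr(seriesX, seriesY, "spearman")) // 1.0, the ranks agree as well
val data = sc.parallelize(Seq(
  Vectors.dense(1.0, 10.0, 100.0),
  Vectors.dense(2.0, 20.0, 200.0),
  Vectors.dense(5.0, 33.0, 366.0)
))
println(Statistics.corr(data, "pearson")) // a 3x3 correlation matrix of the columns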
2.3 Stratified Sampling
spark.mllib provides two stratified sampling methods on key-value pair RDDs: sampleByKey and sampleByKeyExact. In stratified sampling the keys can be thought of as labels and the values as attributes. For example, the key can be man or woman, or document ids, and the respective values can be people's ages or the words in the documents. The sampleByKey method samples observations at random, much like walking through the data point by point and flipping a coin to decide whether to keep each one, so it only provides an expected sample size. sampleByKeyExact requires significantly more resources than sampleByKey, but guarantees the exact sample size with 99.99% confidence. sampleByKeyExact is currently not supported in Python.
sampleByKeyExact() samples exactly ⌈f_k · n_k⌉ items for every key k in the key set K, where f_k is the desired fraction (proportion) of values for key k and n_k is the number of values for key k in the whole RDD. Sampling without replacement (a sampled item is taken out and cannot appear again) requires one additional pass over the RDD (withReplacement defaults to false), whereas sampling with replacement requires two additional passes.
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.PairRDDFunctions

val sc: SparkContext = ...

val data = ... // an RDD[(K, V)] of any key value pairs
val fractions: Map[K, Double] = ... // specify the exact fraction desired from each key

// Get an exact sample from each stratum
val approxSample = data.sampleByKey(withReplacement = false, fractions)
val exactSample = data.sampleByKeyExact(withReplacement = false, fractions)
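To make the placeholders concrete, a minimal runnable sketch with made-up strata follows; the keys, values, and fractions are illustrative.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("StratifiedSampling").setMaster("local[*]"))
// Three strata, keyed 1..3; the fractions map must cover every key in the data.
val data = sc.parallelize(Seq((1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')))
val fractions = Map(1 -> 0.5, 2 -> 0.5, 3 -> 0.5)
// Expected sample size per key vs. exact sample size per key.
val approxSample = data.sampleByKey(withReplacement = false, fractions)
val exactSample = data.sampleByKeyExact(withReplacement = false, fractions)
approxSample.collect().foreach(println)
exactSample.collect().foreach(println)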
2.4 Hypothesis Testing
Hypothesis testing is used in statistics to judge whether a result is statistically significant, i.e. how much confidence one can place in it. spark.mllib currently supports Pearson's chi-squared tests for goodness of fit and independence. The type of the input data determines which test is run: the goodness of fit test requires an input of type Vector, whereas the independence test requires a Matrix.
spark.mllib also supports the input type RDD[LabeledPoint], using the chi-squared independence test to drive feature selection.
Statistics provides methods to run Pearson's chi-squared tests. The following example demonstrates how to run hypothesis tests.
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.stat.test.ChiSqTestResult
import org.apache.spark.rdd.RDD

val sc: SparkContext = ...

val vec: Vector = ... // a vector composed of the frequencies of events

// compute the goodness of fit. If a second vector to test against is not supplied as a parameter,
// the test runs against a uniform distribution.
val goodnessOfFitTestResult = Statistics.chiSqTest(vec)
println(goodnessOfFitTestResult) // summary of the test including the p-value, degrees of freedom,
// test statistic, the method used, and the null hypothesis.

val mat: Matrix = ... // a contingency matrix

// conduct Pearson's independence test on the input contingency matrix
val independenceTestResult = Statistics.chiSqTest(mat)
println(independenceTestResult) // summary of the test including the p-value, degrees of freedom...

val obs: RDD[LabeledPoint] = ... // (feature, label) pairs.

// The contingency table is constructed from the raw (feature, label) pairs and used to conduct
// the independence test. Returns an array containing the ChiSquaredTestResult for every feature
// against the label.
val featureTestResults: Array[ChiSqTestResult] = Statistics.chiSqTest(obs)
var i = 1
featureTestResults.foreach { result =>
  println(s"Column $i:\n$result")
  i += 1
} // summary of the test
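For a concrete run, a sketch with small made-up frequencies and a 3x2 contingency table (note that Matrices.dense takes its values in column-major order):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.stat.Statistics

val sc = new SparkContext(new SparkConf().setAppName("ChiSqTest").setMaster("local[*]"))
// Goodness of fit: observed event frequencies, tested against the uniform distribution.
val vec = Vectors.dense(0.1, 0.15, 0.25, 0.3, 0.2)
println(Statistics.chiSqTest(vec))
// Independence: a 3x2 contingency matrix, supplied column by column.
val mat = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
println(Statistics.chiSqTest(mat))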
Statistics provides a 1-sample, 2-sided Kolmogorov-Smirnov test for equality of probability distributions. By providing the name of a theoretical distribution and its parameters, or a function that computes the cumulative distribution according to a given theoretical distribution, the user can test the null hypothesis that the sample is drawn from that distribution. In the special case where the user tests against the normal distribution but does not provide the distribution parameters, the test uses the standard normal distribution.
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

val data: RDD[Double] = ... // an RDD of sample data

// run a KS test for the sample versus a standard normal distribution
val testResult = Statistics.kolmogorovSmirnovTest(data, "norm", 0, 1)
println(testResult) // summary of the test including the p-value, test statistic,
// and null hypothesis
// if our p-value indicates significance, we can reject the null hypothesis

// perform a KS test using a cumulative distribution function of our making
val myCDF: Double => Double = ...
val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)
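A concrete sketch with a small made-up sample; the user-supplied CDF here is that of the uniform distribution on [0, 1].

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.stat.Statistics

val sc = new SparkContext(new SparkConf().setAppName("KSTest").setMaster("local[*]"))
val data = sc.parallelize(Seq(0.1, 0.15, 0.2, 0.3, 0.25))
// Test against N(0, 1); "norm" is the only named distribution currently supported.
println(Statistics.kolmogorovSmirnovTest(data, "norm", 0.0, 1.0))
// Test against the CDF of the uniform distribution on [0, 1].
val uniformCDF: Double => Double = x => math.min(math.max(x, 0.0), 1.0)
println(Statistics.kolmogorovSmirnovTest(data, uniformCDF))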
2.4.1 Streaming Significance Testing
spark.mllib provides online implementations of some tests to support use cases like A/B testing. These tests run on a Spark Streaming DStream[(Boolean, Double)], where the first element of each tuple marks the control group (false) or the treatment group (true), and the second element is the observed value.
Streaming significance testing supports the following two parameters:
1. peacePeriod (peace period): the number of initial data batches from the stream to ignore after startup.
2. windowSize (window size): the number of past batches to perform hypothesis testing over. Setting it to 0 performs cumulative processing over all prior batches.
StreamingTest provides streaming hypothesis testing, as in the snippet below.
import org.apache.spark.mllib.stat.test.{BinarySample, StreamingTest}

val data = ssc.textFileStream(dataDir).map(line => line.split(",") match {
  case Array(label, value) => BinarySample(label.toBoolean, value.toDouble)
})

val streamingTest = new StreamingTest()
  .setPeacePeriod(0)
  .setWindowSize(0)
  .setTestMethod("welch")

val out = streamingTest.registerStream(data)
out.print()
For the complete example code, see examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala
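The snippet above assumes an existing StreamingContext ssc and an input directory dataDir. A minimal sketch of that wiring follows; the batch interval, checkpoint path, and input path are illustrative, and the checkpoint directory is set because StreamingTest keeps state across batches (the full example does the same).

import org.apache.spark.SparkConf
import org.apache.spark.mllib.stat.test.{BinarySample, StreamingTest}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StreamingTestExample").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5)) // batch interval is illustrative
ssc.checkpoint("/tmp/streaming-test-checkpoint") // required for the test's internal state

val dataDir = "/tmp/streaming-test-data" // hypothetical directory of "label,value" lines
val data = ssc.textFileStream(dataDir).map(line => line.split(",") match {
  case Array(label, value) => BinarySample(label.toBoolean, value.toDouble)
})

val streamingTest = new StreamingTest()
  .setPeacePeriod(0)
  .setWindowSize(0)
  .setTestMethod("welch")

streamingTest.registerStream(data).print()

ssc.start()
ssc.awaitTermination()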
2.5 Random Data Generation
Random data generation is useful for randomized algorithms, prototyping, and performance testing. spark.mllib supports generating random RDDs with i.i.d. values drawn from a given distribution: uniform, standard normal, or Poisson.
RandomRDDs provides factory methods to generate random double RDDs and random vector RDDs. The following example generates a random double RDD whose values follow the standard normal distribution N(0, 1), and then maps it to N(1, 4) by shifting and scaling.
import org.apache.spark.SparkContext
import org.apache.spark.mllib.random.RandomRDDs._

val sc: SparkContext = ...

// Generate a random double RDD that contains 1 million i.i.d. values drawn from the
// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
val u = normalRDD(sc, 1000000L, 10)
// Apply a transform to get a random double RDD following `N(1, 4)`.
val v = u.map(x => 1.0 + 2.0 * x)
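A quick way to sanity-check the transform is to look at the sample moments, which should be close to the target N(1, 4). Continuing from sc and v above, the extra factory calls below (Poisson and uniform, with illustrative sizes) just show that the other distributions follow the same pattern.

import org.apache.spark.mllib.random.RandomRDDs._

// Sample moments of v should approximate the target distribution N(1, 4).
println(v.mean()) // ~ 1.0
println(v.variance()) // ~ 4.0

val p = poissonRDD(sc, mean = 2.0, size = 1000L) // i.i.d. Poisson(2.0) values
val w = uniformRDD(sc, 1000L) // i.i.d. values uniform on [0.0, 1.0]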
2.6 Kernel Density Estimation
Kernel density estimation is useful for visualizing empirical probability distributions without assuming that the observed samples come from any particular distribution. It computes an estimate of the probability density function (PDF) of a random variable, evaluated at a given set of points, by expressing the PDF of the empirical distribution at each point as the mean of the PDFs of normal distributions centered at every sample.
KernelDensity provides methods to compute kernel density estimates from an RDD of sample points, as shown in the following example:
import org.apache.spark.mllib.stat.KernelDensity
import org.apache.spark.rdd.RDD

val data: RDD[Double] = ... // an RDD of sample data

// Construct the density estimator with the sample data and a standard deviation for the Gaussian
// kernels
val kd = new KernelDensity()
  .setSample(data)
  .setBandwidth(3.0)

// Find density estimates for the given values
val densities = kd.estimate(Array(-1.0, 2.0, 5.0))
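Filling the placeholder with a concrete sample gives a runnable sketch; the sample values and bandwidth are illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.stat.KernelDensity

val sc = new SparkContext(new SparkConf().setAppName("KDE").setMaster("local[*]"))
val data = sc.parallelize(Seq(1.0, 1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 5.0, 6.0, 7.0, 8.0, 9.0))
val kd = new KernelDensity()
  .setSample(data)
  .setBandwidth(3.0)
// Density estimates at the query points; values are larger near the bulk of the sample.
val densities = kd.estimate(Array(-1.0, 2.0, 5.0))
densities.foreach(println)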