1. 程式人生 > >spark 1.6 MLlib

spark 1.6 MLlib

譯者續:本文會持續更新。

MLlib 是spark 機器學習的庫,它的目標是使機器學習演算法能更容易上手。這個庫包含通用學習演算法和工具集,包括:分類,迴歸聚類協同過濾降維以及深層優化策略和上層管道API(pipeline).

分為兩個包

1 spark.mllib 包含基於RDD的原始API

2 spark.ml 包含上層操作DataFrame 的API, 可以構造機器學習管道

推薦使用spark.ml 包,因為DataFrame API 在機器學習應用中更通用和靈活。但我們會持續支援spark.mllib 也配合spark.ml的開發。開發者可以提交新演算法到spark.ml 包,但使用者可以持續關注spark.mllib和使用spark.mllib中的特性。例如,特徵抽取和特徵變換。

一下列出機器學習包中主要的功能並講解細節

spark.mllib: data types, algorithms, and utilities

一 資料型別 – MLlib

MLlib支援單個節點的本地向量和本地指標,同時也支援基於RDDs的分散式指標集。本地向量和本地指標可看做資料模型的對外介面,而底層的線性代數操作有Breeze 和 jblas提供。監督學習中的訓練樣本在MLlib中稱為,“標籤點”(

本人註解即有類別資訊的樣本點資料

1.1本地向量

本地向量有兩個關鍵資料:0開始在索引和雙精度浮點型值。MLlib支援兩類本地向量緊緻向量和稀鬆向量。緊緻向量是一個雙精度浮點型向量元素組成的陣列,稀鬆向量是兩個同長度的資料,一個是非0向量指標陣列,另一個是非0向量元素陣列。如, 向量(1.0,0.0,3.0) 的緊緻向量為[1.0,0.0,3.0] ,而對應的稀鬆向量為(3, [0, 2], [1.0, 3.0])此處,3代表向量長度(本人註解:[0,2] 是向量中非0資料的指標集,[1.0,3.0] 是對應非0.0資料的值)

本地向量的基類是Vector,我們提供兩個實現:DenseVector 和SparseVector ,建議使用者使用Vectors的工廠方法建立本地向量。

Scala Vectors API

importorg.apache.spark.mllib.linalg.{Vector,Vectors}

// Create a dense vector (1.0, 0.0, 3.0).

val dv:Vector=Vectors.dense(1.0,0.0,3.0)

// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.

val sv1:Vector=Vectors.sparse(3,Array(0,2),Array(1.0,3.0))

// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.

val sv2:Vector=Vectors.sparse(3,Seq((0,1.0),(2,3.0)))

注意:

Scala預設import scala.collection.immutable.Vector , 在執行spark ML時,需要手動引入importorg.apache.spark.mllib.linalg.Vector.

1.2標籤點

標籤點是本地向量,可以使緊緻向量,也可以使稀鬆向量。在MLlib中標籤點用於監督學習演算法但是繫結雙精度浮點類別標籤後也可以應用於迴歸和分類演算法在兩類分類中類別標籤可選 0 或 1 , 對於多分類,類別標籤  從0 到(總類別數- 1)。

標籤類使用case classs LabeledPoint .

importorg.apache.spark.mllib.linalg.Vectors
importorg.apache.spark.mllib.regression.LabeledPoint
// Create a labeled point with a positive label and a dense feature vector.
valpos=LabeledPoint(1.0,Vectors.dense(1.0,0.0,3.0))
// Create a labeled point with a negative label and a sparse feature vector.
valneg=LabeledPoint(0.0,Vectors.sparse(3,Array(0,2),Array(1.0,3.0)))

1.2.1 稀鬆資料

實踐中經常會碰到需要訓練稀鬆資料集,MLLib支援從LIBSVN格式直接讀取訓練資料對於LIBSVN和LIBLINEAR的使用者對這種格式並不陌生這種格式是文字檔案每行是一個標籤點這個點標識一個稀鬆特徵向量

label index1:value1 index2:value2 ...

注意此處檔案中向量的索引是從1開始,載入到spark 後自動轉換為從0 開始。

importorg.apache.spark.mllib.regression.LabeledPoint
importorg.apache.spark.mllib.util.MLUtils
importorg.apache.spark.rdd.RDD
valexamples:RDD[LabeledPoint]=MLUtils.loadLibSVMFile(sc,"data/mllib/sample_libsvm_data.txt")

1.3本地矩陣

本地矩陣是單個主機上的矩陣具有特性:整數的矩陣索引和(雙精度)浮點矩陣元素。MLLib支援緊緻矩陣矩陣元素按列優先儲存在陣列中,稀鬆矩陣,矩陣非0元素按列優先儲存在CSC格式(Compressed Sparse Column,壓縮稀鬆列,如下面緊緻矩陣

      

(3,2)的矩陣儲存在陣列中為[1.0, 3.0, 5.0, 2.0, 4.0, 6.0]

本地矩陣的基類是Matrix , 同時提供兩種本地矩陣實現:DenseMatrix,和SparseMatrix 。 建議使用者使用Matrices 類的工廠方法建立本地矩陣。再次提醒,矩陣是按列優先的陣列儲存。

Scala Matrix

Matrices API :

importorg.apache.spark.mllib.linalg.{Matrix,Matrices}
// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
valdm:Matrix=Matrices.dense(3,2,Array(1.0,3.0,5.0,2.0,4.0,6.0))
// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
valsm:Matrix=Matrices.sparse(3,2,Array(0,1,3),Array(0,2,1),Array(9,6,8))

1.4 分散式矩陣

分散式矩陣是分佈在一個或多個RDDs的矩陣具有特徵長整型矩陣索引雙精度浮點矩陣元素考慮到將分散式矩陣轉換為其他形式需要全域性shuffle, 這樣很消耗時間,因此有必要仔細斟酌選擇合適形式來儲存分散式大矩陣。暫時支援三種類型的分散式矩陣。

第一類是RowMatrix .RowMatrix 是面向行儲存的矩陣因此忽略行索引例如特徵向量。這種矩陣每一行是一個本地向量(RDD)。假設每行的資料並不多,這樣本地矩陣可以在單節點的driver間自由通訊,也可以在單節點上儲存和操作。

第二類是IndexedRowMatrix ,它比RowMatrix多了行索引,這個行索引可以標記行並用於關聯操作。

注意

分散式矩陣的RDD的行和列在cache時必須是確定的,否則會出錯。

1.4.1 RowMatrix

因為每行是一個本地向量因此矩陣的列數限制在integer的範圍,在實際中不建議太大。

importorg.apache.spark.mllib.linalg.Vector

importorg.apache.spark.mllib.linalg.distributed.RowMatrix

val rows:RDD[Vector]=...// an RDD of local vectors

// Create a RowMatrix from an RDD[Vector].

val mat:RowMatrix=newRowMatrix(rows)

// Get its size.

val m= mat.numRows()

val n= mat.numCols()

// QR decomposition

val qrResult= mat.tallSkinnyQR(true)

1.4.2 IndexedRowMatrix

IndexedRowMatrix 可由RDD[IndexedRow] 例項建立,此處IndexedRow 封裝為(Long, Vector) . IndexedRowMatrix 去掉行索引就變成了RowMatrix

importorg.apache.spark.mllib.linalg.distributed.{IndexedRow,IndexedRowMatrix,RowMatrix}

val rows:RDD[IndexedRow]=...// an RDD of indexed rows

// Create an IndexedRowMatrix from an RDD[IndexedRow].

val mat:IndexedRowMatrix=newIndexedRowMatrix(rows)

// Get its size.

val m= mat.numRows()

val n= mat.numCols()

// Drop its row indices.

val rowMat:RowMatrix= mat.toRowMatrix()

1.4.3 CoordinateMatrix ( 調和矩陣)

Coordinatematrix 是分散式矩陣,所有元素做成的RDD物件。其中Tuple3 形如( i : Long , j : Long, value : Double ) ,此處i 是行索引, j 是列索引, value 是元素的值。CoordinateMatrix 只在當矩陣行和列都很大時,同時矩陣非0 元素很稀鬆。

CoordinateMatrix 可以從RDD[MatrixEntry]例項建立,此處MatrixEntry 封裝為(Long , Long, Double )。 CoordinateMatrix 呼叫toIndexeedRowMatrix 方法可以將CoordinateMatrix 矩陣轉化為IndexedRowMatrix 矩陣,其他coordinateMatrix 的計算暫時還不支援。

importorg.apache.spark.mllib.linalg.distributed.{CoordinateMatrix,MatrixEntry}

val entries:RDD[MatrixEntry]=...// an RDD of matrix entries

// Create a CoordinateMatrix from an RDD[MatrixEntry].

val mat:CoordinateMatrix=newCoordinateMatrix(entries)

// Get its size.

val m= mat.numRows()

val n= mat.numCols()

// Convert it to an IndexRowMatrix whose rows are sparse vectors.

val indexedRowMatrix= mat.toIndexedRowMatrix()

1.4.4 BlockMatrix (分塊矩陣)

BlockMatrix是分散式矩陣RDD[MarixBlock],此處MatrixBlock是元組((Int, Int) , Matrix ), 其中(Int, Int) 是矩陣塊的索引, Matrix 是給定矩陣塊索引的子矩陣,矩陣維度(是陣列的長度)rowsPerBlock*colsPerBlockBlockMatrix矩陣支援add 和 multiply 方法和另一個同維度的BlockMatrix 計算。Helper函式 validate可以校驗 BlockMatrix 是否設定正確。

BlockMatrix 矩陣可以有IndexedRowMatrix 或 CoordinateMatrix  呼叫toBlockMatrix 方法得到, toBlockMatrix 方法預設建立 1024 * 1024 的塊矩陣使用者可以呼叫介面 toBlockMatrix(rowsPerBlock , colsPerBlock ) 修改矩陣維度。

importorg.apache.spark.mllib.linalg.distributed.{BlockMatrix,CoordinateMatrix,MatrixEntry}

val entries:RDD[MatrixEntry]=...// an RDD of (i, j, v) matrix entries

// Create a CoordinateMatrix from an RDD[MatrixEntry].

val coordMat:CoordinateMatrix=newCoordinateMatrix(entries)

// Transform the CoordinateMatrix to a BlockMatrix

val matA:BlockMatrix= coordMat.toBlockMatrix().cache()

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.

// Nothing happens if it is valid.

matA.validate()

// Calculate A^T A.

val ata= matA.transpose.multiply(matA)

2 基本統計 spark.mllib

2.1 統計概覽

Statistics類中提供基本列統計RDD[Vector]功能

colStats()返回MultivariateStatisticalSummary 的例項,這個例項可以按列計算最大,最小,均值,方差,非0個數統計,列的1範數。

importorg.apache.spark.mllib.linalg.Vector

importorg.apache.spark.mllib.stat.{MultivariateStatisticalSummary,Statistics}

val observations:RDD[Vector]=...// an RDD of Vectors

// Compute column summary statistics.

val summary:MultivariateStatisticalSummary=Statistics.colStats(observations)

println(summary.mean)// a dense vector containing the mean value for each column

println(summary.variance)// column-wise variance

println(summary.numNonzeros)// number of nonzeros in each column

2.2 相關統計

計算兩個資料序列可以使向量或矩陣)的相關係數。在spark.mllib中,我們提供成對計算相關係數,實現了Pearson’s相關和Spearman’s相關相關統計的結果依賴於計算物件如果是兩個RDD[Double]的計算,結果是Double型別,如果是兩個RDD[Vector]計算,結果是一個Matrix矩陣。

importorg.apache.spark.SparkContext

importorg.apache.spark.mllib.linalg._

importorg.apache.spark.mllib.stat.Statistics

val sc:SparkContext=...

val seriesX:RDD[Double]=...// a series

val seriesY:RDD[Double]=...// must have the same number of partitions and cardinality as seriesX

// compute the correlation using Pearson's method. Enter "spearman" for Spearman's method. If a 

// method is not specified, Pearson's method will be used by default. 

val correlation:Double=Statistics.corr(seriesX, seriesY,"pearson")

val data:RDD[Vector]=...// note that each Vector is a row and not a column

// calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.

// If a method is not specified, Pearson's method will be used by default. 

val correlMatrix:Matrix=Statistics.corr(data,"pearson")

2.3 分層取樣(Stratified sampling) 

spark.mllib中提供計算原始RDD 鍵值對的分層取樣方法:sampleByKey 和 sampleByKeyExact 。在分層取樣中,鍵可以看做標籤類,相應的值可以看做屬性。如,鍵可以使男人或女人,文件ID,相應的值可以使人的年齡或文件的單次。 sampleByKey 方法隨機取樣一系列觀測值,過程就像逐個遍歷所有樣本點,通過拋銀幣決定取捨,因此只需要確定取樣點個數。sampleByKeyExact 比分層隨機取樣方法sampleByKey需要更多地樣本,才能保證取樣點個數有99.99%的置信度,sampleByKeyExact暫不支援python.

sampleByKeyExact() 取樣由[ f_k , n_k ] 完全決定, 對任意一個鍵屬於 K 鍵集合f_k是預期鍵對應取樣點值得佔比分數),n_k 是這個鍵k在整個集合中值的個數。無放回取樣(即取樣的資料取走,不會出現重複) 方法需要一個引數(withReplacement預設是false , 而又放回取樣方法需要兩個引數

importorg.apache.spark.SparkContext

importorg.apache.spark.SparkContext._

importorg.apache.spark.rdd.PairRDDFunctions

valsc:SparkContext=...

valdata=...// an RDD[(K, V)] of any key value pairs

valfractions:Map[KDouble]=...// specify the exact fraction desired from each key

// Get an exact sample from each stratum

valapproxSample=data.sampleByKey(withReplacement=false,fractions)

valexactSample=data.sampleByKeyExact(withReplacement=false,fractions)

2.4 假設檢驗

假設檢驗在統計上用於判定統計結果又多大統計意義及統計結果有多大置信度Spark.mllib 暫支援Pearson’s chi-squared 檢驗檢驗結果的適用性和獨立性輸入資料需要驗證適用性和獨立性適用性檢驗需要輸入Vector , 獨立性需要資料Matrix 

Spark.mllib 支援輸入RDD[LabledPoint] ,使用chi-squared獨立性來決定特徵的選擇。

Statistics 提供方法執行Pearson’s chi-squared 檢驗下例用於假設檢驗

importorg.apache.spark.SparkContext

importorg.apache.spark.mllib.linalg._

importorg.apache.spark.mllib.regression.LabeledPoint

importorg.apache.spark.mllib.stat.Statistics._

val sc:SparkContext=...

val vec:Vector=...// a vector composed of the frequencies of events

// compute the goodness of fit. If a second vector to test against is not supplied as a parameter, 

// the test runs against a uniform distribution.  

val goodnessOfFitTestResult =Statistics.chiSqTest(vec)

println(goodnessOfFitTestResult)// summary of the test including the p-value, degrees of freedom, 

// test statistic, the method used, and the null hypothesis.

val mat:Matrix=...// a contingency matrix

// conduct Pearson's independence test on the input contingency matrix

val independenceTestResult =Statistics.chiSqTest(mat)

println(independenceTestResult)// summary of the test including the p-value, degrees of freedom...

val obs:RDD[LabeledPoint]=...// (feature, label) pairs.

// The contingency table is constructed from the raw (feature, label) pairs and used to conduct

// the independence test. Returns an array containing the ChiSquaredTestResult for every feature 

// against the label.

val featureTestResults:Array[ChiSqTestResult]=Statistics.chiSqTest(obs)

var i =1

featureTestResults.foreach { result =>

    println(s"Column $i:\n$result")

    i +=1

}// summary of the test

Statistics 提供1-sample, 2-sided Kolmogorov-Smirnov檢驗概率分佈是否相等。提供理論分佈名稱和理論分佈引數,或者根據已知理論分佈計算累計分佈函式,使用者可以檢驗樣本點是否出自來驗證概率分佈。在特殊例子中,如正態分佈,不用沒有提供正態分佈引數,則檢驗會使用標準正態分佈引數。

importorg.apache.spark.mllib.stat.Statistics

val data:RDD[Double]=...// an RDD of sample data

// run a KS test for the sample versus a standard normal distribution

val testResult =Statistics.kolmogorovSmirnovTest(data,"norm",0,1)

println(testResult)// summary of the test including the p-value, test statistic,

// and null hypothesis

// if our p-value indicates significance, we can reject the null hypothesis

// perform a KS test using a cumulative distribution function of our making

val myCDF:Double=>Double=...

相關推薦

spark 1.6 MLlib

譯者續:本文會持續更新。 MLlib 是spark 機器學習的庫,它的目標是使機器學習演算法能更容易上手。這個庫包含通用學習演算法和工具集,包括:分類,迴歸,聚類,協同過濾,降維,以及深層優化策略和上層管道API(pipeline). 分為兩個包: 1 sp

Spark 1.6.3 thriftServer 支援 ldap 配置

hive-site.xml配置 [[email protected] conf]$ cat hive-site.xml <?xml version="1.0" encoding="UTF-8"?> <!--Autogenerated by

事無鉅細 Spark 1.6.1 叢集環境搭建

還是在之前的Hadoop叢集環境上繼續搭建Spark-1.6.1環境 下載安裝 下載Spark並解壓 wget http://mirrors.cnnic.cn/apache/spark/spark-1.6.1/spark-1.6.1-b

編譯打包spark-1.6.0-cdh5.11.0-src詳細過程及問題記錄

簡介 要深入學習spark,閱讀原始碼,修改原始碼,學會自己編譯打包spark是必須邁過的一道坎。折騰了兩天,先後編譯打包了spark-1.6.0-cdh5.11.0-src,spark-1.6.0-cdh5.13.0-src版本,現在記錄過程,及遇到的問題如下。 環境

Spark-1.6.0之Application執行資訊記錄器JobProgressListener

  JobProgressListener類是Spark的ListenerBus中一個很重要的監聽器,可以用於記錄Spark任務的Job和Stage等資訊,比如在Spark UI頁面上Job和Stage執行狀況以及執行進度的顯示等資料,就是從JobProgres

Spark standalone模式的安裝(spark-1.6.1-bin-hadoop2.6.tgz)(master、slave1和slave2)

 前期部落格 開篇要明白   (1)spark-env.sh 是環境變數配置檔案   (2)spark-defaults.conf   (3)slaves 是從節點機器配置檔案   (4)metrics.properties 是 監控   (5)log4j.

Spark on YARN模式的安裝(spark-1.6.1-bin-hadoop2.6.tgz + hadoop-2.6.0.tar.gz)(master、slave1和slave2)(博主推薦)

說白了   Spark on YARN模式的安裝,它是非常的簡單,只需要下載編譯好Spark安裝包,在一臺帶有Hadoop YARN客戶端的的機器上執行即可。    Spark on YARN分為兩種: YARN cluster(YARN standalone,0.9版本以前)和 YA

hadoop-2.6.0.tar.gz + spark-1.6.1-bin-hadoop2.6.tgz的叢集搭建(單節點)(CentOS系統)

前言 關於幾個疑問和幾處心得! a.用NAT,還是橋接,還是only-host模式? b.用static的ip,還是dhcp的? 答:static c.別認為快照和克隆不重要,小技巧,比別人靈活用,會很節省時間和大大減少錯誤。 d.重用起來指令碼語言

Centos 6.5 x64環境下 spark 1.6 maven 編譯-- 已驗證

Centos 6.5 x64  jdk 1.7 scala 2.10 maven 3.3.3 cd spark-1.6 export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" mvn -Dhado

Spark-1.6.0中的Sort Based Shuffle原始碼解讀

  從Spark-1.2.0開始,Spark的Shuffle由Hash Based Shuffle升級成了Sort Based Shuffle。即Spark.shuffle.manager從Hash換成了Sort。不同形式的Shuffle邏輯主要是Shuffle

"Spark 1.6 + Alluxio 1.2 HA + OFF_HEAP" 的配置

2、將alluxio-core-client-spark-1.2.0-jar-with-dependencies.jar、 spark-alluxio-blockstore.jar 放到所有Spark節點的lib目錄下。並在 conf/spark-env.s

Spark 1.6.2 單機版安裝配置

本文將介紹Apache Spark 1.6.2在單機的部署,與在叢集中部署的步驟基本一致,只是少了一些master和slave檔案的配置。直接安裝scala與Spark就可以在單機使用,但如果用到hdfs系統的話hadoop和jdk也要配置,建議全部安裝配置好。

Spark 1.6.1 單機安裝配置

本文將介紹Apache Spark 1.6.1在單機的部署,與在叢集中部署的步驟基本一致,只是少了一些master和slave檔案的配置。http://blog.csdn.net/u011513853/article/details/52865076     Spark在Wi

Apache Spark 1.6.1 學習教程

這篇部落格主要是利用Titanic dataset來簡單演示pyspark 1.6.1的使用方法。 這組資料比較小,訓練資料只有891行,訓練、測試資料可以在這裡下載(train.csv, test.csv)。 內容 資料載入和轉化 資料清理 特徵提取

spark 1.6.0 core原始碼分析7 Spark executor的執行

原始碼位置:org.apache.spark.executor.CoarseGrainedExecutorBackend private def run( driverUrl: String, executorId: String, h

為什麼spark 1.6之後使用Netty來替代Akka通訊庫?

Akka的底層是使用Netty,儘管Akka能簡化訊息通訊的使用,但使用Akka要求message傳送端和接收端有相同的版本(例如spark streaming的receiver接收上游訊息要求上游的actor有相同的Akka版本) 由於spark對Akka的

spark 1.6.0 core原始碼分析9 從簡單例子看action

這一節以reduce為例講解action操作 首先看submitJob方法,它將我們reduce中寫的處理函式隨JobSubmitted訊息傳遞出去,因為每個分割槽都需要呼叫它進行計算;而resultHandler是指最後合併的方法,在每個task完成後,需要呼叫resul

Spark 1.6 (Java) 問題彙總

一、通過SparkSQL讀取Oracle時報找不到Oracle JDBC包 (java.lang.ClassNotFoundException:oracle.jdbc.driver.OracleDri

Spark MLlib 1.6 -- 頻度模式挖掘

挖掘頻繁關聯物品,頻繁關聯物品集,頻繁關聯子序列,或其它子結構是分析海量資料的第一步,並且連續幾年作為資料探勘主要研究方向。此處引用維基中關聯規則學習()作為本章節基礎。spark.mllib 提供並行FP-growth演算法,這個演算法經常用於挖掘頻度物品集。 8.1

Spark MLlib 1.6 -- 統計基礎篇

2.1 統計概覽 在Statistics類中提供基本列統計RDD[Vector]功能 colStats()返回MultivariateStatisticalSummary 的例項,這個例項可以按列計算最大,最小,均值,方差,非0個數統計,列的1範數。 imp