1. 程式人生 > >基於MLlib的機器學習

基於MLlib的機器學習

1. 簡介

MLlib 是Spark 中提供機器學習函式的庫。它是專為在叢集上並行執行的情況而設計的。MLlib 中包含許多機器學習演算法,可以在Spark 支援的所有程式語言中使用,由於Spark基於記憶體計算模型的優勢,非常適合機器學習中出現的多次迭代,避免了操作磁碟和網路的效能損耗。Spark 官網展示的 MLlib 與Hadoop效能對比圖就非常顯著。所以Spark比Hadoop的MapReduce框架更易於支援機器學習。

2. 資料型別

MLlib 包含一些特有的資料型別,它們位於org.apache.spark.mllib 包(Java/Scala)或
pyspark.mllib(Python)內。

Vectors

本地向量(Local Vector)的索引是從0開始的,並且是整型。而它的值為 Double 型別,儲存於單個機器內。 MLlib 支援兩種本地向量:稠密向量和稀疏向量。

稠密向量是用一個 Double 型別的陣列代表它的實體值,而稀疏向量是基於兩個並行陣列,即索引和值。舉個例子,向量 (1.0, 0.0, 3.0) 寫成稠密形式就是 [1.0, 0.0, 3.0],而寫成稀疏形式則是 (3, [0, 2], [1.0, 3.0]),後者的第一個 3 是指向量的大小。稀疏和稠密的界限沒有嚴格意義上的標準,通常需要依據具體的問題來決定。

本地向量的基類是 Vector 類,DenseVector 和 SparseVector 類是其兩種不同的實現。官方文件推薦大家使用 Vector 類中已實現的工廠方法來建立本地向量。

Scala環境下:

//建立稠密向量
scala> val denseVec1 = Vectors.dense(1.0,2.0,3.0)
denseVec1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0]

scala> val denseVec2 = Vectors.dense(Array(1.0,2.0,3.0))
denseVec2: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0]

//建立稀疏向量
scala> val sparseVec1 = Vectors.sparse(4
,Array(0,2),Array(1.0,2.0)) sparseVec1: org.apache.spark.mllib.linalg.Vector = (4,[0,2],[1.0,2.0])

python環境下:

>>> from pyspark.mllib.linalg import Vectors
>>> den = Vectors.dense([1.0,2.0,3.0])
>>> den
DenseVector([1.0, 2.0, 3.0])
>>> spa = Vectors.sparse(4,[0,2],[1.0,2.0])
>>> spa
SparseVector(4, {0: 1.0, 2: 2.0})

LabeledPoint

在諸如分類和迴歸這樣的監督式學習(supervised learning)演算法中,LabeledPoint 用來表示帶標籤的資料點。它包含一個特徵向量與一個標籤(由一個浮點數表示),位置在
mllib.regression包中。

Scala環境中:

// 首先需要引入標籤點相關的類
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// 建立一個帶有正面標籤和稠密特徵向量的標籤點。
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

// 建立一個帶有負面標籤和稀疏特徵向量的標籤點。
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

注意其第一個引數為標籤,第二個引數為向量。標籤是用Double型別表示的。

Python環境中:

>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.linalg import Vectors
>>> pos = LabeledPoint(1.0,Vectors.dense([1.0,2.0,3.0]))
>>> neg = LabeledPoint(0.0,Vectors.dense([1.0,2.0,3.0]))

Matrix

稠密矩陣的實體值以列為主要次序的形式,存放於單個 Double 型陣列內。係數矩陣的非零實體以列為主要次序的形式,存放於壓縮稀疏列(Compressed Sparse Column, CSC)中。例如,下面這個稠密矩陣就是存放在一維陣列 [1.0, 3.0, 5.0, 2.0, 4.0, 6.0] 中,矩陣的大小為 (3, 2) 。
本地矩陣的基類是 Matrix 類,在 Spark 中有其兩種實現,分別是 DenseMatrix 和 SparseMatrix 。官方文件中推薦使用 已在 Matrices 類中實現的工廠方法來建立本地矩陣。需要注意的是,MLlib 中的本地矩陣是列主序的(column-major)

import org.apache.spark.mllib.linalg.{Matrix, Matrices}
// 建立稠密矩陣 ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
/下列矩陣
    1.0 0.0 4.0

    0.0 3.0 5.0

    2.0 0.0 6.0
/
如果採用稀疏矩陣儲存的話,其儲存資訊包括:
實際儲存值: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]`,
矩陣元素對應的行索引:rowIndices=[0, 2, 1, 0, 1, 2]`
列起始位置索引: `colPointers=[0, 2, 3, 6]`.

則生成稀疏矩陣的方式為:

scala> val sparseMatrix= Matrices.sparse(3, 3, Array(0, 2, 3, 6), Array(0, 2, 1, 0, 1, 2), Array(1.0, 2.0, 3.0,4.0,5.0,6.0))
sparseMatrix: org.apache.spark.mllib.linalg.Matrix = 
3 x 3 CSCMatrix
(0,0) 1.0
(2,0) 2.0
(1,1) 3.0
(0,2) 4.0
(1,2) 5.0
(2,2) 6.0

3. 統計

不論是在即時的探索中,還是在機器學習的資料理解中,基本的統計都是資料分析的重要部分。MLlib 通過mllib.stat.Statistics 類中的方法提供了幾種廣泛使用的統計函式,這些函式可以直接在RDD 上使用。一些常用的函式如下所列。

Statistics.colStats(rdd)

計算由向量組成的RDD 的統計性綜述,儲存著向量集合中每列的最小值、最大值、平均值和方差。這可以用來在一次執行中獲取豐富的統計資訊。

Statistics.corr(rdd, method)

計算由向量組成的RDD 中的列間的相關矩陣,使用皮爾森相關(Pearson correlation)或斯皮爾曼相關(Spearman correlation)中的一種(method 必須是pearson 或spearman中的一個)。

Statistics.corr(rdd1, rdd2, method)

計算兩個由浮點值組成的RDD 的相關矩陣,使用皮爾森相關或斯皮爾曼相關中的一種(method 必須是pearson 或spearman 中的一個)。

Statistics.chiSqTest(rdd)

計算由LabeledPoint 物件組成的RDD 中每個特徵與標籤的皮爾森獨立性測試
(Pearson’s independence test) 結果。返回一個ChiSqTestResult 物件, 其中有p 值
(p-value)、測試統計及每個特徵的自由度。標籤和特徵值必須是分類的(即離散值)。構建測試資料,

下面舉個例子:使用三個學生的成績Vector來構建所需的RDD Vector,這個矩陣裡的每個Vector都代表一個學生在四門課程裡的分數:

Python環境下:

from pyspark.mllib.stat import Statistics
from pyspark.mllib.linalg import Vectors
//構建RDD
basicTestRDD = sc.parallelize([Vectors.dense([60, 70, 80, 0]),
                       Vectors.dense([80, 50, 0,  90]),
                       Vectors.dense([60, 70, 80,  0])])
//以檢視下summary裡的成員,這個物件中包含了大量的統計內容
>>> print summary.mean()
[ 66.66666667  63.33333333  53.33333333  30.        ]
>>> print summary.variance()
[  133.33333333   133.33333333  2133.33333333  2700.        ]
>>> print summary.numNonzeros()
[ 3.  3.  2.  1.]

Scala環境:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD


val array1: Array[Double] = Array[Double](60, 70, 80, 0)
val array2: Array[Double] = Array[Double](80, 50, 0, 90)
val array3: Array[Double] = Array[Double](60, 70, 80, 0)
val denseArray1 = Vectors.dense(array1)
val denseArray2 = Vectors.dense(array2)
val denseArray3 = Vectors.dense(array3)

val seqDenseArray: Seq[Vector] = Seq(denseArray1, denseArray2, denseArray3)

val basicTestRDD: RDD[Vector] = sc.parallelize[Vector](seqDenseArray)

val summary: MultivariateStatisticalSummary = Statistics.colStats(basicTestRDD)

4. K-means聚類演算法示例

Python環境下:

# 讀取資料檔案,建立RDD
dataFile = "/opt/spark-1.6.1-bin-hadoop2.6/data/mllib/kmeans_data.txt"
lines = sc.textFile(dataFile)

# 建立Vector,將每行的資料用空格分隔後轉成浮點值返回numpy的array
data = lines.map(lambda line: np.array([float(x) for x in line.split(' ')]))

# 其中2是簇的個數
model = KMeans.train(data, 2)

print("Final centers: " + str(model.clusterCenters))
print("Total Cost: " + str(model.computeCost(data)))

5. SVM演算法示例

實驗的資料我們直接使用官方提供的資料
/opt/spark-1.6.1-bin-hadoop2.6/data/mllib/sample_libsvm_data.txt

# 載入模組
from pyspark.mllib.util import MLUtils
from pyspark.mllib.classification import SVMWithSGD

# 讀取資料
dataFile = '/opt/spark-1.6.1-bin-hadoop2.6/data/mllib/sample_libsvm_data.txt'
data = MLUtils.loadLibSVMFile(sc, dataFile)

splits = data.randomSplit([0.8, 0.2], seed = 9L)
training = splits[0].cache()
test = splits[1]

# 列印分割後的資料量
print "TrainingCount:[%d]" % training.count();
print "TestingCount:[%d]" % test.count();

model = SVMWithSGD.train(training, 100)

scoreAndLabels = test.map(lambda point : (model.predict(point.features), point.label))

#輸出結果,包含預測的數字結果和0/1結果:
for score, label in scoreAndLabels.collect():
    print score, label

參考資料: