14.spark mllib之快速入門
??MLlib是Spark提供提供機器學習的庫,專為在集群上並行運行的情況而設計。
MLlib包含很多機器學習算法,可在Spark支持的所有編程語言中使用。
??MLlib設計理念是將數據以RDD的形式表示,然後在分布式數據集上調用各種算法。其實,MLlib就是RDD上一系列可供調用的函數的集合。
數據類型
??MLlib包含一些特有的數據類型,位於org.apache.spark.mllib包(Java/Scala)或pyspark.mllib(Python)中。主要的幾個類有:
-
Vector
-
一個本地向量(Local Vector)。索引是從0開始的,並且是整型。而值為 Double 類型,存儲於單個機器內。
-
MLlib既支持稠密向量也支持稀疏向量,前者表示向量的每一位都存儲,後者只存儲非零位以節約空間。
-
向量可以通過mllib.linalg.Vectors類創建
- scala
//創建稠密向量 scala> val denseVec1 = Vectors.dense(1.0,2.0,3.0) denseVec1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0] scala> val denseVec2 = Vectors.dense(Array(1.0,2.0,3.0)) denseVec2: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0] //創建稀疏向量 scala> val sparseVec1 = Vectors.sparse(4,Array(0,2),Array(1.0,2.0)) sparseVec1: org.apache.spark.mllib.linalg.Vector = (4,[0,2],[1.0,2.0])
- python
>>> from pyspark.mllib.linalg import Vectors >>> den = Vectors.dense([1.0,2.0,3.0]) >>> den DenseVector([1.0, 2.0, 3.0]) >>> spa = Vectors.sparse(4,[0,2],[1.0,2.0]) >>> spa SparseVector(4, {0: 1.0, 2: 2.0})
-
-
LabeledPoint
-
在分類和回歸之類的監督式學習(supervised learning)算法中使用。
-
LabeledPoint表示帶標簽的數據點,包括一個特征向量與一個標簽(由一個浮點數表示)。
-
位於mllib.regression包中
- scala
// 首先需要引入標簽點相關的類 import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint // 創建一個帶有正面標簽和稠密特征向量的標簽點。 val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) // 創建一個帶有負面標簽和稀疏特征向量的標簽點。 val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
- python
>>> from pyspark.mllib.regression import LabeledPoint >>> from pyspark.mllib.linalg import Vectors >>> pos = LabeledPoint(1.0,Vectors.dense([1.0,2.0,3.0])) >>> neg = LabeledPoint(0.0,Vectors.dense([1.0,2.0,3.0]))
-
-
Matrix
-
矩陣分為稠密矩陣和稀疏矩陣
-
稠密矩陣的實體值以列為主要次序的形式,存放於單個 Double 型數組內。系數矩陣的非零實體以列為主要次序的形式,存放於壓縮稀疏列(Compressed Sparse Column, CSC)中。例如,下面這個稠密矩陣就是存放在一維數組 [1.0, 3.0, 5.0, 2.0, 4.0, 6.0] 中,矩陣的大小為 (3, 2) 。
-
本地矩陣的基類是 Matrix 類,在 Spark 中有其兩種實現,分別是 DenseMatrix 和 SparseMatrix 。官方文檔中推薦使用 已在 Matrices 類中實現的工廠方法來創建本地矩陣。需要註意的是,MLlib 中的本地矩陣是列主序的(column-major)
-
稠密矩陣
import org.apache.spark.mllib.linalg.{Matrix, Matrices} // 創建稠密矩陣 ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
- 稀疏矩陣
scala> val sparseMatrix= Matrices.sparse(3, 3, Array(0, 2, 3, 6), Array(0, 2, 1, 0, 1, 2), Array(1.0, 2.0, 3.0,4.0,5.0,6.0)) sparseMatrix: org.apache.spark.mllib.linalg.Matrix = 3 x 3 CSCMatrix (0,0) 1.0 (2,0) 2.0 (1,1) 3.0 (0,2) 4.0 (1,2) 5.0 (2,2) 6.0
-
-
Rating
-
用於產品推薦
-
表示用戶對一個產品的評分
- 位於mllib.recommendation包中
-
-
各種Model類(模型)
-
每個Model都是訓練算法的結果
- 模型一般都有一個predict()方法,使用該模型對新的數據點或數據點組成的RDD進行預測。
-
統計
??不論是在即時的探索中,還是在機器學習的數據理解中,基本的統計都是數據分析的重要部分。MLlib 通過mllib.stat.Statistics 類中的方法提供了幾種廣泛使用的統計函數,這些函數可以直接在RDD 上使用。一些常用的函數如下所列。
Statistics.colStats(rdd)
??計算由向量組成的RDD 的匯總統計,保存著向量集合中每列的最小值、最大值、平均值和方差。這可以用來在一次執行中獲取豐富的統計信息。
Statistics.corr(rdd, method)
?&esmp;計算由向量組成的RDD 中的列間的相關矩陣,使用皮爾森相關(Pearson correlation)或斯皮爾曼相關(Spearman correlation)中的一種(method 必須是pearson 或spearman中的一個)。
Statistics.corr(rdd1, rdd2, method)
??計算兩個由浮點值組成的RDD 的相關矩陣,使用皮爾森相關或斯皮爾曼相關中的一種(method 必須是pearson 或spearman 中的一個)。
Statistics.chiSqTest(rdd)
??計算由LabeledPoint 對象組成的RDD 中每個特征與標簽的皮爾森獨立性測試
(Pearson’s independence test) 結果。返回一個ChiSqTestResult 對象, 其中有p 值、(p-value)、測試統計及每個特征的自由度。標簽和特征值必須是分類的(即離散值)。
??下面舉個例子:使用三個學生的成績Vector來構建所需的RDD Vector,這個矩陣裏的每個Vector都代表一個學生在四門課程裏的分數:
python
from pyspark.mllib.stat import Statistics
from pyspark.mllib.linalg import Vectors
//構建RDD
basicTestRDD = sc.parallelize([Vectors.dense([60, 70, 80, 0]),
Vectors.dense([80, 50, 0, 90]),
Vectors.dense([60, 70, 80, 0])])
//查看summary裏的成員,這個對象中包含了大量的統計內容
>>> print summary.mean()
[ 66.66666667 63.33333333 53.33333333 30. ]
>>> print summary.variance()
[ 133.33333333 133.33333333 2133.33333333 2700. ]
>>> print summary.numNonzeros()
[ 3. 3. 2. 1.]
scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
val array1: Array[Double] = Array[Double](60, 70, 80, 0)
val array2: Array[Double] = Array[Double](80, 50, 0, 90)
val array3: Array[Double] = Array[Double](60, 70, 80, 0)
val denseArray1 = Vectors.dense(array1)
val denseArray2 = Vectors.dense(array2)
val denseArray3 = Vectors.dense(array3)
val seqDenseArray: Seq[Vector] = Seq(denseArray1, denseArray2, denseArray3)
val basicTestRDD: RDD[Vector] = sc.parallelize[Vector](seqDenseArray)
val summary: MultivariateStatisticalSummary = Statistics.colStats(basicTestRDD)
算法
特征提取
- TF-IDF(詞頻——逆文檔頻率)是用來從文本文檔(例如網頁)中生成特定向量的簡單方法。
- 縮放,大多數要考慮特征向量中各元素的幅值,並且在特征縮放調整為平等對待時表現最好。
- 規化,在準備輸入數據時,把向量正規化為長度1。使用Normalizer類可以實現。
- Word2Vec是一個基於神經網絡的文本特征算法,可以用來將數據傳給許多下遊算法。
降維
-
主成分分析(PCA)
- PCA會把特征映射到低位空間,讓數據在低維空間表示的方差最大化,從而忽略一些無用的維度。
- 要計算這種映射,我們要構建出正規化的相關矩陣,並使用這個矩陣的奇異向量和奇異值。
- 最大的一部分奇異值相對應的奇異向量可以用來重建原始數據的主要成分。
- 奇異值分解
- MLlib也提供底層的奇異值分解(簡稱SVD)原語。
分類與回歸
- 分類與回歸是監督學習的兩種形式。
- 監督學習是指算法嘗試使用有標簽的訓練數據根據對象的特征預測結果。
- 在分類中,預測出的變量是離散的。
- 在回歸中,預測出的變量是連續的。
- MLlib中包含許多分類與回歸算法:如簡單的線性算法以及決策樹和森林算法。
聚類
- 聚類算法是一種無監督學習任務,用於將對象分到具有高度相似性的聚類中。
- 聚類算法主要用於數據探索(查看一個新數據集是什麽樣子)以及異常檢測(識別與任意聚類都相聚較遠的點)。
- MLlib中包含兩個聚類中流行的K-means算法,以及一個叫做K-means||的變種,可以提供為並行環境提供更好的初始化策略。
協同過濾與推薦
- 協同過濾是一種根據用戶對各種產品的交互與評分來推薦新產品的推薦系統技術。
- 交替最小二乘(ALS),會為每個用戶和產品都設一個特征向量,這樣用戶向量和產品向量的點積就接近於他們的得分。
實例
使用邏輯回歸算法實現垃圾郵件分類處理
def testLogisticRegressionWithSGD = {
val spam = sc.textFile("src/main/resources/mllib/spam.txt", 1)
val normal = sc.textFile("src/main/resources/mllib/normal.txt", 1)
//創建一個HashingTF實例來把郵件文本映射為包含一個10000個特征的向量
val tf = new HashingTF(numFeatures = 10000)
//各郵件都被切分為單詞,每個單詞被映射為一個特征
val spamFeatures = spam.map { email => tf.transform(email.split(" ")) }
val normalFeatures = normal.map { email => tf.transform(email.split(" ")) }
//創建LabeledPoint數據集分別存放陽性(垃圾郵件)和陰性(正常郵件)的例子
val positiveExamples = spamFeatures.map { features => LabeledPoint(1, features) }
val negativeExamples = normalFeatures.map { features => LabeledPoint(0, features) }
val trainingData = positiveExamples.union(negativeExamples)
trainingData.cache()
println(trainingData.toDebugString)
//使用SGD算法運行邏輯回歸
val model = new LogisticRegressionWithSGD().run(trainingData)
//以陽性(垃圾郵件)和陰性(正常郵件)的例子分別進行測試
val posTest = tf.transform("O M G get cheap stuff by sending money to .".split(" "))
val negTest = tf.transform("hello, i started studying Spark ".split(" "))
println(s"prediction for positive tset example: ${model.predict(posTest)}")
println(s"prediction for negitive tset example: ${model.predict(negTest)}")
Thread.sleep(Int.MaxValue)
}
svm分類算法
# 加載模塊
from pyspark.mllib.util import MLUtils
from pyspark.mllib.classification import SVMWithSGD
# 讀取數據
dataFile = ‘/opt/spark-1.6.1-bin-hadoop2.6/data/mllib/sample_libsvm_data.txt‘
data = MLUtils.loadLibSVMFile(sc, dataFile)
splits = data.randomSplit([0.8, 0.2], seed = 9L)
training = splits[0].cache()
test = splits[1]
# 打印分割後的數據量
print "TrainingCount:[%d]" % training.count();
print "TestingCount:[%d]" % test.count();
model = SVMWithSGD.train(training, 100)
scoreAndLabels = test.map(lambda point : (model.predict(point.features), point.label))
#輸出結果,包含預測的數字結果和0/1結果:
for score, label in scoreAndLabels.collect():
print score, label
k-means聚類算法
# 讀取數據文件,創建RDD
dataFile = "/opt/spark-1.6.1-bin-hadoop2.6/data/mllib/kmeans_data.txt"
lines = sc.textFile(dataFile)
# 創建Vector,將每行的數據用空格分隔後轉成浮點值返回numpy的array
data = lines.map(lambda line: np.array([float(x) for x in line.split(‘ ‘)]))
# 其中2是簇的個數
model = KMeans.train(data, 2)
print("Final centers: " + str(model.clusterCenters))
print("Total Cost: " + str(model.computeCost(data)))
14.spark mllib之快速入門