1. 程式人生 > >Spark機器學習

Spark機器學習

tin ordering 自身 優點 根據 最好 man ray ron

這篇文章參考《Spark快速大數據分析》,歸納spark技術核心的rdd及MLlib以及其中幾個重要庫的使用。

初始化操作

spark shell: bin/pyspark
每個spark應用都由一個驅動器程序(driver program)來發起集群上的各種並行操作,驅動器程序包含應用的main函數,並且定義了集群上的分布式數據集,還對這些分布式數據集應用了相關操作,驅動器程序通過一個sparkcontext對象來訪問spark(sc),這個對象代表對計算集群的一個連接。可以用它來創建rdd.

sc.textFile(“file-name”) #創建一個代表文件的rdd
sc.parallelize([,,])#把一個已有的集合傳遞給spark
bin
/spark-submit python-file.py #初始化sparkContext: from pyspark import SparkConf,SparkContext conf=SparkConf().setMaster(“local”).setAppName(“MYAPP”) SC=SparkContext(conf=conf)

Rdd

rdd有兩種操作:轉化操作和行動操作,轉化操作會由一個rdd生成一個新的rdd,行動操作會對rdd計算出一個結果。spark惰性計算轉化操作,只有在第一個行動操作中用到時,才會真正計算。

常見的Rdd轉化操作

  • map():接收一個函數,把這個函數用於rdd的每個元素,將函數的返回結果作為結果rdd中對應元素的值。lambda表達式
  • filter():接收一個函數,並將rdd中滿足該函數的元素放入新的rdd中返回。
  • flatmap():每個輸入元素生成多個輸出元素,一個簡單應用是把輸入的字符串切分為單詞。
  • distinct():生成一個只包含不同元素的新rdd。開銷很大,因為它需要將所有數據通過網絡進行混洗(shuffle).
  • union(other):返回一個包含兩個rdd中所有元素的rdd,會重復
  • intersection(other):返回兩個rdd中都有的元素。會去重復
  • substract(other):只存在第一個rdd中,不存在第二個rdd中的,會suffle
  • cartesian(other):計算笛卡爾集,返回所有可能的(a,b)對,用途:用戶對各種產品的預期興趣度,自身做笛卡爾積,用於求用戶相似度的應用中。-開銷大
  • sample(with replacement,fraction,[seed]),采樣rdd以及是否替換

常見的rdd行動操作

  • collect():返回rdd中所有的元素
  • count():rdd中元素
  • countByvalue():各元素在rdd中出現的次數
  • take(num):從rdd中返回num個元素
  • top(num):返回最前面的num個元素
  • takeOrderd(num)(ordering-func):從rdd中按照提供的順序返回最前面的num個元素
  • ruduce(func): 並行整合rdd中的所有數據
  • fold(zero)(func):與reduce一樣,但是需要提供初始值
  • aggregate(zerovalue(seqop,combop))和reduce相似,但是通常返回不同類型的函數
  • foreach(func):對rdd中每個元素使用給定的函數

鍵值對操作 pair Rdd

提供了並行操作各個鍵或跨節點重新進行數據分組的操作接口。
reduceByKey()可以分別規約每個鍵對應的數據
join()把兩個rdd中鍵相同的元素組合在一起
創建pair rdd

提供了並行操作各個鍵或跨節點重新進行數據分組的操作接口。
reduceByKey()可以分別規約每個鍵對應的數據
join()把兩個rdd中鍵相同的元素組合在一起
創建pair rdd
把普通的rdd轉換成pair rdd,map操作傳遞的函數需要返回鍵值對
pairs=lines.map(lambda x: (x.split(“”)[0],x))
轉化操作:
reduceByKey() 合並具有相同鍵的值
groupBykey() 對具有相同鍵的值進行分組
combineByKey(createCombiner,mergevalue, merge combiners,partitioner):使用不同的返回類型合並具有相同鍵的值
mapValues(func)對每個value應用函數,不改變鍵值
flatMapValues(func)對每個value應用一個返回叠代器的函數,然後對返回的每個元素都生成一個對應原鍵的鍵值對記錄,通常用於符號化

keys() 返回一個僅包含鍵的rdd
values() 返回一個僅包含值得rdd
sortByKet() 返回一個根據鍵排序的rdd
針對兩個pair rdd的轉化操作 p43
subtractByKey(other) 刪掉rdd中鍵與other rdd中的鍵相同的元素

MLlib

mllib只包含能夠在集群上運行良好的並行算法。
預先安裝gfortran線性代數運行庫
數據類型
位於org.apache.spark.mllib包內:
vector:通過mllib.linalg.vectors類創建出來

from bumpy import array
from pyspark.mllib.linalg import vectors

創建稠密向量

densevec1=array(1.0,2.0.3.0])#直接傳numpy數組
densevec2=Verctors.dense([1.0,2.0,3.0])

創建稀疏向量,該方法只接收向量的維度以及非零位置和值
這些位置可以用一個dictionary來傳遞,或使用兩個分別表示位置和值的list

sparsevec1=Vectors.sparse(4,{0:1.0,2:2.0})
sparsevec2=Vectors.sparse(4,[0,2],[1.0,2.0])

LabledPoint:在分類和回歸這樣的監督式學習中,表示帶標簽的數據點,包含一個特征向量和一個標簽(由一個浮點數表示),在mllib.regression中
rating:用戶對一個產品的評分,在mllib.recommendation中
各類Model類:是訓練算法的結果,一般有一個predict()方法,用來對新的數據點或者數據點組成的rdd應用該模型進行預測

特征提取

mllib.feature
tf-idf:
HashingTF:從一個文檔中計算出給定大小的詞頻向量,采用了hash法,要求每個“文檔”都使用對象的可叠代序列來表示

#IDF計算逆文檔頻率
from pyspark.mllib.feature import HashingTF,IDF
rdd=sc.wholeTextFiles(“data”).map(lambda(name,text):text.split())
tf=HashingTF()
tfvectors=tf.transform(rdd).cache()
#計算idf
idf=IDF()
idfmodel= idf.fit(tfvectors)
tfidfvectors=idfmodel.transform(tfvectors)

縮放:歸一化,考慮特征向量中各元素的幅值,並且在特征縮放調整為平等對待時表現最好,例如特征平均值為0,標準差為1
方法:創建一個StandardScaler類的對象,對數據集調用fit()函數來獲取一個StandardScaleModel(也就是計算每一列得平均值和標準差),然後使用這個模型對象的transform()方法來縮放一個數據集。

正則化:把向量的長度正規化為1 Nomalizer.transform(rdd),一般情況下是L2範式,歐幾裏得距離。

word2Vec:是一個基於神經網絡的文本特征化算法,需要傳給他一個用string類的iterable表示的語料庫。訓練word2vec.fit(rdd)之後,得到一個word2vecModel,它可以用來將每個單詞通過transform()轉換為一個向量。算法模型的大小等於詞庫中單詞數乘以向量的大小

統計 mllib.stat.Statistics
Statistics.colStats(rdd):計算由向量組成的rdd的統計性綜述,保存著向量集合中每列的最小值、最大值、平均值和方差。
Statistics.corr(rdd,method)計算由向量組成的rdd中的列間的相關矩陣,使用皮爾遜相關或者斯皮爾曼相關,method必須是pearson或spearman中的一個
Statistics.corr(rdd1,rdd2,method)計算兩個由浮點值組成的RDD的相關矩陣
Statistics.chiSqTest(rdd)計算由LabledPoint對象組成的rdd中每個特征與標簽的皮爾遜獨立性測試結果,返回一個ChiSqTestResult對象,其中有p-value、測試統計及每個特征的自由度。標簽和特征必須是離散的。

線性回歸

分類與回歸,監督學習,都使用到mllib.regression.LabledPoint類,lable+freature向量
指用特征的線性組合來預測輸出值,也支持L1和L2的正則回歸,Lasso和ridge回歸
mllib.regression.LinearRegressionWithSGD,LassoWithSGD,ridgeRegressionWithSGD.
SGD表示隨機梯度下降法
這幾個類都有幾個可以用來對算法進行調優的參數:
numIterations,要運行的叠代次數,默認100
stepSize,梯度下降的步長
intercept:是否給數據加上一個幹擾特征或者偏差特征也就是值始終為1的特征,默認值false
regParam:Lasso和ridge的正則化參數

form pyspark.mllib.regression import LabledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD
points=#(創建LabledPoint組成的rdd)
model= LinearRegressionWithSGD.train(points,iteration=200,intercept=True)
print “weight: %s, intercept:%s”%(model.weights,model.intercept)

邏輯回歸

是一種二元分類方法,用來尋找一個分割數據的線性分隔平面.可以支持SGD/LBFGS算法
http://blog.sina.com.cn/s/blog_eb3aea990101gflj.html
LogisticRegressionModel可以為每個點求出一個在0,1之間的得分,之後會基於一個閾值返回0和1,默認情況下對於0.5,他會返回1,可以通過setThreshold()改變閾值,也可以通過clearThreshold()去除閾值設置,這樣的話predict()就會返回原始得分。

支持向量機

使用線性分割平面的二元分類算法,預期0或者1的標簽,通過SVMWithSGD類,我們可以訪問這種算法,返回SVMModel

樸素貝葉斯

多元分類算法,使用基於特征的線性函數計算將一個點分到各類中的得分,這種算法通常用於TF-IDF特征的額文本分類,mllib實現了多項樸素貝葉斯算法,需要非負的頻次作為輸入特征.

mllib.classification.NaiveBayes類來使用樸素貝葉斯算法,支持一個參數lambda_,用來進行平滑化,可以對一個由LabledPoint組成的rdd調用樸素貝葉斯算法,對於C個分類,標簽值的範圍是0-C-1,返回NaiveBayesModel,可以使用predict()預測對某個點最適合的分類,也可以訪問訓練好的模型的兩個參數:各特征與各分類的可能性矩陣theta(對於C個分類和D個特征的情況,矩陣大小是C*D),以及先驗概率的C維向量pi

決策樹與隨機森林

決策樹可以用來分類也可以用來回歸,以節點樹德形式表示,每個節點基於數據的特征作出一個二元決定,而樹德每個葉節點包含一種預測結果,優點:模型本身容易檢查,既支持分類的特征,也支持連續的特征。
mllib.tree.DecisionTree類中的靜態方法trainClassifier()和trainRegressor來訓練決策樹,訓練方法接收如下參數:
data:由LabledPoint組成的rdd
numClasses:要使用的類別數量
impurity,節點的不純凈度,對於分類可以為gini或entropy,對於回歸必須為variance.
maxDepth,樹的最大深度(默認值:5)
maxBins:在構建各節點時將數據分到多個箱子中,推薦值32
categoricalFeaturesInfo,一個映射表,用於指定哪些特征是分類的,以及他們各自有多少個分類,例如,如果特征1是標簽0或1的二元特征,特征2是一個標簽為0,1,2的三元特征,就可以傳遞{1:2,2:3},如果沒有特征是分類的,就傳遞一個空得映射表
train()方法會返回一個DecisionTreeModel對象,對象的predict()方法對一個新的特征向量預測對應的值,或者預測一個向量rdd,可以使用toDebugString()來輸出這棵樹
randomForest類可以用來構建一組樹的集合,隨機森林,randomForest.trainClassifier()和trainRegressor(),接收如下參數:
numTrees 構建樹德數量,提高numTrees 可以降低對訓練數據過度擬合的可能性。
featureSubsetStrategy 在每個節點上作決定時需要考慮的特征數量,可以是auto,all,sqrt,log2以及one third,越大的值所花費的代價越大。
seed所使用的隨機數的種子
隨機森林返回一個WeightedEnsembleModel對象,其中包含幾個決策樹(在WeakHypotheses字段中,權重由WeakHypothesisWeights決定),可以對RDD或者vector調用predict(),toDebugString()可以輸出所有樹

聚類

mllib.clustering.KMeans 調用train(),接收一個Vector組成的rdd作為參數,返回一個KmeansModel對象,可訪問該對象的clusterCenters屬性(聚類中心,是一個向量的數組),調用predict(),返回改點聚類最近的聚類中心
參數:
initializationMode 用來初始化聚類中心,可以是k-means|| 或者是random,前者是默認值,一般效果更好,但是代價高
maxIterations 運行的最大叠代次數 默認值是100
runs 算法並發運行的數目

協同過濾與推薦

協調過濾是一種根據用戶對各種產品的交互與評分來推薦新產品的推薦系統技術
交替最小二乘 ALS,是協同過濾的常用算法,mllib.recommendation.ALS類中,ALS會為每個用戶和產品都設一個特征向量,這樣用戶向量與產品向量的點積就接近他們的得分。
參數:
rank 使用的特征向量的大小,更大的特征向量會產生更好的模型,但是花銷也比較大,默認是10
iterations 要執行叠代次數 默認值10
lambda 正則化參數
alpha 用來在隱式ALS中計算置信度的常量,默認值1.0
numUserBlocks.numProductBlocks 切分用戶和產品數據的塊的數目,用來控制並行度,傳遞-1,mllib自動決定要使用ALS,需要一個由mllib.recommendation.Rating對象組成的rdd,其中每個包含一個用戶id,一個產品id,一個評分,id需要是一個32位的整形值,如果是大數字或者字符串,需要用哈希值,或者broadcast()一張從產品id到整形值的表。
ALS返回一個MatrixFactorizationModel對象來表示結果,可以調用predict()來對一個由(userid,numproduct)來為用戶找到最值得推薦的前numproduct個產品,MatrixFactorizationModel對象很大,為每個用戶和產品都存儲了一個向量。模型生成的model.userFeatures和model.productFeatures保存在分布式文件系統上
默認情況是顯示評分,隱式反饋需要調用ALS.trainImplicit()顯示評分時,每個用戶對一個產品的評分是一個得分預測出來的也是評分,隱式反饋,每個評分代表用戶和給定產品發生交互的置信度,預測出來的也是置信度

降維

主成分分析PCA: 把數據映射到低維空間,讓數據在低位空間表示的方差最大化,要計算這種映射,需要構建出正則化的相關矩陣,並使用這個矩陣的奇異向量和奇異值,與最大的一部分奇異值相對應的奇異向量可以用來重建原始數據的主要成分。
mllib.linalg.distributed.RowMatrix類來表示矩陣,然後存儲一個由Vector組成的RDD,每行一個
scala的pca

import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val points: RDD[vector]=//
val mat: RowMatrix=new RowMatrix(points)
val pc: Matrix=mat.computePrincipalComponents(2)
//將點投影到低維空間
val projected=mat.multiply(pc).rows
//在投影出得二維數據上訓練k-means模型
val model=KMeans.train(projected,10)

奇異值分解

SVD會把一個m*n的矩陣A分解為三個矩陣A~UTVT,U是一個正交矩陣,它的列被稱為左奇異向量,T是一個對角線上德值均為非負數並且降序排列的對角矩陣,它的對角線上德值被稱為奇異值,V是一個正交矩陣,它的列被稱為右奇異向量
對於大型矩陣,通常不需要進行完全分解,只需要分解出靠前的奇異值和與之對應的奇異向量即可,這樣可以節省存儲空間,降噪,並有利於恢復低秩矩陣。如果保留前k個奇異值,那麽結果矩陣就會是U:m*k,T:k*k,V:n*k

val svd: 
SingularValueDecomposition[RowMatrix,matrix]=mat.somputeSVD(20,computeU=true)
val U: RowMatrix =svd.U //U是一個分布式RowMatrix
val S: vector = svd.s //奇異值,用一個局部稠密向量表示
val V: Matrix=svd.V //V是一個局部稠密矩陣

模型評估

mllib.evaluation包,使用BinaryClassificationMetrics和MulticlassMetrics類,可以從(預測,事實)對組成的RDD上創建一個Metrics對象,然後計算諸如準確率、召回率、接受者操作特性ROC曲線下面積等指標,這些方法運行在一個非訓練集上,生成由(預測、事實)對組成的RDD緩存值,如果內存中放不下,可以用persist(Storagelevel.DISK_ONLY)

Spark機器學習