Spark MLlib 1.6 -- 聚類
聚類是根據某種相似度量,將‘相似’的樣本劃分到同一個子類中,所以聚類是一種無監督學習。聚類常用於探索分析,或(和)看作分層監督學習管道上一個環節(在這個管道上,對每個聚類結果再深入進行分類或迴歸)。
Spark.mllib包支援以下模型:
1.1 K-means (k均值聚類)
K均值聚類是常用的聚類演算法,它可以將樣本點聚合到已給定的幾個聚類集中。Spark.mllb實現了並行的K均值聚類演算法(k-means++) ,稱為kmeans|| 。spark.mllib中提供以下引數:
1) k: 期望的類別數
2)maxIterations: 每次計算的最大的迭代總次數
3)initializationMode
4) run: k-means 演算法計算的次數(因為演算法不能保證得到全域性最優解,固對同一個聚類資料集進行多次計算,以得出最優結果)
5)initializationSteps: K-means|| 演算法計算的步數。
6) epsilon: 一個閾值,用於判斷k-means演算法何時收斂
7) initialModel :每個初始聚類中心點的集合,如果提供此集合(非空),則算做一次計算步數。
例子:下面的程式碼可以在spark-shelll中執行。
下例程式碼在載入資料後,使用KMeans演算法對資料集聚成兩類。可以修改傳入演算法的
KMeansModelScala docs API :
importorg.apache.spark.mllib.clustering.{KMeans,KMeansModel}
importorg.apache.spark.mllib.linalg.Vectors
// Loadand parse the data
val data = sc.textFile
valparsedData = data.map(s =>Vectors.dense(s.split(' ').map(_.toDouble))).cache()
// Clusterthe data into two classes using KMeans
valnumClusters =2
valnumIterations =20
val clusters =KMeans.train(parsedData, numClusters,numIterations)
//Evaluate clustering by computing Within Set Sum of Squared Errors
valWSSSE= clusters.computeCost(parsedData)
println("WithinSet Sum of Squared Errors = "+WSSSE)
// Saveand load model
clusters.save(sc,"myModelPath")
val sameModel=KMeansModel.load(sc,"myModelPath")
5.2 高斯混合聚類
高斯混合模型是一種概率分佈的組合模型,每個點按一定的先檢概率從k個子類中抽取,每個子類又服從高斯分佈。Spark.mllib使用期望最大化演算法,對給定樣本集中點推斷最大似然函式值,最終決定次樣本聚合到哪一個子類中。該演算法有以下引數:
1) k 期望聚類數
2)convergenceTol : 演算法收斂的判定條件,log似然最大??(the maximum change in log-likehood)
3) maxIterations:最大的迭代次數
4) initialModel:期望最大演算法的可選起始點,如果此引數省略,演算法會構造一個隨機的起始點。
例子
下例載入資料後,使用高斯混合聚類演算法將資料聚成兩類,可以修改k來修改期望聚合的類數。然後輸出混合模型的引數。
GaussianMixtureModelScala Docs API :
importorg.apache.spark.mllib.clustering.GaussianMixture
importorg.apache.spark.mllib.clustering.GaussianMixtureModel
importorg.apache.spark.mllib.linalg.Vectors
// Loadand parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
valparsedData = data.map(s =>Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
// Clusterthe data into two classes using GaussianMixture
val gmm =newGaussianMixture().setK(2).run(parsedData)
// Saveand load model
gmm.save(sc,"myGMMModel")
val sameModel=GaussianMixtureModel.load(sc,"myGMMModel")
// outputparameters of max-likelihood model
for(i <-0 until gmm.k){
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
(gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}
5.3 冪迭代聚類(power iteration clustering PIC)
在演算法中,對聚類頂點使用兩兩相似度作為邊界點劃分屬性,可以提高處理聚類問題的效率(詳細見Lin and Cohen, Power Iteration Clusteringhttp://www.icml2010.org/papers/387.pdf)。使用冪迭代演算法對正交吸收矩陣計算偽特徵向量量,將這組特徵向量作為聚類的邊界點。Spark.mllib 中PIC 演算法使用GraphX 作為計算的基礎,它的輸入是(srcId, dstId,similarity) 三元組的RDD,輸出聚類模型。演算法中選取的相似度函式是非負的,PIC 演算法假設相似函式還滿足對稱性。輸入資料中(srcId,dstId0 對不考慮順序下只能出現至多一次,如果輸入資料中不含這個資料對(不考慮順序),則這兩個資料對的相似度為0. Spark-mllib在PIC演算法需要設定以下假設引數
1) k : 期望聚類數
2) maxIterations: 冪迭代最大次數
3) initializationMode: 模型初始化,預設使用”random” ,即使用隨機向量作為初始聚類的邊界點,可以設定”degree” 使用正交相似度和。(?)
例子:
我們給出使用spark.mllibPIC演算法的例子
PowerIterationClustering
實現PIC演算法,輸入引數(srcId:
Long , dstId : Long , similarity : Double ) 三元組的RDD 來表示吸收矩陣. 呼叫PowerIterationClustering.run
返回PowerIterationClusteringModel模型例項。
importorg.apache.spark.mllib.clustering.{PowerIterationClustering,PowerIterationClusteringModel}
importorg.apache.spark.mllib.linalg.Vectors
// Loadand parse the data
val data = sc.textFile("data/mllib/pic_data.txt")
valsimilarities = data.map { line =>
val parts = line.split(' ')
(parts(0).toLong, parts(1).toLong, parts(2).toDouble)
}
// Clusterthe data into two classes using PowerIterationClustering
val pic =newPowerIterationClustering()
.setK(2)
.setMaxIterations(10)
val model = pic.run(similarities)
model.assignments.foreach { a =>
println(s"${a.id}-> ${a.cluster}")
}
// Saveand load model
model.save(sc,"myModelPath")
val sameModel=PowerIterationClusteringModel.load(sc,"myModelPath")
完整例了見:examples下
5.4 隱性狄利克雷劃分(LDA)
Latent Dirichletallocation (LDA) 是一種主題模型,即從文字文件集中推斷文章的主題。同樣,LDA也可以看作是一種聚類問題。
1) 主題相應於聚類的中心,每篇文章相應於樣本資料集中一條記錄
2) 主題和文件都存在於特徵空間,特徵向量的每個元素是在某篇文章中,每個詞出現的次數(單詞袋)
3) LDA使用基於統計模型的函式來計算什麼樣本點應該聚成一類,這個函式會根據每個文章選取的不同而使用不同的函式形式,它不使用傳統的距離度量來計算是否歸入同一類。
通過設定setOptimizer ,LDA 支援選取不同的推斷演算法。可選值為:
1 EMLDAOptimizer使用似然函式期望最大的原則來優化聚類模型,這樣訓練出來的結果易於解釋.
2 OnlineLDAOptimizer對線上變分推斷使用迭代最小批取樣進行優化,這樣訓練的過程記憶體消耗穩定。
LDA 將每個文件作為一個向量,每個向量的元素是每個單詞在文件中出現的次數,需要設定以下引數:
1) k: 主題數(或聚類中心數)
2) optimizer:LDA訓練中使用的優化器, 可選EMLDAOptimizer / OnlineLDAOptimizer
3) docConcentration:狄利克雷引數,每篇文件的主題先驗概率分佈. 此值越大,則推斷的分佈越光滑。
4)topicConcentration: 狄利克雷引數,每個主題的詞的先驗概率分佈,此值越大,則推斷的公佈越光滑
5) maxIterations: 迭代次數的上限值
6)checkpointInterval: 如果使用週期檢查點(sparkconfiguration配置),這個引數確定建立檢查點頻次。如果 maxIterations越大,使用檢查點可以降低在磁碟上對檔案大小進行排序的次數,同時可以在任務失敗時快速恢復。
Spark.mllib’sLDA 模型支援:
1) describeTopics:返回文件中最重要的關鍵詞序列,以及每個關鍵詞的權重
2) topicsMatrix:返回 vocabSize X k 的矩陣,矩陣每列是一個主題
注意: LDA 演算法特徵還處於實驗開發階段,這樣,某種特徵只能用優化器中一種處理,模型的訓練完全依賴於優化器。現階段,分散式的模型可以轉化為本地執行模式,但是反之不成立。
下面分別講解優化器和模型。
5.4.1 期望最大化
LDA 中引數。
1) docConcentration:只支援對稱先驗(?),所以k-維向量所有值是一致的。並且所有值要大於1.0( > 1.0)。 vector(-1) 會使用預設值(如k 維向量所有元素值都為 (50 / k) + 1 )
2) topicConcentration:只支援對稱先驗(?) , 所以值要大於1.0 ,如果是-1 ,則使用預設值,如0.1 + 1
3)maxIterations: 最大EM迭代次數.
注意:迭代足夠次數是重要的,在最初的迭代中,EM會產生很多無用的主題,但隨機迭代的繼續,這些無用的主題會被替換。請至少使用20次迭代,根據資料集建議使用50 ~ 100 次迭代。
EMLDAOptimizer 生成 DistributedLDAModel 模型,此模型不僅儲存了推斷的主題集,同時儲存完整訓練語料,和每個文件的主題分佈。 DistributedLDAModel支援:
1) topTopicsPerDocument:每個訓練語料最匹配的主題及相應的權重。
2)topDocumentsPerTopic: 每個主題最匹配的文件,以及在主題內的相應權重
3) logPrior : 估計主題的log概率,以及給定假設引數docConcentration和 topicConcentration 下文件-主題二維分佈。
4) logLikelihood:訓練語料的log 似然,給定推斷主題和文件-主題分佈
5.4.2 線上變分貝葉斯(online variational bayes)
OnlineLDAOptimizer和 LocalLDAModel
LDA 提供以下引數配置:
1) docConcentration:非對稱先驗(如狄利克雷引數值)組成K-維向量,且每個值大於0。Vector(-1) 會使用預設值(如k-維向量每個元素是(1.0 / k ) )
2)topicConcentration: 只支援對稱先驗,且值要不小於0 (>=0) 。如果給出 -1會使用預設值,如 1.0 / k
3) maxIterations: 最大可提交的最小批數。
除此以外,OnlineLDAOptimizer 接受以下引數
1)miniBatchFraction: 每次迭代使用取樣語料的比例
2)optimizeDocConcentration: 如果設定為true , 在每次最小批迭代後對假設引數docConcentration(aka alpha) 使用最大似然估計,並在返回的LocalLDAModel 中配置此最優的docConcentration值
3) tau0 and kappa: 在學習速率衰減中需要這兩個引數,即計算 ( \Tau_0 + iter )^ -K 此處iter 是當前迭代序號
OnlineLDAOptimizer生成 LocalLDAModdel , 此模型儲存推斷主題, LocalLDAModel支援:
1) logLikelihood(documents): 給定推斷主題,計算每個文件的log-似然函式下界
2)logPerplexity(documents): 給定推斷主題,計算每個文件的log-複雜度函式上界
例子
接下來的例子,我們載入文件的語料向量,即每個向量值是文件中詞出來的頻次。然後使用LDA演算法預測文件的三個主題,通過修改k可以配置演算法期望的聚類數,然後輸出每個單詞的概率分佈。
importorg.apache.spark.mllib.clustering.{LDA,DistributedLDAModel}
importorg.apache.spark.mllib.linalg.Vectors
// Loadand parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
valparsedData = data.map(s =>Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Indexdocuments with unique IDs
val corpus =parsedData.zipWithIndex.map(_.swap).cache()
// Clusterthe documents into three topics using LDA
val ldaModel =newLDA().setK(3).run(corpus)
// Outputtopics. Each is a distribution over words (matching word count vectors)
println("Learnedtopics (as distributions over vocab of "+ ldaModel.vocabSize +"words):")
val topics = ldaModel.topicsMatrix
for(topic <-Range(0,3)){
print("Topic"+ topic +":")
for(word <-Range(0, ldaModel.vocabSize)){ print(""+ topics(word, topic));}
println()
}
// Saveand load model.
ldaModel.save(sc,"myLDAModel")
val sameModel=DistributedLDAModel.load(sc,"myLDAModel")
5.5 二分 K-means
二分k-means 演算法通常會比普通的K-means演算法要快,但兩種演算法生成的聚類結果卻不同。
二分k-means 是一種分層聚類,分層聚類在聚類分析中經常用於,當需要構建一個多層聚類時。分層聚類的策略一般有兩種:
1) Agglomerative 自下向上聚合:演算法最初從每個點獨立為一個聚類開始,每次遞迴地將多個聚類中點合併到一個聚類。
2) Divisive: 自上而下細分:演算法最初從一個聚類開始,每次遞迴地將一個聚類中點細分成多個聚類。
二分k-means 演算法是一種分治演算法,MLLib中需要配置以下引數
1) k : 葉子聚類(?)的期望類別數(預設4)。如果最終葉子聚類沒有細分,實際的聚類數是小於此值的。
2) maxIterations: k-means 每次迭代細分的最大聚類類別(預設20)
3)minDivisibleClusterSize : 如果此值不小於1.0,則是每個聚類最小點數,如果此值小於1.0 ,則是每個聚類最小點的佔比。
4) seed: 隨機種子(預設類別名字的hash值)
例子
importorg.apache.spark.mllib.clustering.BisectingKMeans
importorg.apache.spark.mllib.linalg.{Vector,Vectors}
// Loads andparses data
def parse(line:String):Vector=Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()
// Clusteringthe data into 6 clusters by BisectingKMeans.
val bkm =newBisectingKMeans().setK(6)
val model = bkm.run(data)
// Show thecompute cost and the cluster centers
println(s"ComputeCost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach {case(center, idx)=>
println(s"Cluster Center ${idx}: ${center}")
}
詳細的例子程式碼見:examples/src/main/scala/org/apache/spark/examples/mllib/BisectingKMeansExample.scala
5.6 流式k-means
當資料以流式產生,我們可能需要動態估計聚類,每當有資料到達就更新模型。Spark.mllib 支援流式k-means聚類,配置控制估計的衰減值。演算法使用通用最小批k-means更新聚類規則。對每批次資料,演算法把資料聚類到距離最近的聚類上。重新計算新聚類的中心,再更新其它所有聚類的引數
c_{t+1} = frac { c_t n_t \Alpha + x_t m_t } { n_t \Alpha+ m_t} (1)
n_{t+1} = n_t + m_t (2)
此處 c_t 是先前的聚類中心, n_t 是此聚類的點數, x_t 是當前批次產生新的聚類中心, m_t 是當前批次中新新增到此聚類的點數。衰減因子 \Alpha 可用來降低更早批次資料的權重,當 \Alpha = 1 時,所有點會從頭到尾使用; 當 \Alpha = 0 裡,最新一次迭代的資料不參考。這個類似於指數權重的移動平均。
可以設定halfLife 引數控制衰減,通過此引數可以配置適當的衰減因子\Alpha , 對時間t 時的資料,經過halfLife 時間後原始資料會清除掉一半。這裡的時間單位可以是批次,或點數,相應的更新規則也需要調整。
下例給出流式資料的聚類估計
首先需要匯入類
importorg.apache.spark.mllib.linalg.Vectors
importorg.apache.spark.mllib.regression.LabeledPoint
importorg.apache.spark.mllib.clustering.StreamingKMeans
val trainingData= ssc.textFileStream("/training/data/dir").map(Vectors.parse)
val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
先建立一個隨機聚類的模型,然後確定聚類的類別數
val numDimensions=3
val numClusters=2
val model =newStreamingKMeans()
.setK(numClusters)
.setDecayFactor(1.0)
.setRandomCenters(numDimensions,0.0)
現在可以註冊訓練資料流和測試資料流,並啟動job , 當新資料點到達時,打印出預測新資料的聚類標籤
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp =>(lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()
隨著新增新的資料檔案,全部聚類中心點會更新。每個訓練點的格式是[x_1, x_2, x_3] ,同樣每個測試資料的格式 ( y , [x_1,x_2, x_3]) ,此處y 是類別標籤或類別標識. 只要有新檔案到達/training/data/dir 目錄下,新檔案中點實時預測結果打印出來,並且模型同時會更新聚類中心。持續的新資料使用聚類中心持續變化!