1. 程式人生 > >Spark MLlib 1.6 -- 聚類

Spark MLlib 1.6 -- 聚類

聚類是根據某種相似度量,將相似的樣本劃分到同一個子類中,所以聚類是一種無監督學習。聚類常用於探索分析,或(和)看作分層監督學習管道上一個環節(在這個管道上,對每個聚類結果再深入進行分類或迴歸)。

Spark.mllib包支援以下模型:

1.1  K-means (k均值聚類)

K均值聚類是常用的聚類演算法,它可以將樣本點聚合到已給定的幾個聚類集中。Spark.mllb實現了並行的K均值聚類演算法(k-means++) ,稱為kmeans|| spark.mllib中提供以下引數:

1) k: 期望的類別數

2)maxIterations:  每次計算的最大的迭代總次數

3)initializationMode

:確定k-means||演算法的隨機初始化或普通初始化。

4) run: k-means 演算法計算的次數(因為演算法不能保證得到全域性最優解,固對同一個聚類資料集進行多次計算,以得出最優結果)

5)initializationSteps:  K-means|| 演算法計算的步數。

6) epsilon:  一個閾值,用於判斷k-means演算法何時收斂

7) initialModel :每個初始聚類中心點的集合,如果提供此集合(非空),則算做一次計算步數。

例子:下面的程式碼可以在spark-shelll中執行。

下例程式碼在載入資料後,使用KMeans演算法對資料集聚成兩類。可以修改傳入演算法的

K值修改最終聚類的類別數。演算法緊接著計算類內方差和(WSSSE)。此引數的期望的類別數有關,可以容易想到,當聚類類別數越多時(或類別聚合更細時),這個值會變小。在實踐中,做多次實驗後將kWSSSE繪製曲線,最優的k值是曲線上波谷拐點。

KMeansModelScala docs API :

importorg.apache.spark.mllib.clustering.{KMeans,KMeansModel}

importorg.apache.spark.mllib.linalg.Vectors

// Loadand parse the data

val data = sc.textFile

("data/mllib/kmeans_data.txt")

valparsedData = data.map(s =>Vectors.dense(s.split(' ').map(_.toDouble))).cache()

// Clusterthe data into two classes using KMeans

valnumClusters =2

valnumIterations =20

val clusters =KMeans.train(parsedData, numClusters,numIterations)

//Evaluate clustering by computing Within Set Sum of Squared Errors

valWSSSE= clusters.computeCost(parsedData)

println("WithinSet Sum of Squared Errors = "+WSSSE)

// Saveand load model

clusters.save(sc,"myModelPath")

val sameModel=KMeansModel.load(sc,"myModelPath")

5.2 高斯混合聚類

高斯混合模型是一種概率分佈的組合模型,每個點按一定的先檢概率從k個子類中抽取,每個子類又服從高斯分佈。Spark.mllib使用期望最大化演算法,對給定樣本集中點推斷最大似然函式值,最終決定次樣本聚合到哪一個子類中。該演算法有以下引數:

1) k 期望聚類數

2)convergenceTol : 演算法收斂的判定條件,log似然最大??(the maximum change in log-likehood

3) maxIterations最大的迭代次數

4) initialModel:期望最大演算法的可選起始點,如果此引數省略,演算法會構造一個隨機的起始點。

例子

下例載入資料後,使用高斯混合聚類演算法將資料聚成兩類,可以修改k來修改期望聚合的類數。然後輸出混合模型的引數。

GaussianMixtureModelScala Docs API :

importorg.apache.spark.mllib.clustering.GaussianMixture

importorg.apache.spark.mllib.clustering.GaussianMixtureModel

importorg.apache.spark.mllib.linalg.Vectors

// Loadand parse the data

val data = sc.textFile("data/mllib/gmm_data.txt")

valparsedData = data.map(s =>Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()

// Clusterthe data into two classes using GaussianMixture

val gmm =newGaussianMixture().setK(2).run(parsedData)

// Saveand load model

gmm.save(sc,"myGMMModel")

val sameModel=GaussianMixtureModel.load(sc,"myGMMModel")

// outputparameters of max-likelihood model

for(i <-0 until gmm.k){

 println("weight=%f\nmu=%s\nsigma=\n%s\n" format

(gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))

}

5.3  冪迭代聚類(power iteration clustering PIC

在演算法中,對聚類頂點使用兩兩相似度作為邊界點劃分屬性,可以提高處理聚類問題的效率(詳細見Lin and Cohen, Power Iteration Clusteringhttp://www.icml2010.org/papers/387.pdf)。使用冪迭代演算法對正交吸收矩陣計算偽特徵向量量,將這組特徵向量作為聚類的邊界點。Spark.mllib PIC 演算法使用GraphX 作為計算的基礎,它的輸入是(srcId, dstId,similarity) 三元組的RDD,輸出聚類模型。演算法中選取的相似度函式是非負的,PIC 演算法假設相似函式還滿足對稱性。輸入資料中(srcId,dstId0 對不考慮順序下只能出現至多一次,如果輸入資料中不含這個資料對(不考慮順序),則這兩個資料對的相似度為0. Spark-mllibPIC演算法需要設定以下假設引數

1) k : 期望聚類數

2) maxIterations: 冪迭代最大次數

3) initializationMode: 模型初始化,預設使用”random” ,即使用隨機向量作為初始聚類的邊界點,可以設定”degree” 使用正交相似度和。(?)

例子:

我們給出使用spark.mllibPIC演算法的例子

PowerIterationClustering 實現PIC演算法,輸入引數(srcId: Long , dstId : Long , similarity : Double ) 三元組的RDD 來表示吸收矩陣. 呼叫PowerIterationClustering.run 返回PowerIterationClusteringModel模型例項。

importorg.apache.spark.mllib.clustering.{PowerIterationClustering,PowerIterationClusteringModel}

importorg.apache.spark.mllib.linalg.Vectors

// Loadand parse the data

val data = sc.textFile("data/mllib/pic_data.txt")

valsimilarities = data.map { line =>

val parts = line.split(' ')

(parts(0).toLong, parts(1).toLong, parts(2).toDouble)

}

// Clusterthe data into two classes using PowerIterationClustering

val pic =newPowerIterationClustering()

.setK(2)

.setMaxIterations(10)

val model = pic.run(similarities)

model.assignments.foreach { a =>

 println(s"${a.id}-> ${a.cluster}")

}

// Saveand load model

model.save(sc,"myModelPath")

val sameModel=PowerIterationClusteringModel.load(sc,"myModelPath")

完整例了見:examples

5.4 隱性狄利克雷劃分(LDA)

Latent Dirichletallocation (LDA) 是一種主題模型,即從文字文件集中推斷文章的主題。同樣,LDA也可以看作是一種聚類問題。

1) 主題相應於聚類的中心,每篇文章相應於樣本資料集中一條記錄

2) 主題和文件都存在於特徵空間,特徵向量的每個元素是在某篇文章中,每個詞出現的次數(單詞袋)

3) LDA使用基於統計模型的函式來計算什麼樣本點應該聚成一類,這個函式會根據每個文章選取的不同而使用不同的函式形式,它不使用傳統的距離度量來計算是否歸入同一類。

通過設定setOptimizer ,LDA 支援選取不同的推斷演算法。可選值為:

1 EMLDAOptimizer使用似然函式期望最大的原則來優化聚類模型,這樣訓練出來的結果易於解釋.

2 OnlineLDAOptimizer對線上變分推斷使用迭代最小批取樣進行優化,這樣訓練的過程記憶體消耗穩定。

LDA 將每個文件作為一個向量,每個向量的元素是每個單詞在文件中出現的次數,需要設定以下引數

1) k: 主題數(或聚類中心數)

2) optimizer:LDA訓練中使用的優化器, 可選EMLDAOptimizer / OnlineLDAOptimizer

3) docConcentration:狄利克雷引數,每篇文件的主題先驗概率分佈. 此值越大,則推斷的分佈越光滑。

4)topicConcentration: 狄利克雷引數,每個主題的詞的先驗概率分佈,此值越大,則推斷的公佈越光滑

5) maxIterations: 迭代次數的上限值

6)checkpointInterval: 如果使用週期檢查點(sparkconfiguration配置)這個引數確定建立檢查點頻次。如果 maxIterations越大,使用檢查點可以降低在磁碟上對檔案大小進行排序的次數,同時可以在任務失敗時快速恢復。

Spark.mllib’sLDA 模型支援:

1) describeTopics:返回文件中最重要的關鍵詞序列,以及每個關鍵詞的權重

2) topicsMatrix:返回 vocabSize X k 的矩陣,矩陣每列是一個主題

注意: LDA 演算法特徵還處於實驗開發階段,這樣,某種特徵只能用優化器中一種處理,模型的訓練完全依賴於優化器。現階段,分散式的模型可以轉化為本地執行模式,但是反之不成立。

下面分別講解優化器和模型。

5.4.1 期望最大化

LDA 中引數

1) docConcentration:只支援對稱先驗(?),所以k-維向量所有值是一致的。並且所有值要大於1.0( > 1.0) vector(-1) 會使用預設值(如k 維向量所有元素值都為 (50 / k) + 1

2) topicConcentration:只支援對稱先驗(?) , 所以值要大於1.0 如果是-1 則使用預設值,如0.1 + 1

3)maxIterations:  最大EM迭代次數.

注意:迭代足夠次數是重要的,在最初的迭代中,EM會產生很多無用的主題,但隨機迭代的繼續,這些無用的主題會被替換。請至少使用20次迭代,根據資料集建議使用50 ~ 100 次迭代。

EMLDAOptimizer 生成 DistributedLDAModel 模型,此模型不僅儲存了推斷的主題集,同時儲存完整訓練語料,和每個文件的主題分佈。 DistributedLDAModel支援:

1) topTopicsPerDocument:每個訓練語料最匹配的主題及相應的權重。

2)topDocumentsPerTopic: 每個主題最匹配的文件,以及在主題內的相應權重

3) logPrior : 估計主題的log概率以及給定假設引數docConcentration topicConcentration 下文件-主題二維分佈。

4) logLikelihood:訓練語料的log 似然,給定推斷主題和文件-主題分佈

5.4.2 線上變分貝葉斯(online variational bayes)

OnlineLDAOptimizer LocalLDAModel

LDA 提供以下引數配置:

1) docConcentration:非對稱先驗(如狄利克雷引數值)組成K-維向量,且每個值大於0Vector(-1) 會使用預設值(k-維向量每個元素是(1.0 / k )

2)topicConcentration: 只支援對稱先驗,且值要不小於0 (>=0) 。如果給出 -1會使用預設值,如 1.0 / k

3) maxIterations:  最大可提交的最小批數。

除此以外,OnlineLDAOptimizer 接受以下引數

1)miniBatchFraction: 每次迭代使用取樣語料的比例

2)optimizeDocConcentration: 如果設定為true , 在每次最小批迭代後對假設引數docConcentration(aka alpha) 使用最大似然估計,並在返回的LocalLDAModel 中配置此最優的docConcentration

3) tau0 and kappa: 在學習速率衰減中需要這兩個引數,即計算 ( \Tau_0 + iter )^ -K 此處iter 是當前迭代序號

OnlineLDAOptimizer生成 LocalLDAModdel , 此模型儲存推斷主題, LocalLDAModel支援:

1) logLikelihood(documents): 給定推斷主題,計算每個文件的log-似然函式下界

2)logPerplexity(documents): 給定推斷主題,計算每個文件的log-複雜度函式上界

例子

接下來的例子,我們載入文件的語料向量,即每個向量值是文件中詞出來的頻次。然後使用LDA演算法預測文件的三個主題,通過修改k可以配置演算法期望的聚類數,然後輸出每個單詞的概率分佈。

importorg.apache.spark.mllib.clustering.{LDA,DistributedLDAModel}

importorg.apache.spark.mllib.linalg.Vectors

// Loadand parse the data

val data = sc.textFile("data/mllib/sample_lda_data.txt")

valparsedData = data.map(s =>Vectors.dense(s.trim.split(' ').map(_.toDouble)))

// Indexdocuments with unique IDs

val corpus =parsedData.zipWithIndex.map(_.swap).cache()

// Clusterthe documents into three topics using LDA

val ldaModel =newLDA().setK(3).run(corpus)

// Outputtopics. Each is a distribution over words (matching word count vectors)

println("Learnedtopics (as distributions over vocab of "+ ldaModel.vocabSize +"words):")

val topics = ldaModel.topicsMatrix

for(topic <-Range(0,3)){

  print("Topic"+ topic +":")

for(word <-Range(0, ldaModel.vocabSize)){ print(""+ topics(word, topic));}

 println()

}

// Saveand load model.

ldaModel.save(sc,"myLDAModel")

val sameModel=DistributedLDAModel.load(sc,"myLDAModel")

5.5 二分 K-means

二分k-means 演算法通常會比普通的K-means演算法要快,但兩種演算法生成的聚類結果卻不同。

二分k-means 是一種分層聚類,分層聚類在聚類分析中經常用於,當需要構建一個多層聚類時。分層聚類的策略一般有兩種:

1) Agglomerative 自下向上聚合:演算法最初從每個點獨立為一個聚類開始,每次遞迴地將多個聚類中點合併到一個聚類。

2) Divisive: 自上而下細分:演算法最初從一個聚類開始,每次遞迴地將一個聚類中點細分成多個聚類。

二分k-means 演算法是一種分治演算法,MLLib中需要配置以下引數

1) k : 葉子聚類(?)的期望類別數(預設4)。如果最終葉子聚類沒有細分,實際的聚類數是小於此值的。

2) maxIterations: k-means 每次迭代細分的最大聚類類別(預設20

3)minDivisibleClusterSize : 如果此值不小於1.0則是每個聚類最小點數,如果此值小於1.0 ,則是每個聚類最小點的佔比。

4) seed:  隨機種子(預設類別名字的hash值)

例子

importorg.apache.spark.mllib.clustering.BisectingKMeans

importorg.apache.spark.mllib.linalg.{Vector,Vectors}

// Loads andparses data

def parse(line:String):Vector=Vectors.dense(line.split(" ").map(_.toDouble))

val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()

// Clusteringthe data into 6 clusters by BisectingKMeans.

val bkm =newBisectingKMeans().setK(6)

val model = bkm.run(data)

// Show thecompute cost and the cluster centers

println(s"ComputeCost: ${model.computeCost(data)}")

model.clusterCenters.zipWithIndex.foreach {case(center, idx)=>

 println(s"Cluster Center ${idx}: ${center}")

}

詳細的例子程式碼見:examples/src/main/scala/org/apache/spark/examples/mllib/BisectingKMeansExample.scala

5.6 流式k-means

當資料以流式產生,我們可能需要動態估計聚類,每當有資料到達就更新模型。Spark.mllib 支援流式k-means聚類,配置控制估計的衰減值。演算法使用通用最小批k-means更新聚類規則。對每批次資料,演算法把資料聚類到距離最近的聚類上。重新計算新聚類的中心,再更新其它所有聚類的引數

c_{t+1} = frac { c_t n_t \Alpha + x_t m_t } { n_t \Alpha+ m_t}         (1)

n_{t+1} = n_t + m_t                                            (2)

此處 c_t 是先前的聚類中心, n_t 是此聚類的點數 x_t 是當前批次產生新的聚類中心, m_t 是當前批次中新新增到此聚類的點數。衰減因子 \Alpha  可用來降低更早批次資料的權重, \Alpha  = 1 所有點會從頭到尾使用; \Alpha = 0 最新一次迭代的資料不參考。這個類似於指數權重的移動平均。

可以設定halfLife 引數控制衰減,通過此引數可以配置適當的衰減因子\Alpha , 對時間t 時的資料,經過halfLife 時間後原始資料會清除掉一半。這裡的時間單位可以是批次,或點數,相應的更新規則也需要調整。

下例給出流式資料的聚類估計

首先需要匯入類

importorg.apache.spark.mllib.linalg.Vectors

importorg.apache.spark.mllib.regression.LabeledPoint

importorg.apache.spark.mllib.clustering.StreamingKMeans

val trainingData= ssc.textFileStream("/training/data/dir").map(Vectors.parse)

val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)

先建立一個隨機聚類的模型,然後確定聚類的類別數

val numDimensions=3

val numClusters=2

val model =newStreamingKMeans()

.setK(numClusters)

.setDecayFactor(1.0)

.setRandomCenters(numDimensions,0.0)

現在可以註冊訓練資料流和測試資料流,並啟動job , 當新資料點到達時,打印出預測新資料的聚類標籤

model.trainOn(trainingData)

model.predictOnValues(testData.map(lp =>(lp.label, lp.features))).print()

ssc.start()

ssc.awaitTermination()

隨著新增新的資料檔案,全部聚類中心點會更新。每個訓練點的格式是[x_1, x_2, x_3] 同樣每個測試資料的格式 ( y , [x_1,x_2, x_3]) 此處y 是類別標籤或類別標識. 只要有新檔案到達/training/data/dir 目錄下,新檔案中點實時預測結果打印出來,並且模型同時會更新聚類中心。持續的新資料使用聚類中心持續變化!