Spark 實戰,第 4 部分: 使用 Spark MLlib 做 K-means 聚類分析
引言
提起機器學習 (Machine Learning),相信很多計算機從業者都會對這個技術方向感到興奮。然而學習並使用機器學習演算法來處理資料卻是一項複雜的工作,需要充足的知識儲備,如概率論,數理統計,數值逼近,最優化理論等。機器學習旨在使計算機具有人類一樣的學習能力和模仿能力,這也是實現人工智慧的核心思想和方法。傳統的機器學習演算法,由於技術和單機儲存的限制,只能在少量資料上使用,隨著 HDFS(Hadoop Distributed File System) 等分散式檔案系統出現,儲存海量資料已經成為可能。然而由於 MapReduce 自身的限制,使得使用 MapReduce 來實現分散式機器學習演算法非常耗時和消耗磁碟容量。因為通常情況下機器學習演算法引數學習的過程都是迭代計算的,即本次計算的結果要作為下一次迭代的輸入,這個過程中,如果使用 MapReduce,我們只能把中間結果儲存磁碟,然後在下一次計算的時候從新讀取,這對於迭代 頻發的演算法顯然是致命的效能瓶頸。Spark 立足於記憶體計算,天然的適應於迭代式計算,相信對於這點,讀者通過前面幾篇文章已經有了較為深入的瞭解。然而即便這樣,對於普通開發者來說,實現一個分散式機器學習演算法仍然是一件極具挑戰的事情。MLlib 正是為了讓基於海量資料的機器學習變得更加簡單,它提供了常用機器學習演算法的分散式實現,開發者只需要有 Spark 基礎並且瞭解機器學習演算法的原理,以及方法相關引數的含義,就可以輕鬆的通過呼叫相應的 API 來實現基於海量資料的機器學習過程。當然,原始資料 ETL,特徵指標提取,調節引數並優化學習過程,這依然需要有足夠的行業知識和資料敏感度,這往往也是經驗的體現。本文的重點在於向讀者介紹如何使用 MLlib 機器學習庫提供的 K-means 演算法做聚類分析,這是一個有意義的過程,相信會對讀者特別是初學者有啟發意義。
Spark 機器學習庫簡介
Spark 機器學習庫提供了常用機器學習演算法的實現,包括聚類,分類,迴歸,協同過濾,維度縮減等。使用 Spark 機器學習庫來做機器學習工作,可以說是非常的簡單,通常只需要在對原始資料進行處理後,然後直接呼叫相應的 API 就可以實現。但是要想選擇合適的演算法,高效準確地對資料進行分析,您可能還需要深入瞭解下演算法原理,以及相應 Spark MLlib API 實現的引數的意義。
需要提及的是,Spark 機器學習庫從 1.2 版本以後被分為兩個包,分別是:
- spark.mllib
Spark MLlib 歷史比較長了,1.0 以前的版本中已經包含了,提供的演算法實現都是基於原始的 RDD,從學習角度上來講,其實比較容易上手。如果您已經有機器學習方面的經驗,那麼您只需要熟悉下 MLlib 的 API 就可以開始資料分析工作了。想要基於這個包提供的工具構建完整並且複雜的機器學習流水線是比較困難的。
- spark.ml
Spark ML Pipeline 從 Spark1.2 版本開始,目前已經從 Alpha 階段畢業,成為可用並且較為穩定的新的機器學習庫。ML Pipeline 彌補了原始 MLlib 庫的不足,向用戶提供了一個基於 DataFrame 的機器學習工作流式 API 套件,使用 ML Pipeline API,我們可以很方便的把資料處理,特徵轉換,正則化,以及多個機器學習演算法聯合起來,構建一個單一完整的機器學習流水線。顯然,這種新的方式給我們提供了更靈活的方法,而且這也更符合機器學習過程的特點。
從官方文件來看,Spark ML Pipeline 雖然是被推薦的機器學習方式,但是並不會在短期內替代原始的 MLlib 庫,因為 MLlib 已經包含了豐富穩定的演算法實現,並且部分 ML Pipeline 實現基於 MLlib。而且就筆者看來,並不是所有的機器學習過程都需要被構建成一個流水線,有時候原始資料格式整齊且完整,而且使用單一的演算法就能實現目標,我們就沒有必要把事情複雜化,採用最簡單且容易理解的方式才是正確的選擇。
本文基於 Spark 1.5,向讀者展示使用 MLlib API 進行聚類分析的過程。讀者將會發現,使用 MLlib API 開發機器學習應用方式是比較簡單的,相信本文可以使讀者建立起信心並掌握基本方法,以便在後續的學習和工作中事半功倍。
K-means 聚類演算法原理
聚類分析是一個無監督學習 (Unsupervised Learning) 過程, 一般是用來對資料物件按照其特徵屬性進行分組,經常被應用在客戶分群,欺詐檢測,影象分析等領域。K-means 應該是最有名並且最經常使用的聚類演算法了,其原理比較容易理解,並且聚類效果良好,有著廣泛的使用。
和諸多機器學習演算法一樣,K-means 演算法也是一個迭代式的演算法,其主要步驟如下:
- 第一步,選擇 K 個點作為初始聚類中心。
- 第二步,計算其餘所有點到聚類中心的距離,並把每個點劃分到離它最近的聚類中心所在的聚類中去。在這裡,衡量距離一般有多個函式可以選擇,最常用的是歐幾里得距離 (Euclidean Distance), 也叫歐式距離。公式如下:
其中 C 代表中心點,X 代表任意一個非中心點。
- 第三步,重新計算每個聚類中所有點的平均值,並將其作為新的聚類中心點。
- 最後,重複 (二),(三) 步的過程,直至聚類中心不再發生改變,或者演算法達到預定的迭代次數,又或聚類中心的改變小於預先設定的閥值。
在實際應用中,K-means 演算法有兩個不得不面對並且克服的問題。
- 聚類個數 K 的選擇。K 的選擇是一個比較有學問和講究的步驟,我們會在後文專門描述如何使用 Spark 提供的工具選擇 K。
- 初始聚類中心點的選擇。選擇不同的聚類中心可能導致聚類結果的差異。
Spark MLlib K-means 演算法的實現在初始聚類點的選擇上,借鑑了一個叫 K-means||的類 K-means++ 實現。K-means++ 演算法在初始點選擇上遵循一個基本原則: 初始聚類中心點相互之間的距離應該儘可能的遠。基本步驟如下:
- 第一步,從資料集 X 中隨機選擇一個點作為第一個初始點。
- 第二步,計算資料集中所有點與最新選擇的中心點的距離 D(x)。
- 第三步,選擇下一個中心點,使得最大。
- 第四部,重複 (二),(三) 步過程,直到 K 個初始點選擇完成。
MLlib 的 K-means 實現
Spark MLlib 中 K-means 演算法的實現類 (KMeans.scala) 具有以下引數,具體如下。
圖 1. MLlib K-means 演算法實現類預覽
通過下面預設建構函式,我們可以看到這些可調引數具有以下初始值。
圖 2. MLlib K-means 演算法引數初始值
引數的含義解釋如下:
- k 表示期望的聚類的個數。
- maxInterations 表示方法單次執行最大的迭代次數。
- runs 表示演算法被執行的次數。K-means 演算法不保證能返回全域性最優的聚類結果,所以在目標資料集上多次跑 K-means 演算法,有助於返回最佳聚類結果。
- initializationMode 表示初始聚類中心點的選擇方式, 目前支援隨機選擇或者 K-means||方式。預設是 K-means||。
- initializationSteps表示 K-means||方法中的部數。
- epsilon 表示 K-means 演算法迭代收斂的閥值。
- seed 表示叢集初始化時的隨機種子。
通常應用時,我們都會先呼叫 KMeans.train 方法對資料集進行聚類訓練,這個方法會返回 KMeansModel 類例項,然後我們也可以使用 KMeansModel.predict 方法對新的資料點進行所屬聚類的預測,這是非常實用的功能。
KMeans.train 方法有很多過載方法,這裡我們選擇引數最全的一個展示。
圖 3. KMeans.train 方法預覽
KMeansModel.predict 方法接受不同的引數,可以是向量,或者 RDD,返回是入參所屬的聚類的索引號。
圖 4. KMeansModel.predict 方法預覽
聚類測試資料集簡介
在本文中,我們所用到目標資料集是來自 UCI Machine Learning Repository 的 Wholesale customer Data Set。UCI 是一個關於機器學習測試資料的下載中心站點,裡面包含了適用於做聚類,分群,迴歸等各種機器學習問題的資料集。
Wholesale customer Data Set 是引用某批發經銷商的客戶在各種類別產品上的年消費數。為了方便處理,本文把原始的 CSV 格式轉化成了兩個文字檔案,分別是訓練用資料和測試用資料。
圖 5. 客戶消費資料格式預覽
讀者可以從標題清楚的看到每一列代表的含義,當然讀者也可以到 UCI 網站上去找到關於該資料集的更多資訊。雖然 UCI 的資料可以自由獲取並使用,但是我們還是在此宣告,該資料集的版權屬 UCI 以及其原始提供組織或公司所有。
案例分析和編碼實現
本例中,我們將根據目標客戶的消費資料,將每一列視為一個特徵指標,對資料集進行聚類分析。程式碼實現步驟如下
清單 1. 聚類分析實現類原始碼
import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.mllib.clustering.{KMeans, KMeansModel} import org.apache.spark.mllib.linalg.Vectors object KMeansClustering { def main (args: Array[String]) { if (args.length < 5) { println("Usage:KMeansClustering trainingDataFilePath testDataFilePath numClusters numIterations runTimes") sys.exit(1) } val conf = new SparkConf().setAppName("Spark MLlib Exercise:K-Means Clustering") val sc = new SparkContext(conf) /** *Channel Region Fresh Milk Grocery Frozen Detergents_Paper Delicassen * 2 3 12669 9656 7561 214 2674 1338 * 2 3 7057 9810 9568 1762 3293 1776 * 2 3 6353 8808 7684 2405 3516 7844 */ val rawTrainingData = sc.textFile(args(0)) val parsedTrainingData = rawTrainingData.filter(!isColumnNameLine(_)).map(line => { Vectors.dense(line.split("\t").map(_.trim).filter(!"".equals(_)).map(_.toDouble)) }).cache() // Cluster the data into two classes using KMeans val numClusters = args(2).toInt val numIterations = args(3).toInt val runTimes = args(4).toInt var clusterIndex:Int = 0 val clusters:KMeansModel = KMeans.train(parsedTrainingData, numClusters, numIterations,runTimes) println("Cluster Number:" + clusters.clusterCenters.length) println("Cluster Centers Information Overview:") clusters.clusterCenters.foreach( x => { println("Center Point of Cluster " + clusterIndex + ":") println(x) clusterIndex += 1 }) //begin to check which cluster each test data belongs to based on the clustering result val rawTestData = sc.textFile(args(1)) val parsedTestData = rawTestData.map(line => { Vectors.dense(line.split("\t").map(_.trim).filter(!"".equals(_)).map(_.toDouble)) }) parsedTestData.collect().foreach(testDataLine => { val predictedClusterIndex: Int = clusters.predict(testDataLine) println("The data " + testDataLine.toString + " belongs to cluster " + predictedClusterIndex) }) println("Spark MLlib K-means clustering test finished.") } private def isColumnNameLine(line:String):Boolean = { if (line != null && line.contains("Channel")) true else false }
該示例程式接受五個入參,分別是
- 訓練資料集檔案路徑
- 測試資料集檔案路徑
- 聚類的個數
- K-means 演算法的迭代次數
- K-means 演算法 run 的次數
執行示例程式
和本系列其他文章一樣,我們依然選擇使用 HDFS 儲存資料檔案。執行程式之前,我們需要將前文提到的訓練和測試資料集上傳到 HDFS。
圖 6. 測試資料的 HDFS 目錄
清單 2. 示例程式執行命令
./spark-submit --class com.ibm.spark.exercise.mllib.KMeansClustering \ --master spark://<spark_master_node_ip>:7077 \ --num-executors 6 \ --driver-memory 3g \ --executor-memory 512m \ --total-executor-cores 6 \ /home/fams/spark_exercise-1.0.jar \ hdfs://<hdfs_namenode_ip>:9000/user/fams/mllib/wholesale_customers_data_training.txt \ hdfs://<hdfs_namenode_ip>:9000/user/fams/mllib/wholesale_customers_data_test.txt \ 8 30 3
圖 7. K-means 聚類示例程式執行結果
如何選擇 K
前面提到 K 的選擇是 K-means 演算法的關鍵,Spark MLlib 在 KMeansModel 類裡提供了 computeCost 方法,該方法通過計算所有資料點到其最近的中心點的平方和來評估聚類的效果。一般來說,同樣的迭代次數和演算法跑的次數,這個值越小代表聚類的效果越好。但是在實際情況下,我們還要考慮到聚類結果的可解釋性,不能一味的選擇使 computeCost 結果值最小的那個 K。
清單 3. K 選擇示例程式碼片段
val ks:Array[Int] = Array(3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20) ks.foreach(cluster => { val model:KMeansModel = KMeans.train(parsedTrainingData, cluster,30,1) val ssd = model.computeCost(parsedTrainingData) println("sum of squared distances of points to their nearest center when k=" + cluster + " -> "+ ssd) })
圖 8. K 選擇示例程式執行結果
從上圖的執行結果可以看到,當 K=9 時,cost 值有波動,但是後面又逐漸減小了,所以我們選擇 8 這個臨界點作為 K 的個數。當然可以多跑幾次,找一個穩定的 K 值。理論上 K 的值越大,聚類的 cost 越小,極限情況下,每個點都是一個聚類,這時候 cost 是 0,但是顯然這不是一個具有實際意義的聚類結果。
結束語
通過本文的學習,讀者已經初步瞭解了 Spark 的機器學習庫,並且掌握了 K-means 演算法的基本原理,以及如何基於 Spark MLlib 構建自己的機器學習應用。機器學習應用的構建是一個複雜的過程,我們通常還需要對資料進行預處理,然後特徵提取以及資料清洗等,然後才能利用演算法來分析資料。Spark MLlib 區別於傳統的機器學習工具,不僅是因為它提供了簡單易用的 API,更重要的是 Spark 在處理大資料上的高效以及在迭代計算時的獨特優勢。雖然本文所採用的測試資料集很小,並不能反映大資料的應用場景,但是對於掌握基本原理已經足夠,並且如果讀者擁有更大的資料集就可以輕鬆的將本文的測試程式推廣到大資料聚類的場景下,因為 Spark MLlib 的程式設計模型都是一致的,無非是資料讀取和處理的方式略有不同。希望讀者可以在本文中找到自己感興趣的知識,相信這對讀者今後深入學習是有幫助的。另外,讀者在閱讀本文的過程中,如果遇到問題或者發現不足之處,請不吝賜教,在文末留言,共同交流學習,謝謝。