1. 程式人生 > >Mahout clustering Canopy+K-means 原始碼分析

Mahout clustering Canopy+K-means 原始碼分析

聚類分析

     聚類(Clustering)可以簡單的理解為將資料物件分為多個簇(Cluster),每個簇裡的所有資料物件具有一定的相似性,這樣一個簇可以看作一個整體,以此可以提高計算質量或減少計算量。而資料物件間相似性的衡量通常是通過座標系中空間距離的大小來判斷;常見的有 歐幾里得距離演算法、餘弦距離演算法、皮爾遜相關係數演算法等,Mahout對此都提供了實現,並且你可以在實現自己的聚類時,通過介面切換不同的距離演算法。

資料模型

     在Mahout的聚類分析的計算過程中,資料物件會轉化成向量(Vector)參與運算,在Mahout中的介面是org.apache.mahout.math.Vector  它裡面每個域用一個浮點數(double)表示,你可以通過繼承Mahout裡的基類如:AbstractVector來實現自己的向量模型,也可以直接使用一些它提供的已有實現如下:

    1. DenseVector,它的實現就是一個浮點數陣列,對向量裡所有域都進行儲存,適合用於儲存密集向量。

    2. RandomAccessSparseVector 基於浮點數的 HashMap 實現的,key 是整形 (int) 型別,value 是浮點數(double) 型別,它只儲存向量中不為空的值,並提供隨機訪問。

    3. SequentialAccessVector 實現為整形 (int) 型別和浮點數 (double) 型別的並行陣列,它也只儲存向量中不 為空的值,但只提供順序訪問。

聚類演算法K-means與Canopy

       首先介紹先K-means演算法:所有做聚類分析的資料物件,會被描述成n維空間中的一個點,用向量(Vector)表示;演算法開始會隨機選擇K個點,作為一個簇的中心,然後其餘的點會根據它與每個簇心的距離,被分配到最近簇中去;接著以迭代的方式,先重新計算每個簇的中心(通過其包含的所有向量的平均值),計算完成後對所有點屬於哪個簇進行重新劃分;一直如此迭代直到過程收斂;可證明迭代次數是有限的。

       雖然K-means簡單且高效,但它存在一定問題,首先K值(即簇的數量)是人為確定的,在對資料不瞭解的情況下,很難給出合理的K值;其次初始簇心的選擇是隨機的,若選擇到了較孤立的點,會對聚類的效果產生非常大的影響。因此通常會用Canopy演算法配合,進行初始化,確定簇數以及初始簇心。

       Canopy演算法首先會要求輸入兩個閥值 T1和T2,T1>T2;演算法有一個叢集這裡叫Canopy的集合(Set),當然一開始它是空的;然後會將讀取到的第一個點作為集合中的一個Canopy,接著讀取下一個點,若該點與集合中的每個Canopy計算距離,若這個距離小於T1,則這個點會分配給這個Canopy(一個點可以分配給多個Canopy),而當這個距離小於T2時這個點不能作為一個新的Canopy而放到集合中。也就是說當一個點只要與集合中任意一個Canopy的距離小於T2了,即表示它裡那個Canopy太近不能作為新的Canopy。若都沒有則生成一個新的Canopy放入集合中。以此迴圈,直到沒有點了。

       所以這裡用到的聚類分析演算法的思路是:首先通過Canopy演算法進行聚類,以確定簇數以及初始簇心的,接著通過K-means演算法進行迭代運算,收斂出最後的聚類結果。接下來我們看看實現。

程式碼示例

在 mahout-examples 中的 org.apache.mahout.clustering.syntheticcontrol.kmeans.Job類,對上述演算法提供了較完整的實現,它是一個Hadoop的job,我們從原始碼入手,看如何將實際的資料跑起來。下面是該類的核心邏輯程式碼:

/**

Java程式碼  收藏程式碼
  1.  * Run the kmeans clustering job on an input dataset using the given  
  2.  * distance measure, t1, t2 and iteration parameters. All output data will  
  3.  * be written to the output directory, which will be initially deleted if it  
  4.  * exists. The clustered points will reside in the path  
  5.  * <output>/clustered-points. By default, the job expects the a file  
  6.  * containing synthetic_control.data as obtained from  
  7.  * http://archive.ics.uci.  
  8.  * edu/ml/datasets/Synthetic+Control+Chart+Time+Series resides in a  
  9.  * directory named "testdata", and writes output to a directory named  
  10.  * "output".  
  11.  *   
  12.  * @param conf  
  13.  *            the Configuration to use  
  14.  * @param input  
  15.  *            the String denoting the input directory path  
  16.  * @param output  
  17.  *            the String denoting the output directory path  
  18.  * @param measure  
  19.  *            the DistanceMeasure to use  
  20.  * @param t1  
  21.  *            the canopy T1 threshold  
  22.  * @param t2  
  23.  *            the canopy T2 threshold  
  24.  * @param convergenceDelta  
  25.  *            the double convergence criteria for iterations  
  26.  * @param maxIterations  
  27.  *            the int maximum number of iterations  
  28.  */  
  29. public static void run(Configuration conf, Path input, Path output,  
  30.         DistanceMeasure measure, double t1, double t2,  
  31.         double convergenceDelta, int maxIterations) throws Exception {  
  32.     System.out.println("run canopy output: " + output);  
  33.     Path directoryContainingConvertedInput = new Path(output,  
  34.             DIRECTORY_CONTAINING_CONVERTED_INPUT);  
  35.     log.info("Preparing Input");  
  36.     InputDriver.runJob(input, directoryContainingConvertedInput,  
  37.             "org.apache.mahout.math.RandomAccessSparseVector");  
  38.     log.info("Running Canopy to get initial clusters");  
  39.     CanopyDriver.run(conf, directoryContainingConvertedInput, output,  
  40.             measure, t1, t2, falsefalse);  
  41.     log.info("Running KMeans");  
  42.     System.out.println("kmeans cluster starting...");  
  43.     KMeansDriver.run(conf, directoryContainingConvertedInput, new Path(  
  44.             output, Cluster.INITIAL_CLUSTERS_DIR+"-final"), output, measure,  
  45.             convergenceDelta, maxIterations, truefalse);  
  46.     // run ClusterDumper  
  47.     ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,  
  48.             output, maxIterations), new Path(output, "clusteredPoints"));  
  49.     clusterDumper.printClusters(null);  
  50. }  

       這個Job中呼叫了3個Map/Reduce 任務以及一個轉換,它們如下:

       1. 第8行: InputDriver.runJob ( ) ,它用於將原始資料檔案轉換成 Mahout進行計算所需格式的檔案 SequenceFile,它是Hadoop API提供的一種二進位制檔案支援。這種二進位制檔案直接將<key, value>對序列化到檔案中。        2. 第11行:CanopyDriver.run( ) , 即用Canopy演算法確定初始簇的個數和簇的中心。        3.  第14行:KMeansDriver.run( ) , 這顯然是K-means演算法進行聚類。        4. 第18~20行,ClusterDumper類將聚類的結果裝換並寫出來,若你瞭解了原始碼,你也可以自己實現這個類的功能,因為聚類後的資料儲存格式,往往跟自身業務有關。           這裡細講下第一個Map/Reduce: InputDriver.runJob ( )因為我們需要了解,初始資料的格式,其他的任務CanopyDriver.run( )和KMeansDriver.run( )任務就不細講了,主要就是Canopy和K-means演算法,原理已經介紹了,實現也不難,需要你瞭解hadoop程式設計。  InputDriver.runJob( )實現也非常簡單,它只有Map,其程式碼如下: Java程式碼  收藏程式碼
  1. @Override  
  2. protected void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {  
  3.   String[] numbers = SPACE.split(values.toString());  
  4.   // sometimes there are multiple separator spaces  
  5.   Collection<Double> doubles = Lists.newArrayList();  
  6.   for (String value : numbers) {  
  7.     if (!value.isEmpty()) {  
  8.       doubles.add(Double.valueOf(value));  
  9.     }  
  10.   }  
  11.   // ignore empty lines in data file  
  12.   if (!doubles.isEmpty()) {  
  13.     try {  
  14.       Vector result = (Vector) constructor.newInstance(doubles.size());  
  15.       int index = 0;  
  16.       for (Double d : doubles) {  
  17.         result.set(index++, d);  
  18.       }  
  19.       VectorWritable vectorWritable = new VectorWritable(result);  
  20.       context.write(new Text(String.valueOf(index)), vectorWritable);  
  21.     } catch (InstantiationException e) {  
  22.       throw new IllegalStateException(e);  
  23.     } catch (IllegalAccessException e) {  
  24.       throw new IllegalStateException(e);  
  25.     } catch (InvocationTargetException e) {  
  26.       throw new IllegalStateException(e);  
  27.     }  
  28.   }  
  29. }  
  由程式碼可以看出,它將你初始資料檔案的每一行用空格切開成個 String[] numbers ,然後再將 numbers中的每個String轉換成Double型別,並以此生成一個向量 Vector ,然後通過 SequenceFileOutputFormat的方式輸出成SequenceFile,以作下一步計算的輸入。由此我們可以瞭解到我們的初始資料的格式需要 以一行為一個單位,用空格分隔,每一列為一個Double數即可(當然你也可以反過來修改例子中的實現)。 參考資料: https://cwiki.apache.org/confluence/display/MAHOUT/K-Means+Clustering https://cwiki.apache.org/confluence/display/MAHOUT/Canopy+Clustering http://www.ibm.com/developerworks/cn/java/j-mahout-scaling/ http://www.ibm.com/developerworks/cn/web/1103_zhaoct_recommstudy3/ 《Mahout in action》 https://cwiki.apache.org/MAHOUT/cluster-dumper.html