1. 程式人生 > >mahout最新版+hadoop2.4.1執行kmeans分散式演算法

mahout最新版+hadoop2.4.1執行kmeans分散式演算法

1、理論須知

用過mahout和hadoop整合的朋友們,都經過很多折騰,mahout這個東西是包括了好多的機器學習演算法,確實我們呼叫起來相當方便,畢竟我們不需要為了使用一個演算法重新編碼。但是mahout0.10之前都只能支援到hadoop1.x版本,所以大部分使用hadoop2.x的朋友,很苦惱,雖然網上各種辦法,大都折騰的很,浪費時間且錯誤百出,鑑於此,將本人整合的成功案例分享給大家,少走彎路,用好點贊!

廢話不多說,先知道kmeans演算法的基本步驟,以知其所以然:

Hadoop分散式環境下實現K-Means聚類演算法的虛擬碼如下:

輸入:引數0--儲存樣本資料的文字檔案inputfile;

            引數1--儲存樣本資料的SequenceFile檔案inputPath;

            引數2--儲存質心資料的SequenceFile檔案centerPath;

            引數3--儲存聚類結果檔案(SequenceFile檔案)所處的路徑clusterPath;

            引數4--類的數量k;

輸出:k個類

Begin

           讀取inputPath,從中選取前k個點作為初始質心,將質心資料寫入centerPath;

           While 聚類終止條件不滿足

                    在Mapper階段,讀取inputPath,對於key所對應的點,遍歷所有的質心,選擇最近的質心,將該質心的編號作為鍵,

                    該點的編號作為值傳遞給Reducer;

                    在Reducer階段,將Mapper階段傳遞過來的值根據鍵歸併輸出,結果寫入clusterPath;

                    讀取clusterPath,重新計算質心,將結果寫入centerPath;

           EndWhile

End

判斷聚類效果好壞的常見指標是下述的準則函式值:

\

2.Eclipse中實現mahout分散式聚類演算法

 第1步:關鍵一步啊。

解決方法就是下載最新的原始碼,並且編譯成Hadoop 2.x相容模式,下面是具體編譯方法:



1. 使用git命令克隆Mahout最新的原始碼到本地,目前最新的版本是1.0-SNAPSHOT。

git clone https://github.com/apache/mahout.git

2. Mahout原始碼下載完成後,直接使用mvn命令編譯原始碼,注意要加上hadoop2.version=2.4.1引數讓編譯後的Mahout可以相容Hadoop 2.4.1版本。這裡版本可以填寫任何的2.x版本。

mvn -Dhadoop2.version=2.4.1 -DskipTests clean install

這一步在cmd控制檯執行即可,可能需要較長時間,打好的包會進入maven本地倉庫。
 

 第2步:新建一個maven專案,這個不多說。pom.xml中需要引入最新的mahout版本

  <dependency>
        	<groupId>org.apache.mahout</groupId>
        	<artifactId>mahout-math</artifactId>
        	<version>0.13.2-SNAPSHOT</version>
        </dependency>
        <dependency>
        	<groupId>org.apache.mahout</groupId>
        	<artifactId>mahout-mr</artifactId>
        	<version>0.13.2-SNAPSHOT</version>
        </dependency>
        <dependency>
        	<groupId>org.apache.mahout</groupId>
        	<artifactId>mahout-hdfs</artifactId>
        	<version>0.13.2-SNAPSHOT</version>
        </dependency>
        <dependency>
        	<groupId>org.apache.mahout</groupId>
        	<artifactId>mahout-h2o_2.10</artifactId>
        	<version>0.13.2-SNAPSHOT</version>
        </dependency>
        <dependency>
        	<groupId>commons-httpclient</groupId>
        	<artifactId>commons-httpclient</artifactId>
        	<version>3.0</version>
         
        </dependency>
這步之後,所有jar包已經匯入專案中。ok準備工作完成。

第3步:拷貝hadoop伺服器的core-site.xml,hdfs-site.xml及mapred-site.xml檔案到專案的hadoop目錄下。大致內容如下:

core-site.xml:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
   <property>
            <name>fs.defaultFS</name>
            <value>hdfs://192.168.10.127:9000</value>
        </property>
        <!-- 指定hadoop執行時產生檔案的儲存目錄 -->
        <property>
            <name>hadoop.tmp.dir</name>
            <value>/home/hadoop/hadoop/tmp</value>
           </property>
</configuration>
hdfs-site.xml:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
   
  <property>
            <name>dfs.replication</name>
            <value>1</value>
    </property>
  <property>
    <name>dfs.permissions</name>
    <value>false</value>
  </property>
</configuration>
注意紅色部分,這樣設定訪問hdfs檔案不會提示沒有許可權。

mapred-site.xml:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>hdfs://192.168.10.127:9001</value>
  </property>
</configuration>

第4步:編寫呼叫kmeans演算法java程式碼,具體如下(一些重要地方做了註釋),
package org.conan.mymahout.cluster08;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.clustering.kmeans.Kluster;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
public class KMeansHadoop13 {
	 private static final String HDFS = "hdfs://192.168.10.127:9000";
	public static final double[][] points = {
		{1, 1}, {2, 1}, {1, 2},
		{2, 2}, {3, 3}, {8, 8},
		{9, 8}, {8, 9}, {9, 9}};

		public static void writePointsToFile(List<Vector> points,String fileName,FileSystem fs,	Configuration conf) throws IOException {
			Path path = new Path(fileName);
			SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,path, LongWritable.class, VectorWritable.class);
			long recNum = 0;
			VectorWritable vec = new VectorWritable();
			for (Vector point : points) {
			vec.set(point);
			writer.append(new LongWritable(recNum++), vec);
			}
			writer.close();
		}

		public static List<Vector> getPoints(double[][] raw) {
			List<Vector> points = new ArrayList<Vector>();
			for (int i = 0; i < raw.length; i++) {
			double[] fr = raw[i];
			Vector vec = new RandomAccessSparseVector(fr.length);
			vec.assign(fr);
			points.add(vec);
			}
			return points;
		}

		public static void main(String args[]) throws Exception {
			System.setProperty("HADOOP_USER_NAME", "hadoop");
		int k = 2;

		List<Vector> vectors = getPoints(points);//根據初始點座標陣列構建成為聚類演算法能夠處理的vector的list集合格式

		File testData = new File("clustering/testdata");//在hdfs建立存放資料集的目錄
		if (!testData.exists()) {
		testData.mkdir();
		}
		testData = new File("clustering/testdata/points");
		if (!testData.exists()) {
		testData.mkdir();
		}

		Configuration conf = new Configuration();
		 conf.addResource("classpath:/hadoop/core-site.xml");
	        conf.addResource("classpath:/hadoop/hdfs-site.xml");
	        conf.addResource("classpath:/hadoop/mapred-site.xml");
	        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.127:9000/"), conf, "hadoop");
		writePointsToFile(vectors, "clustering/testdata/points/file1", fs, conf);//將點的集合寫到hdfs上的資料檔案中名稱為file1

		Path path = new Path("clustering/testdata/clusters/part-00000");//序列化檔案的路徑
		SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class);
		
		// 初始化中心點k=2
		 
		for (int i = 0; i < k; i++) {
		Vector vec = vectors.get(i);
		//cluster--{"r":[],"c":[1.0,1.0],"n":0,"identifier":"CL-0"}
		Kluster cluster = new Kluster(vec, i, new EuclideanDistanceMeasure());
		writer.append(new Text(cluster.getIdentifier()), cluster);
		}
		writer.close();
		//執行聚類演算法
		KMeansDriver.run(conf,
		new Path(HDFS+"/user/hadoop/clustering/testdata/points"),//原始輸入
		new Path(HDFS+"/user/hadoop/clustering/testdata/clusters/part-00000"),//初始中心點集合
		new Path(HDFS+"/user/hadoop/clustering/output"),//聚類結果
		0.001,
		3,
		true,
		0,
		false);
		//讀取聚類結果
		//@SuppressWarnings("deprecation")
		SequenceFile.Reader reader = new SequenceFile.Reader(fs,
		new Path("clustering/output/" +Cluster.CLUSTERS_DIR+"3"+ Cluster.FINAL_ITERATION_SUFFIX + "/part-r-00000"), conf);
		IntWritable key = new IntWritable();
//		WeightedPropertyVectorWritable value = new WeightedPropertyVectorWritable();針對本地檔案
		ClusterWritable value=new ClusterWritable();//針對叢集,也就是hadoop中的檔案
		while (reader.next(key, value)) {
		System.out.println(value.getValue().toString() + " belongs to cluster " + key.toString());
		}
		reader.close();
		
		 
	      
		}

}

說明:該聚類演算法迭代了3次,演算法採用的資料是九個二維平面的點,陣列儲存了他們的座標。執行速度非常快,如果採用樣本檔案資料,迭代次數太多會導致聚類速度太慢,十幾分鍾都很正常。最後用ClusterWritable來展示結果,新版本mahout0.13,所以沒有之前版本的WeightedPropertyVectorWritable 類來展示結果。

最後聚類結果如下:

{"r":[0.748,0.748],"c":[1.8,1.8],"n":5,"identifier":"VL-0"} belongs to cluster 0
{"r":[0.5,0.5],"c":[8.5,8.5],"n":4,"identifier":"VL-1"} belongs to cluster 1


4、補充
如果以上程式執行過程中,報org.apache.hadoop.io.nativeio.NativeIO$Windows.access的錯誤,千萬不要用網上的一些做法,搞個什麼hadoop.dll拷貝到c盤系統目錄等等,太折騰,而且如果dll版本不對應,作業系統位數不對都會報錯,給大家一個直接解決問題的辦法,你可以看到錯誤報到了NativeIO的570行,window.access返回false,所以沒有許可權,大家可以直接修改為return true,即可windows和hadoop的linux機器傳輸資料。辦法就是到hadoop2.4.1解壓縮包中找到NativeIO原始碼,拷貝到你自己專案中,當然要遵循原來的包結構,這樣就可以正常執行。

ok,收工,希望各位從中獲益,一切順利且happy!