mahout最新版+hadoop2.4.1執行kmeans分散式演算法
1、理論須知
用過mahout和hadoop整合的朋友們,都經過很多折騰,mahout這個東西是包括了好多的機器學習演算法,確實我們呼叫起來相當方便,畢竟我們不需要為了使用一個演算法重新編碼。但是mahout0.10之前都只能支援到hadoop1.x版本,所以大部分使用hadoop2.x的朋友,很苦惱,雖然網上各種辦法,大都折騰的很,浪費時間且錯誤百出,鑑於此,將本人整合的成功案例分享給大家,少走彎路,用好點贊!
廢話不多說,先知道kmeans演算法的基本步驟,以知其所以然:
在Hadoop分散式環境下實現K-Means聚類演算法的虛擬碼如下:
輸入:引數0--儲存樣本資料的文字檔案inputfile;
引數1--儲存樣本資料的SequenceFile檔案inputPath;
引數2--儲存質心資料的SequenceFile檔案centerPath;
引數3--儲存聚類結果檔案(SequenceFile檔案)所處的路徑clusterPath;
引數4--類的數量k;
輸出:k個類
Begin
讀取inputPath,從中選取前k個點作為初始質心,將質心資料寫入centerPath;
While 聚類終止條件不滿足
在Mapper階段,讀取inputPath,對於key所對應的點,遍歷所有的質心,選擇最近的質心,將該質心的編號作為鍵,
該點的編號作為值傳遞給Reducer;
在Reducer階段,將Mapper階段傳遞過來的值根據鍵歸併輸出,結果寫入clusterPath;
讀取clusterPath,重新計算質心,將結果寫入centerPath;
EndWhile
End
判斷聚類效果好壞的常見指標是下述的準則函式值:
2.Eclipse中實現mahout分散式聚類演算法
第1步:關鍵一步啊。
解決方法就是下載最新的原始碼,並且編譯成Hadoop 2.x相容模式,下面是具體編譯方法:
1. 使用git命令克隆Mahout最新的原始碼到本地,目前最新的版本是1.0-SNAPSHOT。
git clone https://github.com/apache/mahout.git
2. Mahout原始碼下載完成後,直接使用mvn命令編譯原始碼,注意要加上hadoop2.version=2.4.1引數讓編譯後的Mahout可以相容Hadoop
2.4.1版本。這裡版本可以填寫任何的2.x版本。
mvn -Dhadoop2.version=2.4.1 -DskipTests clean install
這一步在cmd控制檯執行即可,可能需要較長時間,打好的包會進入maven本地倉庫。
第2步:新建一個maven專案,這個不多說。pom.xml中需要引入最新的mahout版本
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-math</artifactId>
<version>0.13.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-mr</artifactId>
<version>0.13.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-hdfs</artifactId>
<version>0.13.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-h2o_2.10</artifactId>
<version>0.13.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.0</version>
</dependency>
這步之後,所有jar包已經匯入專案中。ok準備工作完成。
第3步:拷貝hadoop伺服器的core-site.xml,hdfs-site.xml及mapred-site.xml檔案到專案的hadoop目錄下。大致內容如下:
core-site.xml:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://192.168.10.127:9000</value>
</property>
<!-- 指定hadoop執行時產生檔案的儲存目錄 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/hadoop/tmp</value>
</property>
</configuration>
hdfs-site.xml:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>
注意紅色部分,這樣設定訪問hdfs檔案不會提示沒有許可權。
mapred-site.xml:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>hdfs://192.168.10.127:9001</value>
</property>
</configuration>
第4步:編寫呼叫kmeans演算法java程式碼,具體如下(一些重要地方做了註釋),
package org.conan.mymahout.cluster08;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.clustering.kmeans.Kluster;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
public class KMeansHadoop13 {
private static final String HDFS = "hdfs://192.168.10.127:9000";
public static final double[][] points = {
{1, 1}, {2, 1}, {1, 2},
{2, 2}, {3, 3}, {8, 8},
{9, 8}, {8, 9}, {9, 9}};
public static void writePointsToFile(List<Vector> points,String fileName,FileSystem fs, Configuration conf) throws IOException {
Path path = new Path(fileName);
SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,path, LongWritable.class, VectorWritable.class);
long recNum = 0;
VectorWritable vec = new VectorWritable();
for (Vector point : points) {
vec.set(point);
writer.append(new LongWritable(recNum++), vec);
}
writer.close();
}
public static List<Vector> getPoints(double[][] raw) {
List<Vector> points = new ArrayList<Vector>();
for (int i = 0; i < raw.length; i++) {
double[] fr = raw[i];
Vector vec = new RandomAccessSparseVector(fr.length);
vec.assign(fr);
points.add(vec);
}
return points;
}
public static void main(String args[]) throws Exception {
System.setProperty("HADOOP_USER_NAME", "hadoop");
int k = 2;
List<Vector> vectors = getPoints(points);//根據初始點座標陣列構建成為聚類演算法能夠處理的vector的list集合格式
File testData = new File("clustering/testdata");//在hdfs建立存放資料集的目錄
if (!testData.exists()) {
testData.mkdir();
}
testData = new File("clustering/testdata/points");
if (!testData.exists()) {
testData.mkdir();
}
Configuration conf = new Configuration();
conf.addResource("classpath:/hadoop/core-site.xml");
conf.addResource("classpath:/hadoop/hdfs-site.xml");
conf.addResource("classpath:/hadoop/mapred-site.xml");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.127:9000/"), conf, "hadoop");
writePointsToFile(vectors, "clustering/testdata/points/file1", fs, conf);//將點的集合寫到hdfs上的資料檔案中名稱為file1
Path path = new Path("clustering/testdata/clusters/part-00000");//序列化檔案的路徑
SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class);
// 初始化中心點k=2
for (int i = 0; i < k; i++) {
Vector vec = vectors.get(i);
//cluster--{"r":[],"c":[1.0,1.0],"n":0,"identifier":"CL-0"}
Kluster cluster = new Kluster(vec, i, new EuclideanDistanceMeasure());
writer.append(new Text(cluster.getIdentifier()), cluster);
}
writer.close();
//執行聚類演算法
KMeansDriver.run(conf,
new Path(HDFS+"/user/hadoop/clustering/testdata/points"),//原始輸入
new Path(HDFS+"/user/hadoop/clustering/testdata/clusters/part-00000"),//初始中心點集合
new Path(HDFS+"/user/hadoop/clustering/output"),//聚類結果
0.001,
3,
true,
0,
false);
//讀取聚類結果
//@SuppressWarnings("deprecation")
SequenceFile.Reader reader = new SequenceFile.Reader(fs,
new Path("clustering/output/" +Cluster.CLUSTERS_DIR+"3"+ Cluster.FINAL_ITERATION_SUFFIX + "/part-r-00000"), conf);
IntWritable key = new IntWritable();
// WeightedPropertyVectorWritable value = new WeightedPropertyVectorWritable();針對本地檔案
ClusterWritable value=new ClusterWritable();//針對叢集,也就是hadoop中的檔案
while (reader.next(key, value)) {
System.out.println(value.getValue().toString() + " belongs to cluster " + key.toString());
}
reader.close();
}
}
說明:該聚類演算法迭代了3次,演算法採用的資料是九個二維平面的點,陣列儲存了他們的座標。執行速度非常快,如果採用樣本檔案資料,迭代次數太多會導致聚類速度太慢,十幾分鍾都很正常。最後用ClusterWritable來展示結果,新版本mahout0.13,所以沒有之前版本的WeightedPropertyVectorWritable 類來展示結果。
最後聚類結果如下:
{"r":[0.748,0.748],"c":[1.8,1.8],"n":5,"identifier":"VL-0"} belongs to cluster 0
{"r":[0.5,0.5],"c":[8.5,8.5],"n":4,"identifier":"VL-1"} belongs to cluster 1
4、補充
如果以上程式執行過程中,報org.apache.hadoop.io.nativeio.NativeIO$Windows.access的錯誤,千萬不要用網上的一些做法,搞個什麼hadoop.dll拷貝到c盤系統目錄等等,太折騰,而且如果dll版本不對應,作業系統位數不對都會報錯,給大家一個直接解決問題的辦法,你可以看到錯誤報到了NativeIO的570行,window.access返回false,所以沒有許可權,大家可以直接修改為return
true,即可windows和hadoop的linux機器傳輸資料。辦法就是到hadoop2.4.1解壓縮包中找到NativeIO原始碼,拷貝到你自己專案中,當然要遵循原來的包結構,這樣就可以正常執行。
ok,收工,希望各位從中獲益,一切順利且happy!