hadoop下實現kmeans演算法——一個mapreduce的實現方法
寫mapreduce程式實現kmeans演算法,我們的思路可能是這樣的
1. 用一個全域性變數存放上一次迭代後的質心
2. map裡,計算每個質心與樣本之間的距離,得到與樣本距離最短的質心,以這個質心作為key,樣本作為value,輸出
3. reduce裡,輸入的key是質心,value是其他的樣本,這時重新計算聚類中心,將聚類中心put到一個全部變數t中。
4. 在main裡比較前一次的質心和本次的質心是否發生變化,如果變化,則繼續迭代,否則退出。
本文的思路基本上是按照上面的步驟來做的,只不過有幾個問題需要解決
1. hadoop是不存在自定義的全域性變數的,所以上面定義一個全域性變數存放質心的想法是實現不了的,所以一個替代的思路是將質心存放在檔案中
2. 存放質心的檔案在什麼地方讀取,如果在map中讀取,那麼可以肯定我們是不能用一個mapreduce實現一次迭代,所以我們選擇在main函式裡讀取質心,然後將質心set到configuration中,configuration在map和reduce都是可讀
3. 如何比較質心是否發生變化,是在main裡比較麼,讀取本次質心和上一次質心的檔案然後進行比較,這種方法是可以實現的,但是顯得不夠高富帥,這個時候我們用到了自定義的counter,counter是全域性變數,在map和reduce中可讀可寫,在上面的思路中,我們看到reduce是有上次迭代的質心和剛剛計算出來的質心的,所以直接在reduce中進行比較就完全可以,如果沒發生變化,counter加1。只要在main裡比較獲取counter的值就行了。
梳理一下,具體的步驟如下
1. main函式讀取質心檔案
2. 將質心的字串放到configuration中
3. 在mapper類重寫setup方法,獲取到configuration的質心內容,解析成二維陣列的形式,代表質心
4. mapper類中的map方法讀取樣本檔案,跟所有的質心比較,得出每個樣本跟哪個質心最近,然後輸出<質心,樣本>
5. reducer類中重新計算質心,如果重新計算出來的質心跟進來時的質心一致,那麼自定義的counter加1
6. main中獲取counter的值,看是否等於質心,如果不相等,那麼繼續迭代,否在退出
具體的實現如下
1. pom依賴
這個要跟叢集的一致,因為如果不一致在計算其他問題的時候沒有問題,但是在使用counter的時候會出現問題
java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.Counter, but class was expected
原因是:其實從2.0開始,org.apache.hadoop.mapreduce.Counter從1.0版本的class改為interface,可以看一下你匯入的這個類是class還是interface,如果是class那麼就是導包匯入的不對,需要修改
2. 樣本
例項樣本如下
1,1
2,2
3,3
-3,-3
-4,-4
-5,-5
3. 質心
這個質心是從樣本中隨機找的
1,1
2,2
4. 程式碼實現
首先定義一個Center類,這個類主要存放了質心的個數k,還有兩個從hdfs上讀取質心檔案的方法,一個用來讀取初始的質心,這個實在檔案中,還有一個是用來讀取每次迭代後的質心資料夾,這個是在資料夾中的,程式碼如下
Center類
public class Center {
protected static int k = 2; //質心的個數
/**
* 從初始的質心檔案中載入質心,並返回字串,質心之間用tab分割
* @param path
* @return
* @throws IOException
*/
public String loadInitCenter(Path path) throws IOException {
StringBuffer sb = new StringBuffer();
Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);
FSDataInputStream dis = hdfs.open(path);
LineReader in = new LineReader(dis, conf);
Text line = new Text();
while(in.readLine(line) > 0) {
sb.append(line.toString().trim());
sb.append("\t");
}
return sb.toString().trim();
}
/**
* 從每次迭代的質心檔案中讀取質心,並返回字串
* @param path
* @return
* @throws IOException
*/
public String loadCenter(Path path) throws IOException {
StringBuffer sb = new StringBuffer();
Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);
FileStatus[] files = hdfs.listStatus(path);
for(int i = 0; i < files.length; i++) {
Path filePath = files[i].getPath();
if(!filePath.getName().contains("part")) continue;
FSDataInputStream dis = hdfs.open(filePath);
LineReader in = new LineReader(dis, conf);
Text line = new Text();
while(in.readLine(line) > 0) {
sb.append(line.toString().trim());
sb.append("\t");
}
}
return sb.toString().trim();
}
}
KmeansMR類
public class KmeansMR {
private static String FLAG = "KCLUSTER";
public static class TokenizerMapper
extends Mapper<Object, Text, Text, Text>{
double[][] centers = new double[Center.k][];
String[] centerstrArray = null;
@Override
public void setup(Context context) {
//將放在context中的聚類中心轉換為陣列的形式,方便使用
String kmeansS = context.getConfiguration().get(FLAG);
centerstrArray = kmeansS.split("\t");
for(int i = 0; i < centerstrArray.length; i++) {
String[] segs = centerstrArray[i].split(",");
centers[i] = new double[segs.length];
for(int j = 0; j < segs.length; j++) {
centers[i][j] = Double.parseDouble(segs[j]);
}
}
}
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String line = value.toString();
String[] segs = line.split(",");
double[] sample = new double[segs.length];
for(int i = 0; i < segs.length; i++) {
sample[i] = Float.parseFloat(segs[i]);
}
//求得距離最近的質心
double min = Double.MAX_VALUE;
int index = 0;
for(int i = 0; i < centers.length; i++) {
double dis = distance(centers[i], sample);
if(dis < min) {
min = dis;
index = i;
}
}
context.write(new Text(centerstrArray[index]), new Text(line));
}
}
public static class IntSumReducer
extends Reducer<Text,Text,NullWritable,Text> {
Counter counter = null;
public void reduce(Text key, Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
double[] sum = new double[Center.k];
int size = 0;
//計算對應維度上值的加和,存放在sum陣列中
for(Text text : values) {
String[] segs = text.toString().split(",");
for(int i = 0; i < segs.length; i++) {
sum[i] += Double.parseDouble(segs[i]);
}
size ++;
}
//求sum陣列中每個維度的平均值,也就是新的質心
StringBuffer sb = new StringBuffer();
for(int i = 0; i < sum.length; i++) {
sum[i] /= size;
sb.append(sum[i]);
sb.append(",");
}
/**判斷新的質心跟老的質心是否是一樣的*/
boolean flag = true;
String[] centerStrArray = key.toString().split(",");
for(int i = 0; i < centerStrArray.length; i++) {
if(Math.abs(Double.parseDouble(centerStrArray[i]) - sum[i]) > 0.00000000001) {
flag = false;
break;
}
}
//如果新的質心跟老的質心是一樣的,那麼相應的計數器加1
if(flag) {
counter = context.getCounter("myCounter", "kmenasCounter");
counter.increment(1l);
}
context.write(null, new Text(sb.toString()));
}
}
public static void main(String[] args) throws Exception {
Path kMeansPath = new Path("/dsap/middata/kmeans/kMeans"); //初始的質心檔案
Path samplePath = new Path("/dsap/middata/kmeans/sample"); //樣本檔案
//載入聚類中心檔案
Center center = new Center();
String centerString = center.loadInitCenter(kMeansPath);
int index = 0; //迭代的次數
while(index < 5) {
Configuration conf = new Configuration();
conf.set(FLAG, centerString); //將聚類中心的字串放到configuration中
kMeansPath = new Path("/dsap/middata/kmeans/kMeans" + index); //本次迭代的輸出路徑,也是下一次質心的讀取路徑
/**判斷輸出路徑是否存在,如果存在,則刪除*/
FileSystem hdfs = FileSystem.get(conf);
if(hdfs.exists(kMeansPath)) hdfs.delete(kMeansPath);
Job job = new Job(conf, "kmeans" + index);
job.setJarByClass(KmeansMR.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, samplePath);
FileOutputFormat.setOutputPath(job, kMeansPath);
job.waitForCompletion(true);
/**獲取自定義counter的大小,如果等於質心的大小,說明質心已經不會發生變化了,則程式停止迭代*/
long counter = job.getCounters().getGroup("myCounter").findCounter("kmenasCounter").getValue();
if(counter == Center.k) System.exit(0);
/**重新載入質心*/
center = new Center();
centerString = center.loadCenter(kMeansPath);
index ++;
}
System.exit(0);
}
public static double distance(double[] a, double[] b) {
if(a == null || b == null || a.length != b.length) return Double.MAX_VALUE;
double dis = 0;
for(int i = 0; i < a.length; i++) {
dis += Math.pow(a[i] - b[i], 2);
}
return Math.sqrt(dis);
}
}
5. 結果
產生了兩個資料夾,分別是第一次、第二次迭代後的聚類中心
最後的聚類中心的內容如下