MapReduce尋找共同好友
阿新 • • 發佈:2018-12-12
初始資料
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
===============================================================================================
第一階段
以每位好友為 key,找出所有把他列為好友的人(即:哪些人共同認識這位好友)。
package cn.hkj.bigdata.fans;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Stage one of the common-friends computation.
 *
 * <p>Inverts each {@code person:friend,friend,...} input record so that every
 * friend ends up listed together with all the people whose friend list
 * contains that friend.
 */
public class CommonFriendsStepOne {

    /**
     * Emits one {@code (friend, person)} pair per friend on an input line.
     *
     * <p>Sample input line: {@code A:B,C,D,F,E,O}
     */
    static class CommonFriendsStepOneMapper extends Mapper<LongWritable, Text, Text, Text> {

        // Reused across map() calls instead of allocating new Text objects per output record.
        private final Text friendKey = new Text();
        private final Text personValue = new Text();

        /**
         * Splits the line into the person and the friend list and writes
         * {@code (friend, person)} for every friend.
         *
         * @param key     input key supplied by the framework (unused)
         * @param value   one input line in {@code person:friend,friend,...} form
         * @param context sink for the emitted pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] personAndFriends = value.toString().split(":");
            // Blank lines and lines without a friend list have no second field;
            // skip them instead of failing the task with an index error.
            if (personAndFriends.length < 2) {
                return;
            }

            personValue.set(personAndFriends[0]);
            for (String friend : personAndFriends[1].split(",")) {
                // key = friend, value = person
                friendKey.set(friend);
                context.write(friendKey, personValue);
            }
        }
    }

    /**
     * Collects, for one friend, every person that listed it.
     *
     * <p>Input pairs are {@code (friend, person)}, e.g. {@code B A}.
     */
    static class CommonFriendsStepOneReducer extends Reducer<Text, Text, Text, Text> {

        /**
         * Writes {@code "friend:"} as the key and the space-separated people as
         * the value. Each person is followed by a single space, so the value
         * ends with a trailing space.
         *
         * @param key     the friend
         * @param values  the people whose friend list contains {@code key}
         * @param context sink for the output record
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder people = new StringBuilder();
            for (Text value : values) {
                people.append(value.toString()).append(' ');
            }
            // e.g. key "B:" with value "A C D "
            context.write(new Text(key.toString() + ":"), new Text(people.toString()));
        }
    }

    /**
     * Configures and runs the stage-one job, then exits with 0 on success and
     * 1 on failure. An existing output path is deleted recursively first.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws IOException            if file-system access or job submission fails
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: CommonFriendsStepOne <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("CommonFriendsStepOne");
        job.setJarByClass(CommonFriendsStepOne.class);

        job.setMapperClass(CommonFriendsStepOneMapper.class);
        job.setReducerClass(CommonFriendsStepOneReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        Path output = new Path(args[1]);
        // Resolve the file system from the output path itself so a path that
        // carries its own scheme is checked on the file system it belongs to.
        FileSystem fs = output.getFileSystem(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
            System.out.println("目的地址已初始完畢");
        }
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
===============================================================================================
第二階段
將擁有同一位好友的人兩兩配對,再彙總每一對人的共同好友。
package cn.hkj.bigdata.fans;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Stage two of the common-friends computation.
 *
 * <p>Reads {@code friend: person person ...} records, pairs up the people that
 * share each friend, and gathers the shared friends for every pair.
 */
public class CommonFriendsStepTwo {

    /**
     * Emits {@code ("p1->p2:", friend)} for every pair of people on a line.
     *
     * <p>Sample input line: {@code A: I K C B G F H O D}
     */
    static class CommonFriendsStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {

        // Reused across map() calls instead of allocating new Text objects per output record.
        private final Text pairKey = new Text();
        private final Text friendValue = new Text();

        /**
         * Sorts the people on the line and writes one record per unordered
         * pair, keyed by {@code "first->second:"} with the friend as value.
         *
         * @param key     input key supplied by the framework (unused)
         * @param value   one input line in {@code friend: person person ...} form
         * @param context sink for the emitted pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] friendAndPersons = value.toString().split(":");
            // Blank lines and lines without a people list have no second field;
            // skip them instead of failing the task with an index error.
            if (friendAndPersons.length < 2) {
                return;
            }

            // Trim the surrounding whitespace, then split on runs of whitespace
            // so repeated separators do not produce empty names.
            String[] persons = friendAndPersons[1].trim().split("\\s+");
            // Sorting makes the pair key independent of the order on the line,
            // so the same two people always produce the same key.
            Arrays.sort(persons);

            friendValue.set(friendAndPersons[0]);
            for (int i = 0; i < persons.length - 1; i++) {
                for (int j = i + 1; j < persons.length; j++) {
                    pairKey.set(persons[i] + "->" + persons[j] + ":");
                    context.write(pairKey, friendValue);
                }
            }
        }
    }

    /**
     * Joins all friends received for one pair into a single record.
     */
    static class CommonFriendsStepTwoReducer extends Reducer<Text, Text, Text, Text> {

        /**
         * Writes the pair key unchanged with the space-separated friends as
         * the value. Each friend is followed by a single space, so the value
         * ends with a trailing space.
         *
         * @param key     the pair key in {@code "p1->p2:"} form
         * @param values  the friends shared by the pair
         * @param context sink for the output record
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder friends = new StringBuilder();
            for (Text value : values) {
                friends.append(value.toString()).append(' ');
            }
            context.write(key, new Text(friends.toString()));
        }
    }

    /**
     * Configures and runs the stage-two job, then exits with 0 on success and
     * 1 on failure. An existing output path is deleted recursively first.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws IOException            if file-system access or job submission fails
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: CommonFriendsStepTwo <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("CommonFriendsStepTwo");
        job.setJarByClass(CommonFriendsStepTwo.class);

        job.setMapperClass(CommonFriendsStepTwoMapper.class);
        job.setReducerClass(CommonFriendsStepTwoReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        Path output = new Path(args[1]);
        // Resolve the file system from the output path itself so a path that
        // carries its own scheme is checked on the file system it belongs to.
        FileSystem fs = output.getFileSystem(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
            System.out.println("目的地址已初始完畢");
        }
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
=============================================================================================
結果
A->B: E C A->C: D F A->D: E F A->E: D B C A->F: O B C D E A->G: F E C D A->H: E C D O A->I: O A->J: O B A->K: D C A->L: F E D A->M: E F B->C: A B->D: A E B->E: C B->F: E A C B->G: C E A B->H: A E C B->I: A B->K: C A B->L: E B->M: E B->O: A C->D: A F C->E: D C->F: D A C->G: D F A C->H: D A C->I: A C->K: A D C->L: D F C->M: F C->O: I A D->E: L D->F: A E D->G: E A F D->H: A E D->I: A D->K: A D->L: E F D->M: F E D->O: A E->F: D M C B E->G: C D E->H: C D E->J: B E->K: C D E->L: D F->G: D C A E F->H: A D O E C F->I: O A F->J: B O F->K: D C A F->L: E D F->M: E F->O: A G->H: D C E A G->I: A G->K: D A C G->L: D F E G->M: E F G->O: A H->I: O A H->J: O H->K: A C D H->L: D E H->M: E H->O: A I->J: O I->K: A I->O: A K->L: D K->O: A L->M: E F