MapReduce實現尋找共同好友
阿新 • • 發佈:2019-01-11
public class SharedFriend { /* 第一階段的map函式主要完成以下任務 1.遍歷原始檔案中每行<所有朋友>資訊 2.遍歷“朋友”集合,以每個“朋友”為鍵,原來的“人”為值 即輸出<朋友,人> */ static class SharedFriendMapper01 extends Mapper<LongWritable, Text, Text, Text>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String line = value.toString(); String[] person_friends = line.split(":"); String person = person_friends[0]; String[] friends = person_friends[1].split(","); for(String friend : friends){ context.write(new Text(friend), new Text(person)); } } } /* 第一階段的reduce函式主要完成以下任務 1.對所有傳過來的<朋友,list(人)>進行拼接,輸出<朋友,擁有這名朋友的所有人> */ static class SharedFriendReducer01 extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException { StringBuffer sb = new StringBuffer(); for(Text friend : values){ sb.append(friend.toString()).append(","); } sb.deleteCharAt(sb.length()-1); context.write(key, new Text(sb.toString())); } } /* 第二階段的map函式主要完成以下任務 1.將上一階段reduce輸出的<朋友,擁有這名朋友的所有人>資訊中的 “擁有這名朋友的所有人”進行排序 ,以防出現B-C C-B這樣的重複 2.將 “擁有這名朋友的所有人”進行兩兩配對,並將配對後的字串當做鍵,“朋友”當做值輸出,即輸出<人-人,共同朋友> */ static class SharedFriendMapper02 extends Mapper<LongWritable, Text, Text, Text>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String line = value.toString(); String[] friend_persons = line.split("\t"); String friend = friend_persons[0]; String[] persons = friend_persons[1].split(","); Arrays.sort(persons); //排序 //兩兩配對 for(int i=0;i<persons.length-1;i++){ for(int j=i+1;j<persons.length;j++){ context.write(new Text(persons[i]+"-"+persons[j]+":"), new Text(friend)); } } } } /* 第二階段的reduce函式主要完成以下任務 1.<人-人,list(共同朋友)> 中的“共同好友”進行拼接 最後輸出<人-人,兩人的所有共同好友> */ static class SharedFriendReducer02 extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException { StringBuffer sb = new StringBuffer(); Set<String> set = new HashSet<String>(); for(Text friend : values){ if(!set.contains(friend.toString())) set.add(friend.toString()); } for(String friend : set){ sb.append(friend.toString()).append(","); } sb.deleteCharAt(sb.length()-1); context.write(key, new Text(sb.toString())); } } public static void main(String[] args)throws Exception { Configuration conf = new Configuration(); //第一階段 Job job1 = Job.getInstance(conf); job1.setJarByClass(SharedFriend.class); job1.setMapperClass(SharedFriendMapper01.class); job1.setReducerClass(SharedFriendReducer01.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job1, new Path("H:/大資料/mapreduce/sharedfriend/input")); FileOutputFormat.setOutputPath(job1, new Path("H:/大資料/mapreduce/sharedfriend/output")); boolean res1 = job1.waitForCompletion(true); //第二階段 Job job2 = Job.getInstance(conf); job2.setJarByClass(SharedFriend.class); job2.setMapperClass(SharedFriendMapper02.class); job2.setReducerClass(SharedFriendReducer02.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job2, new Path("H:/大資料/mapreduce/sharedfriend/output")); FileOutputFormat.setOutputPath(job2, new Path("H:/大資料/mapreduce/sharedfriend/output01")); boolean res2 = job2.waitForCompletion(true); System.exit(res1?0:1); } }