1. 程式人生 > >MapReduce實現尋找共同好友

MapReduce實現尋找共同好友

public class SharedFriend {
	/*
	 第一階段的map函式主要完成以下任務
	 1.遍歷原始檔案中每行<所有朋友>資訊
	 2.遍歷“朋友”集合,以每個“朋友”為鍵,原來的“人”為值  即輸出<朋友,人>
	 */
	static class SharedFriendMapper01 extends Mapper<LongWritable, Text, Text, Text>{
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] person_friends = line.split(":");
			String person = person_friends[0];
			String[] friends = person_friends[1].split(",");
			
			for(String friend : friends){
				context.write(new Text(friend), new Text(person));
			}
		}
	}
	
	/*
	  第一階段的reduce函式主要完成以下任務
	  1.對所有傳過來的<朋友,list(人)>進行拼接,輸出<朋友,擁有這名朋友的所有人>
	 */
	static class SharedFriendReducer01 extends Reducer<Text, Text, Text, Text>{
		@Override
		protected void reduce(Text key, Iterable<Text> values,Context context)
				throws IOException, InterruptedException {
			StringBuffer sb = new StringBuffer();
			for(Text friend : values){
				sb.append(friend.toString()).append(",");
			}
			sb.deleteCharAt(sb.length()-1);
			context.write(key, new Text(sb.toString()));
		}
	}
	
	/*
	第二階段的map函式主要完成以下任務
	1.將上一階段reduce輸出的<朋友,擁有這名朋友的所有人>資訊中的 “擁有這名朋友的所有人”進行排序 ,以防出現B-C C-B這樣的重複
	2.將 “擁有這名朋友的所有人”進行兩兩配對,並將配對後的字串當做鍵,“朋友”當做值輸出,即輸出<人-人,共同朋友>
	 */
	static class SharedFriendMapper02 extends Mapper<LongWritable, Text, Text, Text>{
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] friend_persons = line.split("\t");
			String friend = friend_persons[0];
			String[] persons = friend_persons[1].split(",");
			Arrays.sort(persons); //排序
			
			//兩兩配對
			for(int i=0;i<persons.length-1;i++){
				for(int j=i+1;j<persons.length;j++){
					context.write(new Text(persons[i]+"-"+persons[j]+":"), new Text(friend));
				}
			}
		}
	}
	
	/*
	第二階段的reduce函式主要完成以下任務
	1.<人-人,list(共同朋友)> 中的“共同好友”進行拼接 最後輸出<人-人,兩人的所有共同好友>
	 */
	static class SharedFriendReducer02 extends Reducer<Text, Text, Text, Text>{
		@Override
		protected void reduce(Text key, Iterable<Text> values,Context context)
				throws IOException, InterruptedException {
			StringBuffer sb = new StringBuffer();
			Set<String> set = new HashSet<String>();
			for(Text friend : values){
				if(!set.contains(friend.toString()))
					set.add(friend.toString());
			}
			for(String friend : set){
				sb.append(friend.toString()).append(",");
			}
			sb.deleteCharAt(sb.length()-1);
			
			context.write(key, new Text(sb.toString()));
		}
	}
	
	public static void main(String[] args)throws Exception {
		Configuration conf = new Configuration();

		//第一階段
		Job job1 = Job.getInstance(conf);
		job1.setJarByClass(SharedFriend.class);
		job1.setMapperClass(SharedFriendMapper01.class);
		job1.setReducerClass(SharedFriendReducer01.class);
		
		job1.setOutputKeyClass(Text.class);
		job1.setOutputValueClass(Text.class);
		
		FileInputFormat.setInputPaths(job1, new Path("H:/大資料/mapreduce/sharedfriend/input"));
		FileOutputFormat.setOutputPath(job1, new Path("H:/大資料/mapreduce/sharedfriend/output"));
		
		boolean res1 = job1.waitForCompletion(true);
		
		//第二階段
		Job job2 = Job.getInstance(conf);
		job2.setJarByClass(SharedFriend.class);
		job2.setMapperClass(SharedFriendMapper02.class);
		job2.setReducerClass(SharedFriendReducer02.class);
		
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(Text.class);
		
		FileInputFormat.setInputPaths(job2, new Path("H:/大資料/mapreduce/sharedfriend/output"));
		FileOutputFormat.setOutputPath(job2, new Path("H:/大資料/mapreduce/sharedfriend/output01"));
		
		boolean res2 = job2.waitForCompletion(true);
		
		System.exit(res1?0:1);
	}
}

4.輸出