
Chaining Multiple Jobs in MapReduce

Finding common friends:

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J,K

The data above is the input. A line such as
A:B,C,D,F,E,O
means that B, C, D, E, F and O are friends of user A.

1. Find the common friends of every pair of users
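
This is solved with two chained MapReduce jobs. Job 1 (CF1) inverts the friend lists: for every line user:friend1,friend2,... the mapper emits (friend, user), so the reducer collects, for each person, all users who have that person in their list. Job 2 (CF2) sorts each such group and emits (userX-userY, person) for every pair in it, so its reducer can concatenate the common friends of each pair.

A worked trace on the sample data (the order of names within a group depends on the shuffle, so a real run may differ): A appears in the friend lists of B, C, D, F, G, H, I, K and O, so job 1 produces the line

A	B-C-D-F-G-H-I-K-O

and job 2 turns this one line into the pairs B-C, B-D, ..., each tagged with their common friend A.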

CF1:

package mapreduce.exercise.cf;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Description: the first MapReduce job for finding common friends.
 * It inverts the friend lists: for each person, collect the users
 * who list that person as a friend.
 */

public class CF1MR {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(CF1MR.class);
		
		job.setMapperClass(CF1MRMapper.class);
		job.setReducerClass(CF1MRReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		/**
		 * Configure input and output paths
		 */
		Path inputPath = new Path(args[0]);
		Path outputPath = new Path(args[1]);
		FileInputFormat.setInputPaths(job, inputPath);
		FileSystem fs = FileSystem.get(conf);
		// delete the output directory if it already exists, so the job can be rerun
		if(fs.exists(outputPath)){
			fs.delete(outputPath, true);
		}
		FileOutputFormat.setOutputPath(job, outputPath);
		
		/**
		 * Submit the job
		 */
		boolean isDone = job.waitForCompletion(true);
		System.exit(isDone ? 0 : 1);

	}
	
	/**
	 * Mapper logic: emit (friend, user) for every friend in the list
	 */
	private static class CF1MRMapper extends Mapper<LongWritable, Text, Text, Text>{
		
		private Text keyOut = new Text();
		private Text valueOut = new Text();
		
		/**
		 * value : A:B,C,D,F,E,O
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			// split "A:B,C,D,F,E,O" into the user and the friend list
			String[] split = value.toString().split(":");
			String outValue = split[0];
			valueOut.set(outValue);
			
			// emit (friend, user): each friend becomes a key,
			// the list owner becomes the value
			String[] keys = split[1].split(",");
			for(String outKey : keys){
				keyOut.set(outKey);
				context.write(keyOut, valueOut);
			}
	
		}
	}
		
	/**
	 * Reducer logic: for each person, concatenate all users
	 * who list that person as a friend.
	 */
	private static class CF1MRReducer extends Reducer<Text, Text, Text, Text>{

		private Text valueOut = new Text();

		/**
		 * One reduce call receives, for example:
		 * key = E, values = [A, B, G, ...]
		 */
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {

			// join all users with "-" and drop the trailing separator
			StringBuilder sb = new StringBuilder();
			for(Text t : values){
				sb.append(t.toString()).append("-");
			}
			String outValue = sb.substring(0, sb.length() - 1);

			valueOut.set(outValue);
			/**
			 * key : E
			 * value : A-B-G-...
			 */
			context.write(key, valueOut);
		}
	}
}
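
For the sample input, the first job's output contains one such line per person; two more examples derived from the data above (again, name order within a line is not deterministic):

B	A-E-F-J
C	A-B-E-F-G-H-K

Each line reads: every user on the right has the person on the left as a friend.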

CF2:

package mapreduce.exercise.cf;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * The second MapReduce job: turn each person's user list into pairs,
 * so that the person becomes a common friend of every pair.
 */
public class CF2MR {
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(CF2MR.class);
		
		job.setMapperClass(CF2MRMapper.class);
		job.setReducerClass(CF2MRReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		/**
		 * Configure input and output paths
		 */
		Path inputPath = new Path(args[0]);
		Path outputPath = new Path(args[1]);
		FileInputFormat.setInputPaths(job, inputPath);
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(outputPath)){
			fs.delete(outputPath,true);
		}
		FileOutputFormat.setOutputPath(job, outputPath);
		
		/**
		 * Submit the job
		 */
		boolean isDone = job.waitForCompletion(true);
		System.exit(isDone ? 0 : 1);
		
	}
	
	/**
	 * Mapper logic: emit one record per pair of users in the list
	 */
	private static class CF2MRMapper extends Mapper<LongWritable, Text, Text, Text>{
		
		private Text keyOut = new Text();
		private Text valueOut = new Text();
		/**
		 * key : byte offset of the line
		 * value : E	A-B-G-... (the person, then the users who list that person)
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().split("\t");
			String outValue = split[0];
			valueOut.set(outValue);
			
			List<String> userList = new ArrayList<>();
			
			for(String t : split[1].split("-")){
				userList.add(t);
			}
			
			
			/**
			 * Why sort the user list?
			 *
			 * E  A-B-G-H
			 * C  B-A-H-G   ---- after sorting ----> A-B-G-H
			 *
			 * Without sorting, the same pair would produce two different keys:
			 *   A-B  E
			 *   B-A  C
			 * After sorting, both lines produce the same key:
			 *   A-B  E
			 *   A-B  C
			 * because A-B === B-A
			 */
			Collections.sort(userList);
			
			// emit one record per unordered pair: (user_i-user_j, person)
			int size = userList.size();
			for(int i = 0; i < size-1; i++){
				for(int j = i + 1; j <  size; j++){
					String outKey = userList.get(i) +"-"+userList.get(j);
					keyOut.set(outKey);
					
					/**
					 * key : A-B
					 * value : E
					 */
					context.write(keyOut, valueOut);	
				}
			}
			
		}
		
	}
		
	/**
	 * Reducer logic: concatenate all common friends of each pair.
	 */
	private static class CF2MRReducer extends Reducer<Text, Text, Text, Text>{

		private Text valueOut = new Text();

		/**
		 * key : A-B
		 * values : E C G H
		 */
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {

			// join the common friends with "," and drop the trailing separator
			StringBuilder sb = new StringBuilder();
			for(Text t : values){
				sb.append(t.toString()).append(",");
			}
			String outValue = sb.substring(0, sb.length() - 1);
			valueOut.set(outValue);
			context.write(key, valueOut);
		}
	}
}
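
For the sample data, the second job's output contains one line per pair of users that share at least one friend, for example (value order again depends on the shuffle):

A-B	C,E
A-C	D,F
B-C	A

A and B share the friends C and E; A and C share D and F; B and C share only A.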

The code that chains the two jobs into one run. ControlledJob wraps a Job together with the list of jobs it depends on; JobControl runs on its own thread and submits each job only after all of its dependencies have completed successfully:
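
The core of the pattern, stripped of the per-job configuration (a minimal sketch; jobA and jobB stand for two fully configured Job instances where jobB reads jobA's output, and the imports are the same as in the full driver below):

ControlledJob a = new ControlledJob(jobA, null);
List<ControlledJob> deps = new ArrayList<>();
deps.add(a);
ControlledJob b = new ControlledJob(jobB, deps); // b is submitted only after a succeeds

JobControl jc = new JobControl("chain");
jc.addJob(a);
jc.addJob(b);
new Thread(jc).start();      // JobControl implements Runnable
while(!jc.allFinished()){
	Thread.sleep(2000);      // poll until every job in the chain is done
}
jc.stop();

The full driver below follows exactly this shape.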

package mapreduce.exercise.cf;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Description: chains multiple MapReduce jobs into one complete task.
 */

public class CFMR {
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
		System.setProperty("HADOOP_USER_NAME","hadoop");
		
		FileSystem fs = FileSystem.get(conf);
		
		Job cf1job = Job.getInstance(conf);
		cf1job.setJarByClass(CFMR.class);
		cf1job.setMapperClass(CF1MRMapper.class);
		cf1job.setReducerClass(CF1MRReducer.class);
		cf1job.setMapOutputKeyClass(Text.class);
		cf1job.setMapOutputValueClass(Text.class);
		cf1job.setOutputKeyClass(Text.class);
		cf1job.setOutputValueClass(Text.class);
		
		Path inputPath1 = new Path(args[0]);
		Path outputPath1 = new Path(args[1]);
		FileInputFormat.setInputPaths(cf1job, inputPath1);
		
		if(fs.exists(outputPath1)){
			fs.delete(outputPath1, true);
		}
		FileOutputFormat.setOutputPath(cf1job, outputPath1);
		
		Job cf2job = Job.getInstance(conf);
		cf2job.setJarByClass(CFMR.class);
		cf2job.setMapperClass(CF2MRMapper.class);
		cf2job.setReducerClass(CF2MRReducer.class);
		cf2job.setMapOutputKeyClass(Text.class);
		cf2job.setMapOutputValueClass(Text.class);
		cf2job.setOutputKeyClass(Text.class);
		cf2job.setOutputValueClass(Text.class);
		Path inputPath2 = new Path(args[1]);
		Path outputPath2 = new Path(args[2]);
		FileInputFormat.setInputPaths(cf2job, inputPath2);
		if(fs.exists(outputPath2)){
			fs.delete(outputPath2,true);
		}
		FileOutputFormat.setOutputPath(cf2job, outputPath2);
		
		/**
		 * Submission.
		 * Improved submission: run several dependent jobs as one chain.
		 *
		 * Here job2 depends on the output of job1, so job2 must run after
		 * job1 and can only be submitted once job1 has completed.
		 *
		 * ControlledJob/JobControl manage these dependencies and submit
		 * the chain as a whole.
		 */
		ControlledJob job1 = new ControlledJob(cf1job, null);
		List<ControlledJob> dpdsJobs = new ArrayList<>();
		dpdsJobs.add(job1);
		// job2 can only be submitted after job1 has finished successfully
		ControlledJob job2 = new ControlledJob(cf2job, dpdsJobs);
		
		JobControl jc = new JobControl("cf mr");
		
		jc.addJob(job1);
		jc.addJob(job2);
		
		// JobControl implements Runnable, so it is driven by its own thread
		Thread cfThread = new Thread(jc);
		cfThread.start();
		
		// poll until every job in the chain has finished
		while(!jc.allFinished()){
			System.out.println("Jobs still running ...");
			Thread.sleep(2000);
		}
		System.out.println("All jobs finished.");
		jc.stop();
		System.exit(0);
		
	}
	
	private static class CF1MRMapper extends Mapper<LongWritable, Text, Text, Text>{
		
		private Text keyOut = new Text();
		private Text valueOut = new Text();
		
		/**
		 * value : A:B,C,D,F,E,O
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			
			String[] split = value.toString().split(":");
			
			String outValue = split[0];
			valueOut.set(outValue);
			
			String[] keys = split[1].split(",");
			for(String outKey : keys){
				keyOut.set(outKey);
				context.write(keyOut, valueOut);
			}
		}
	}
	private static class CF1MRReducer extends Reducer<Text, Text, Text, Text>{
		
		private Text valueOut = new Text();

		
		/**
		 * One reduce call receives the grouped pairs, e.g.
		 * (E,A) (E,B) (E,G) ...
		 */
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			
			StringBuilder sb = new StringBuilder();
			
			for(Text t : values){
				sb.append(t.toString()).append("-");
			}
			String outValue = sb.substring(0, sb.length() - 1);
			
			valueOut.set(outValue);
			
			/**
			 * key : E 
			 * value : A-B-G-...
			 */
			context.write(key, valueOut);
			
		}

	}

	
	private static class CF2MRMapper extends Mapper<LongWritable, Text, Text, Text>{
		
		private Text keyOut = new Text();
		private Text valueOut = new Text();
		
		
		/**
		 * key : byte offset of the line
		 * value : E	A-B-G-... (the person, then the users who list that person)
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			
			String[] split = value.toString().split("\t");
			
			String outValue = split[0];
			valueOut.set(outValue);
			
			List<String> userList = new ArrayList<>();
			
			for(String t : split[1].split("-")){
				userList.add(t);
			}
			/**
			 * Why sort the user list?
			 *
			 * E  A-B-G-H
			 * C  B-A-H-G   ---- after sorting ----> A-B-G-H
			 *
			 * Without sorting, the same pair would produce two different keys:
			 *   A-B  E
			 *   B-A  C
			 * After sorting, both lines produce the same key:
			 *   A-B  E
			 *   A-B  C
			 * because A-B === B-A
			 */
			Collections.sort(userList);
			
			int size = userList.size();
			for(int i = 0; i < size-1; i++){
				for(int j = i+1; j < size; j++){
					String outKey = userList.get(i)+"-"+userList.get(j);
					keyOut.set(outKey);
					
					/**
					 * key : A-B
					 * value : E
					 */
					context.write(keyOut, valueOut);
				}
			}
		}
	}
	private static class CF2MRReducer extends Reducer<Text, Text, Text, Text>{

		private Text valueOut = new Text();

		/**
		 * key : A-B
		 * values : E C G H
		 */
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {

			StringBuilder sb = new StringBuilder();
			for(Text t : values){
				sb.append(t.toString()).append(",");
			}
			String outValue = sb.substring(0, sb.length() - 1);
			valueOut.set(outValue);
			context.write(key, valueOut);
		}
	}
}
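
Assuming the classes are packaged into a jar named cf.jar (the jar name and the paths here are placeholders, not from the original post), the chain can be submitted with:

hadoop jar cf.jar mapreduce.exercise.cf.CFMR /cf/input /cf/tmp /cf/output

args[0] is the raw friend-list file, args[1] is the intermediate directory written by job 1 and read by job 2, and args[2] is the final result directory.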