
A Case Study of Chaining Multiple MapReduce Jobs


Requirement

There are three files, each containing some words. Count how many times each word appears in each file.

Input data

Expected output
For example: atguigu c.txt-->2 b.txt-->2 a.txt-->3

Analysis

If a requirement cannot be fulfilled by a single MR job, split it into several jobs and run them in sequence according to their dependencies.
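Before wiring up the actual Hadoop jobs, the two-stage logic can be sketched locally with plain Java collections. This is only a sketch of the data flow, not Hadoop code; the file names and word lists below are hypothetical, mirroring the example above:

```java
import java.util.*;

public class IndexSketch {

    // stage 1 mimics Job1 (count per word-file key),
    // stage 2 mimics Job2 (re-key on the word and concatenate "file-->count" pairs)
    public static Map<String, String> buildIndex(Map<String, List<String>> files) {
        // stage 1: "atguigu-a.txt" -> 2  (what Job1's reducer emits)
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : files.entrySet()) {
            for (String word : e.getValue()) {
                counts.merge(word + "-" + e.getKey(), 1, Integer::sum);
            }
        }

        // stage 2: group by the word part of the key
        Map<String, StringBuilder> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            String[] kv = e.getKey().split("-", 2); // [word, file]; assumes words contain no '-'
            grouped.computeIfAbsent(kv[0], k -> new StringBuilder())
                   .append(kv[1]).append("-->").append(e.getValue()).append(' ');
        }

        Map<String, String> index = new TreeMap<>();
        grouped.forEach((word, sb) -> index.put(word, sb.toString().trim()));
        return index;
    }

    public static void main(String[] args) {
        // hypothetical contents of the input files
        Map<String, List<String>> files = new LinkedHashMap<>();
        files.put("a.txt", Arrays.asList("atguigu", "pingping", "atguigu"));
        files.put("b.txt", Arrays.asList("atguigu", "ss"));
        buildIndex(files).forEach((w, v) -> System.out.println(w + " " + v));
    }
}
```

The point of the sketch is that the second stage only makes sense once the first stage's counts are complete, which is exactly why Job2 must wait for Job1 in the real driver.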

Job1
Mapper: by default, one MapTask processes the data of a single split, and under the default split policy a split belongs to exactly one file.

  • keyin-valuein: atguigu pingping
  • keyout-valueout: atguigu-a.txt,1

Reducer

  • keyin-valuein: atguigu-a.txt,1 (the mapper's output becomes the reducer's input)
  • keyout-valueout: atguigu-a.txt,3
    pingping-a.txt,2
    atguigu-b.txt,3
    pingping-b.txt,2

Job2
Mapper: by default, one MapTask processes the data of a single split, and under the default split policy a split belongs to exactly one file.

  • keyin-valuein: pingping,a.txt-2 (the previous job's reducer output becomes this job's mapper input)
  • keyout-valueout: pingping,a.txt-2 (passed through unchanged)

Reducer

  • keyin-valuein:
    pingping,a.txt-2
    pingping,b.txt-2
  • keyout-valueout: pingping,a.txt-2 b.txt-2 (finally, concatenate the values that share the same key)

Code implementation

Mapper1.java

/*
 * 1. Input:
 * 		atguigu pingping
 * 2. Output:
 * 		atguigu-a.txt,1
 */
public class Example1Mapper1 extends Mapper<LongWritable, Text, Text, IntWritable>{
	
	private String filename;
	private Text out_key=new Text();
	private IntWritable out_value=new IntWritable(1);
	
	@Override
	protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		
		// determine which file this MapTask's split comes from
		InputSplit inputSplit = context.getInputSplit();
		
		FileSplit split=(FileSplit)inputSplit;
		
		filename=split.getPath().getName();
		
	}
	
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		
		String[] words = value.toString().split(" ");
		
		for (String word : words) {
			
			out_key.set(word+"-"+filename);
			
			context.write(out_key, out_value);
		}
		
	}

}

Reducer1.java

/*
 * 1. Input:
 * 		atguigu-a.txt,1
 * 		atguigu-a.txt,1
 * 		atguigu-a.txt,1
 * 2. Output:
 * 		atguigu-a.txt,3
 */
public class Example1Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable>{
	
	private IntWritable out_value=new IntWritable();
	
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
		
		int sum=0;
		
		for (IntWritable value : values) {
			sum+=value.get();
		}
		
		out_value.set(sum);
		
		context.write(key, out_value);
		
	}

}

Mapper2.java

/*
 * 1. Input:
 * 		atguigu-a.txt\t3
 * 		atguigu-b.txt\t3
 * 		With KeyValueTextInputFormat, a separator can be configured: the part of
 * 		each line before the separator becomes the key, the part after it the value.
 * 2. Output:
 * 		atguigu,a.txt\t3
 * 		atguigu,b.txt\t3
 */
public class Example1Mapper2 extends Mapper<Text, Text, Text, Text>{
	// no need to override map(): the inherited map() writes each input key-value pair through unchanged
}
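KeyValueTextInputFormat splits each line at the first occurrence of the configured separator (set in the driver via `mapreduce.input.keyvaluelinerecordreader.key.value.separator`, here "-"); if the separator is absent, the whole line becomes the key and the value is empty. A minimal sketch of that splitting rule in plain Java (not the actual Hadoop record reader):

```java
public class KvSplitSketch {

    // mimic KeyValueLineRecordReader: split at the FIRST separator only
    public static String[] splitLine(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos == -1) {
            // no separator: the whole line is the key, the value is empty
            return new String[] { line, "" };
        }
        return new String[] { line.substring(0, pos), line.substring(pos + 1) };
    }

    public static void main(String[] args) {
        // a line as emitted by Job1: key "atguigu", value "a.txt\t3"
        String[] kv = splitLine("atguigu-a.txt\t3", '-');
        System.out.println("key=" + kv[0] + " value=" + kv[1]);
    }
}
```

This is why the separator must be a character that never occurs inside a word: otherwise the key would be cut short at the wrong position.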

Reducer2.java

/*
 * 1. Input:
 * 		atguigu,a.txt\t3
 * 		atguigu,b.txt\t3
 * 		
 * 2. Output:
 * 		atguigu,a.txt\t3 b.txt\t3
 * 		
 */
public class Example1Reducer2 extends Reducer<Text, Text, Text, Text>{
	
	private Text out_value=new Text();
	
	@Override
	protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
	
		StringBuffer sb = new StringBuffer();
		
		// concatenate the values
		for (Text value : values) {
			
			sb.append(value.toString()+" ");
			
		}
		
		out_value.set(sb.toString());
		
		context.write(key, out_value);
		
	}

}

Driver.java

/*
 * 1. Example1Driver submits two jobs.
 * 			Job2 depends on Job1: it may only run after Job1 has finished and produced its output.
 * 
 * 2. JobControl: defines a group of MR jobs and their dependencies.
 * 				Jobs are added to a JobControl via addJob(ControlledJob aJob).
 * 
 * 3. ControlledJob: a job wrapper that can carry dependencies.
 * 			addDependingJob(ControlledJob dependingJob): adds a job that the current job depends on.
 * 			public ControlledJob(Configuration conf): builds a ControlledJob from a configuration.
 * 
 */
public class Example1Driver {
	
	public static void main(String[] args) throws Exception {
		
		// define the paths
		Path inputPath=new Path("e:/mrinput/index");
		Path outputPath=new Path("e:/mroutput/index");
		Path finalOutputPath=new Path("e:/mroutput/finalindex");
		
		// one configuration per job
		Configuration conf1 = new Configuration();
		Configuration conf2 = new Configuration();
		conf2.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "-");
		
		
		// make sure the output directories do not exist
		FileSystem fs=FileSystem.get(conf1);
		
		if (fs.exists(outputPath)) {
			
			fs.delete(outputPath, true);
			
		}
		
		if (fs.exists(finalOutputPath)) {
			
			fs.delete(finalOutputPath, true);
			
		}
		
		// ① create the jobs
		Job job1 = Job.getInstance(conf1);
		Job job2 = Job.getInstance(conf2);
		
		// set the job names
		job1.setJobName("index1");
		job2.setJobName("index2");
		
		// ② configure Job1
		job1.setMapperClass(Example1Mapper1.class);
		job1.setReducerClass(Example1Reducer1.class);
		
		job1.setOutputKeyClass(Text.class);
		job1.setOutputValueClass(IntWritable.class);
		
		// set the input and output directories
		FileInputFormat.setInputPaths(job1, inputPath);
		FileOutputFormat.setOutputPath(job1, outputPath);
		
		// ③ configure Job2
		job2.setMapperClass(Example1Mapper2.class);
		job2.setReducerClass(Example1Reducer2.class);
				
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(Text.class);
				
		// set the input and output directories
		FileInputFormat.setInputPaths(job2, outputPath);
		FileOutputFormat.setOutputPath(job2, finalOutputPath);
		
		// set Job2's input format
		job2.setInputFormatClass(KeyValueTextInputFormat.class);
		
		//--------------------------------------------------------
		// build the JobControl
		JobControl jobControl = new JobControl("index");
		
		// wrap the jobs as controlled jobs
		ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
		ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
		
		// declare the dependency
		controlledJob2.addDependingJob(controlledJob1);
		
		// register the jobs to run with the JobControl
		jobControl.addJob(controlledJob1);
		jobControl.addJob(controlledJob2);
		
		// run the JobControl in a separate thread
		Thread jobControlThread = new Thread(jobControl);
		// mark the thread as a daemon so it does not keep the JVM alive
		jobControlThread.setDaemon(true);
		
		jobControlThread.start();
		
		// poll until every job in the JobControl has finished
		while (!jobControl.allFinished()) {
			Thread.sleep(500);	// sleep between checks to avoid busy-waiting
		}
		
		System.out.println(jobControl.getSuccessfulJobList());
	
	}	
}

Output

Job1's output:

Final output: