Chaining Multiple MapReduce Jobs: A Worked Example
By 阿新 • Published 2020-08-17
Requirement
Three files each contain some words. For every word, count how many times it appears in each file.
Input data
Expected output
For example: atguigu	c.txt-->2 b.txt-->2 a.txt-->3
Analysis
If a requirement cannot be satisfied by a single MR job, split it into several jobs and run them in order of their dependencies.

Job1
Mapper: by default one MapTask processes the data of a single split, and under the default split strategy a split belongs to exactly one file, so each mapper can tag its words with the name of the file it is reading.
- keyin-valuein: atguigu pingping
- keyout-valueout: atguigu-a.txt,1
Reducer
- keyin-valuein: atguigu-a.txt,1 (the mapper's output is the reducer's input)
- keyout-valueout:
  atguigu-a.txt,3
  pingping-a.txt,2
  atguigu-b.txt,3
  pingping-b.txt,2

Job2
Mapper: reads Job1's result with KeyValueTextInputFormat, configured to use "-" as the key-value separator, so a line such as pingping-a.txt<TAB>2 is split at the first "-".
- keyin-valuein: key = pingping, value = a.txt<TAB>2 (the previous job's reducer output is this job's mapper input)
- keyout-valueout: the same pair, passed through unchanged
Reducer
- keyin-valuein:
  pingping, a.txt<TAB>2
  pingping, b.txt<TAB>2
- keyout-valueout: pingping, a.txt<TAB>2 b.txt<TAB>2 (simply concatenate the values that share a key)
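The two-stage flow above can be rehearsed without Hadoop at all. The following sketch runs the same "count each word-file pair, then group the per-file counts under the word" logic in memory; the sample file contents are hypothetical, not the article's actual input:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class TwoStageSketch {

    // Both stages of the pipeline, run in memory.
    // files: file name -> file contents; result: word -> "file<TAB>count file<TAB>count ...".
    static Map<String, String> invertedIndex(Map<String, String> files) {
        // Stage 1 (Job1's role): count each "word-file" pair.
        Map<String, Integer> perFileCounts = new TreeMap<>();
        for (Map.Entry<String, String> f : files.entrySet()) {
            for (String word : f.getValue().split(" ")) {
                perFileCounts.merge(word + "-" + f.getKey(), 1, Integer::sum);
            }
        }
        // Stage 2 (Job2's role): re-key on the word (text before the first '-')
        // and concatenate "file<TAB>count" fragments, as Reducer2 does.
        Map<String, StringBuilder> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> e : perFileCounts.entrySet()) {
            int dash = e.getKey().indexOf('-');
            String word = e.getKey().substring(0, dash);
            String fragment = e.getKey().substring(dash + 1) + "\t" + e.getValue();
            grouped.computeIfAbsent(word, k -> new StringBuilder())
                   .append(fragment).append(' ');
        }
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, StringBuilder> e : grouped.entrySet()) {
            result.put(e.getKey(), e.getValue().toString().trim());
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample data for illustration only.
        Map<String, String> files = new LinkedHashMap<>();
        files.put("a.txt", "atguigu pingping atguigu");
        files.put("b.txt", "atguigu pingping");
        invertedIndex(files).forEach((word, idx) -> System.out.println(word + "\t" + idx));
    }
}
```

This is only a sketch of the data flow; the real jobs of course run the two stages as separate MapReduce passes, with Job1's files on disk acting as the hand-off between them.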
程式碼實現
Mapper1.java
/*
 * 1. Input:  atguigu pingping
 * 2. Output: atguigu-a.txt,1
 */
public class Example1Mapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {

	private String filename;
	private Text out_key = new Text();
	private IntWritable out_value = new IntWritable(1);

	@Override
	protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		// A split belongs to exactly one file, so the file name can be cached once per task.
		InputSplit inputSplit = context.getInputSplit();
		FileSplit split = (FileSplit) inputSplit;
		filename = split.getPath().getName();
	}

	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		String[] words = value.toString().split(" ");
		for (String word : words) {
			// Emit "word-filename" as the key so counts are kept per word per file.
			out_key.set(word + "-" + filename);
			context.write(out_key, out_value);
		}
	}
}
Reducer1.java
/*
 * 1. Input:
 *    atguigu-a.txt,1
 *    atguigu-a.txt,1
 *    atguigu-a.txt,1
 * 2. Output:
 *    atguigu-a.txt,3
 */
public class Example1Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable>{
private IntWritable out_value=new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum=0;
for (IntWritable value : values) {
sum+=value.get();
}
out_value.set(sum);
context.write(key, out_value);
}
}
Mapper2.java
/*
 * 1. Input:
 *    atguigu-a.txt\t3
 *    atguigu-b.txt\t3
 *    With KeyValueTextInputFormat a separator can be configured; the text before
 *    the first separator becomes the key, the text after it becomes the value.
 * 2. Output:
 *    atguigu,a.txt\t3
 *    atguigu,b.txt\t3
 */
public class Example1Mapper2 extends Mapper<Text, Text, Text, Text>{
	// No need to override map(): the parent class's default map() casts the
	// input key-value pair to the output types and writes it through unchanged.
}
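The splitting rule used by KeyValueTextInputFormat is worth spelling out: each line is cut at the first occurrence of the configured separator; everything before it becomes the key, everything after it the value, and a line with no separator becomes a key with an empty value. The following stand-alone demo (not Hadoop's actual record reader, just the same splitting logic) shows why Job2's mapper sees key = atguigu, value = a.txt\t3:

```java
public class KvSplitDemo {

	// Mimics KeyValueLineRecordReader's rule: split at the FIRST separator;
	// if the separator is absent, the whole line is the key and the value is empty.
	static String[] split(String line, char sep) {
		int i = line.indexOf(sep);
		if (i < 0) {
			return new String[] { line, "" };
		}
		return new String[] { line.substring(0, i), line.substring(i + 1) };
	}

	public static void main(String[] args) {
		// Job1 wrote "atguigu-a.txt<TAB>3"; with separator '-' Job2's mapper sees:
		String[] kv = split("atguigu-a.txt\t3", '-');
		System.out.println("key=" + kv[0] + "  value=" + kv[1]);
	}
}
```

Note that the tab written by Job1's TextOutputFormat survives inside the value, which is exactly the "file\tcount" fragment Reducer2 later concatenates.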
Reducer2.java
/*
 * 1. Input:
 *    atguigu,a.txt\t3
 *    atguigu,b.txt\t3
 * 2. Output:
 *    atguigu,a.txt\t3 b.txt\t3
 */
public class Example1Reducer2 extends Reducer<Text, Text, Text, Text>{
private Text out_value=new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
//Concatenate all values that share this key
for (Text value : values) {
sb.append(value.toString()+" ");
}
out_value.set(sb.toString());
context.write(key, out_value);
}
}
Driver.java
/*
 * 1. Example1Driver submits two jobs.
 *    Job2 depends on Job1: it may only run after Job1 has finished and produced its result.
 *
 * 2. JobControl: defines a group of MR jobs together with their dependencies.
 *    Jobs are added to a JobControl via addJob(ControlledJob aJob).
 *
 * 3. ControlledJob: a job wrapper that can carry dependency information.
 *    addDependingJob(ControlledJob dependingJob): adds a job that the current job depends on.
 *    public ControlledJob(Configuration conf): builds a ControlledJob from a Configuration.
 */
public class Example1Driver {
	public static void main(String[] args) throws Exception {
		// Define the paths
		Path inputPath = new Path("e:/mrinput/index");
		Path outputPath = new Path("e:/mroutput/index");
		Path finalOutputPath = new Path("e:/mroutput/finalindex");
		// One configuration per job
		Configuration conf1 = new Configuration();
		Configuration conf2 = new Configuration();
		conf2.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "-");
		// Make sure the output directories do not already exist
		FileSystem fs = FileSystem.get(conf1);
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}
		if (fs.exists(finalOutputPath)) {
			fs.delete(finalOutputPath, true);
		}
		// ① Create the Jobs
		Job job1 = Job.getInstance(conf1);
		Job job2 = Job.getInstance(conf2);
		// Set the job names
		job1.setJobName("index1");
		job2.setJobName("index2");
		// ② Configure Job1
		job1.setMapperClass(Example1Mapper1.class);
		job1.setReducerClass(Example1Reducer1.class);
		job1.setOutputKeyClass(Text.class);
		job1.setOutputValueClass(IntWritable.class);
		// Input and output directories
		FileInputFormat.setInputPaths(job1, inputPath);
		FileOutputFormat.setOutputPath(job1, outputPath);
		// ③ Configure Job2
		job2.setMapperClass(Example1Mapper2.class);
		job2.setReducerClass(Example1Reducer2.class);
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(Text.class);
		// Job2 reads Job1's output directory
		FileInputFormat.setInputPaths(job2, outputPath);
		FileOutputFormat.setOutputPath(job2, finalOutputPath);
		// Set Job2's input format
		job2.setInputFormatClass(KeyValueTextInputFormat.class);
		//--------------------------------------------------------
		// Build the JobControl
		JobControl jobControl = new JobControl("index");
		// Wrap the jobs to be run
		ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
		ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
		// Declare the dependency: job2 runs only after job1 succeeds
		controlledJob2.addDependingJob(controlledJob1);
		// Register the jobs with the JobControl
		jobControl.addJob(controlledJob1);
		jobControl.addJob(controlledJob2);
		// Run the JobControl on its own thread
		Thread jobControlThread = new Thread(jobControl);
		// Mark it as a daemon thread so the JVM can exit once main() returns
		jobControlThread.setDaemon(true);
		jobControlThread.start();
		// Poll until every job in the JobControl has finished
		while (true) {
			if (jobControl.allFinished()) {
				System.out.println(jobControl.getSuccessfulJobList());
				return;
			}
			Thread.sleep(500); // avoid busy-spinning while the jobs run
		}
	}
}
Output
Job1's output:
Final output: