Chaining Multiple Jobs in MapReduce
By 阿新 · Published 2019-01-22
Finding mutual friends:
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J,K
In the data above, a line such as
A:B,C,D,F,E,O
means that B, C, D, E, F, and O are friends of user A.
1. Find the mutual friends of every pair of users.
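The solution chains two jobs. The first job (CF1) inverts the friend lists: for each person, it collects every user who has that person as a friend. The second job (CF2) sorts each such group and emits every pair in the group, keyed by the pair, with the shared person as the value; its reducer then concatenates all shared persons per pair. Traced on the sample data for the pair A-B (line order and the order of names within a value depend on the shuffle):

CF1, person C: A, B, E, F, G, H and K all list C as a friend, so CF1 outputs
    C    A-B-E-F-G-H-K

CF2, mapper: the sorted group A-B-E-F-G-H-K yields the pairs (A-B, C), (A-E, C), (A-F, C), and so on.
CF2, reducer: on this data set only the groups of C and E contain both A and B, so the final line for the pair is
    A-B    C,E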
CF1:
package mapreduce.exercise.cf;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * The first MapReduce job for finding mutual friends.
 */
public class CF1MR {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        Job job = Job.getInstance(conf);
        job.setJarByClass(CF1MR.class);
        job.setMapperClass(CF1MRMapper.class);
        job.setReducerClass(CF1MRReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Set up the input and output paths.
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        FileInputFormat.setInputPaths(job, inputPath);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit the job and wait for it to finish.
        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    /**
     * Mapper logic: invert each friend list.
     */
    private static class CF1MRMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text keyOut = new Text();
        private Text valueOut = new Text();

        /**
         * value : A:B,C,D,F,E,O
         * For every friend F of user U, emit (F, U).
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(":");
            String outValue = split[0];
            valueOut.set(outValue);
            String[] keys = split[1].split(",");
            for (String outKey : keys) {
                keyOut.set(outKey);
                context.write(keyOut, valueOut);
            }
        }
    }

    /**
     * Reducer logic: for each person, collect every user who has that
     * person in their friend list.
     */
    private static class CF1MRReducer extends Reducer<Text, Text, Text, Text> {

        private Text valueOut = new Text();

        /**
         * Parameters received by one reduce() call:
         * key = E, values = A, B, G, ...
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text t : values) {
                sb.append(t.toString()).append("-");
            }
            String outValue = sb.substring(0, sb.length() - 1);
            valueOut.set(outValue);
            // key : E, value : A-B-G-...
            context.write(key, valueOut);
        }
    }
}
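On the sample data, CF1 produces one output line per person, listing everyone who has that person in their friend list (the order inside each value is not guaranteed). A few representative lines:

A    B-C-D-F-G-H-I-K-O
B    A-E-F-J
C    A-B-E-F-G-H-K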
CF2:
package mapreduce.exercise.cf;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CF2MR {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        Job job = Job.getInstance(conf);
        job.setJarByClass(CF2MR.class);
        job.setMapperClass(CF2MRMapper.class);
        job.setReducerClass(CF2MRReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Set up the input and output paths.
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        FileInputFormat.setInputPaths(job, inputPath);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit the job and wait for it to finish.
        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    /**
     * Mapper logic: emit every pair of users that share a friend.
     */
    private static class CF2MRMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text keyOut = new Text();
        private Text valueOut = new Text();

        /**
         * key   : byte offset of the line
         * value : E    A-B-G-...
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            String outValue = split[0];
            valueOut.set(outValue);

            List<String> userList = new ArrayList<>();
            for (String t : split[1].split("-")) {
                userList.add(t);
            }

            /*
             * Why sort?
             *
             * E    A-B-G-H
             * C    B-A-H-G   ---- sort ----> A-B-G-H
             *
             * Without sorting, the same pair can be emitted under two
             * different keys:
             *     A-B    E
             *     B-A    C
             * After sorting, both become:
             *     A-B    E
             *     A-B    C
             * because A-B === B-A.
             */
            Collections.sort(userList);
            int size = userList.size();
            for (int i = 0; i < size - 1; i++) {
                for (int j = i + 1; j < size; j++) {
                    String outKey = userList.get(i) + "-" + userList.get(j);
                    keyOut.set(outKey);
                    // key : A-B, value : E
                    context.write(keyOut, valueOut);
                }
            }
        }
    }

    /**
     * Reducer logic: concatenate all mutual friends of a pair.
     */
    private static class CF2MRReducer extends Reducer<Text, Text, Text, Text> {

        private Text valueOut = new Text();

        /**
         * key    : A-B
         * values : E, C, G, H
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text t : values) {
                sb.append(t.toString()).append(",");
            }
            String outValue = sb.substring(0, sb.length() - 1);
            valueOut.set(outValue);
            context.write(key, valueOut);
        }
    }
}
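CF2's output is the final answer: each line is a pair of users followed by their mutual friends. On the sample data the result includes, for example:

A-B    C,E
A-C    D,F
B-E    C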
The code that chains the jobs together:
package mapreduce.exercise.cf;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Chains multiple MapReduce jobs into one complete task.
 */
public class CFMR {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);

        // First job: invert the friend lists.
        Job cf1job = Job.getInstance(conf);
        cf1job.setJarByClass(CFMR.class);
        cf1job.setMapperClass(CF1MRMapper.class);
        cf1job.setReducerClass(CF1MRReducer.class);
        cf1job.setMapOutputKeyClass(Text.class);
        cf1job.setMapOutputValueClass(Text.class);
        cf1job.setOutputKeyClass(Text.class);
        cf1job.setOutputValueClass(Text.class);

        Path inputPath1 = new Path(args[0]);
        Path outputPath1 = new Path(args[1]);
        FileInputFormat.setInputPaths(cf1job, inputPath1);
        if (fs.exists(outputPath1)) {
            fs.delete(outputPath1, true);
        }
        FileOutputFormat.setOutputPath(cf1job, outputPath1);

        // Second job: reads the first job's output.
        Job cf2job = Job.getInstance(conf);
        cf2job.setJarByClass(CFMR.class);
        cf2job.setMapperClass(CF2MRMapper.class);
        cf2job.setReducerClass(CF2MRReducer.class);
        cf2job.setMapOutputKeyClass(Text.class);
        cf2job.setMapOutputValueClass(Text.class);
        cf2job.setOutputKeyClass(Text.class);
        cf2job.setOutputValueClass(Text.class);

        Path inputPath2 = new Path(args[1]);
        Path outputPath2 = new Path(args[2]);
        FileInputFormat.setInputPaths(cf2job, inputPath2);
        if (fs.exists(outputPath2)) {
            fs.delete(outputPath2, true);
        }
        FileOutputFormat.setOutputPath(cf2job, outputPath2);

        /*
         * Submission.
         *
         * Improved submission: run several jobs with dependencies as one
         * chain. Here, job2 depends on job1's output, so job2 must run
         * after job1 and can only be submitted once job1 has completed.
         *
         * ControlledJob/JobControl manages these dependencies and submits
         * the whole chain.
         */
        ControlledJob job1 = new ControlledJob(cf1job, null);

        List<ControlledJob> dpdsJobs = new ArrayList<>();
        dpdsJobs.add(job1);
        ControlledJob job2 = new ControlledJob(cf2job, dpdsJobs);

        JobControl jc = new JobControl("cf mr");
        jc.addJob(job1);
        jc.addJob(job2);

        // JobControl is a Runnable: run it in its own thread and poll
        // until all jobs have finished.
        Thread cfThread = new Thread(jc);
        cfThread.start();

        while (true) {
            boolean allFinished = jc.allFinished();
            Thread.sleep(2000);
            System.out.println("All jobs finished: " + allFinished);
            if (allFinished) {
                break;
            }
        }
        // Stop the JobControl thread before exiting.
        jc.stop();
        System.exit(0);
    }

    private static class CF1MRMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text keyOut = new Text();
        private Text valueOut = new Text();

        /**
         * value : A:B,C,D,F,E,O
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(":");
            String outValue = split[0];
            valueOut.set(outValue);
            String[] keys = split[1].split(",");
            for (String outKey : keys) {
                keyOut.set(outKey);
                context.write(keyOut, valueOut);
            }
        }
    }

    private static class CF1MRReducer extends Reducer<Text, Text, Text, Text> {

        private Text valueOut = new Text();

        /**
         * Parameters received by one reduce() call:
         * key = E, values = A, B, G, ...
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text t : values) {
                sb.append(t.toString()).append("-");
            }
            String outValue = sb.substring(0, sb.length() - 1);
            valueOut.set(outValue);
            // key : E, value : A-B-G-...
            context.write(key, valueOut);
        }
    }

    private static class CF2MRMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text keyOut = new Text();
        private Text valueOut = new Text();

        /**
         * key   : byte offset of the line
         * value : E    A-B-G-...
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            String outValue = split[0];
            valueOut.set(outValue);

            List<String> userList = new ArrayList<>();
            for (String t : split[1].split("-")) {
                userList.add(t);
            }

            // Sort so that each pair is always emitted under a single
            // key: A-B === B-A (see CF2MR above for the full explanation).
            Collections.sort(userList);
            int size = userList.size();
            for (int i = 0; i < size - 1; i++) {
                for (int j = i + 1; j < size; j++) {
                    String outKey = userList.get(i) + "-" + userList.get(j);
                    keyOut.set(outKey);
                    // key : A-B, value : E
                    context.write(keyOut, valueOut);
                }
            }
        }
    }

    private static class CF2MRReducer extends Reducer<Text, Text, Text, Text> {

        private Text valueOut = new Text();

        /**
         * key    : A-B
         * values : E, C, G, H
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text t : values) {
                sb.append(t.toString()).append(",");
            }
            String outValue = sb.substring(0, sb.length() - 1);
            valueOut.set(outValue);
            context.write(key, valueOut);
        }
    }
}
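A minimal sketch of launching the chained driver (the jar name and HDFS paths below are hypothetical; the three arguments are the raw friend-list input, the intermediate directory written by the first job and read by the second, and the final output directory):

hadoop jar cf.jar mapreduce.exercise.cf.CFMR /cf/input /cf/tmp /cf/output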