Explaining Key MapReduce Concepts Through a Worked Example -------- In-Memory Sorting
Posted by 阿新 · 2018-12-20
TOP N
Input data:
hello qianfeng
hello qianfeng
qianfeng is best
qianfeng better
hadoop is good
spark is nice
Expected result, the top three words by count: qianfeng 4, is 3, hello 2

The plan: the Mapper emits (word, "1") for every word; the Reducer sums the counts per word and buffers each "word_count" pair in an in-memory list, then sorts that list in cleanup() and writes out the first three entries.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MemSort implements Tool {

    private Configuration conf;

    /**
     * Custom Mapper: emits (word, "1") for every word in the line.
     */
    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text k = new Text();
        Text v = new Text("1");

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] lines = line.split(" ");
            for (String s : lines) {
                k.set(s);
                context.write(k, v);
            }
        }
    }

    /**
     * Custom Reducer: sums the counts per word, buffers each result in a list,
     * then sorts the list in cleanup() and emits the top three.
     */
    static class MyReducer extends Reducer<Text, Text, Text, Text> {
        List<String> li = new ArrayList<String>();

        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            int counter = 0;
            for (Text t : value) {
                counter += Integer.parseInt(t.toString());
            }
            // Buffer each word with its total count,
            // e.g. li = (qianfeng_4, is_3, hello_2, ...)
            li.add(key.toString() + "_" + counter);
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // Exchange sort on the second field (the count), descending
            for (int i = 0; i < li.size() - 1; i++) {
                for (int j = i + 1; j < li.size(); j++) {
                    if (Integer.parseInt(li.get(i).split("_")[1])
                            < Integer.parseInt(li.get(j).split("_")[1])) {
                        String tmp = li.get(i);
                        li.set(i, li.get(j));
                        li.set(j, tmp);
                    }
                }
            }
            // Emit the top three (guard against fewer than three distinct words)
            for (int i = 0; i < Math.min(3, li.size()); i++) {
                String[] l = li.get(i).split("_");
                context.write(new Text(l[0]), new Text(l[1]));
            }
        }
    }

    @Override
    public void setConf(Configuration conf) {
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        // Store the configuration so getConf() returns the configured instance
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf == null ? new Configuration() : conf;
    }

    /**
     * Driver method.
     */
    @Override
    public int run(String[] args) throws Exception {
        // 1. Get the Configuration object
        Configuration conf = getConf();
        // 2. Create the job
        Job job = Job.getInstance(conf, "model01");
        // 3. Set the class that runs the job
        job.setJarByClass(MemSort.class);
        // 4. Set the map-side properties
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 5. Set the reduce-side properties
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Delete the output directory if it already exists
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Submit the job and wait for completion
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Main entry point for the job.
     */
    public static void main(String[] args) {
        try {
            // Parse generic Hadoop options out of the arguments
            String[] argss = new GenericOptionsParser(new Configuration(), args)
                    .getRemainingArgs();
            System.exit(ToolRunner.run(new MemSort(), argss));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
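One caveat with this in-memory approach: the Reducer's list holds every distinct word, so it only works when the vocabulary fits in the reducer's heap. A common refinement is to keep just the current top N entries in a size-bounded sorted structure. Below is a minimal sketch of that idea, not part of the original code: TopNReducer, TOP_N, and the topN field are names introduced here for illustration, and it assumes the same Hadoop imports as above plus java.util.Map and java.util.TreeMap. Note that keying the TreeMap by count means two words with equal counts overwrite each other, so a production version would need a tie-breaking key or comparator.

// Hypothetical alternative reducer: bounded Top-N instead of buffering all words.
static class TopNReducer extends Reducer<Text, Text, Text, Text> {
    private static final int TOP_N = 3;
    // Sorted by count; firstKey() is always the smallest count still retained.
    // Caveat: equal counts collide, since the count is the map key.
    private final TreeMap<Integer, String> topN = new TreeMap<Integer, String>();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int counter = 0;
        for (Text t : values) {
            counter += Integer.parseInt(t.toString());
        }
        topN.put(counter, key.toString());
        if (topN.size() > TOP_N) {
            topN.remove(topN.firstKey()); // evict the smallest count
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // descendingMap() walks from the largest count down to the smallest
        for (Map.Entry<Integer, String> e : topN.descendingMap().entrySet()) {
            context.write(new Text(e.getValue()), new Text(e.getKey().toString()));
        }
    }
}

On the sample data, swapping MyReducer for a reducer like this should produce the same qianfeng 4 / is 3 / hello 2 output while holding at most N entries in memory. To try either version, package the class into a jar and run it with something like hadoop jar memsort.jar MemSort <input path> <output path> (the jar name here is just a placeholder).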