// 結合案例講解MapReduce重要知識點 —— 簡單排序
// (MapReduce key concepts by example: simple sort)
// 阿新 • 發佈: 2018-12-20
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * * @author lyd *簡單排序 * *資料: * *899 *45 *654564 *432 *45236 *76 *654 *32 *643 *45 *754 *34 * * *詞頻統計並按照次數高低排序??取前三個?? *hello qianfeng hello qianfeng world hello hadoop hello qianfeng hadoop hello 4 qianfeng 3 hadoop 2 * */ public class SortSample { //自定義myMapper public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Text>{ //只在map方法執行之前執行一次。(僅執行一次) @Override protected void setup(Context context) throws IOException, InterruptedException { } @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { context.write(new IntWritable(Integer.parseInt(value.toString())), new Text("")); } //map方法執行完後執行一次(僅執行一次) @Override protected void cleanup(Context context) throws IOException, InterruptedException { } } /*//自定義myReducer public static class MyReducer extends Reducer<Text, Text, Text, Text>{ //在reduce方法執行之前執行一次。(僅一次) @Override protected void setup(Context context) throws IOException, InterruptedException { } @Override protected void reduce(Text key, Iterable<Text> value,Context context) throws IOException, InterruptedException { context.write(new Text(value.toString()), new Text("")); } //在reduce方法執行之後執行一次。(僅一次) @Override protected void cleanup(Context context) throws IOException, InterruptedException { } }*/ /** * job的驅動方法 * @param args */ public static void main(String[] args) { try { //1、獲取Conf Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop01:9000"); //2、建立job Job job = 
Job.getInstance(conf, "model01"); //3、設定執行job的class job.setJarByClass(SortSample.class); //4、設定map相關屬性 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(args[0])); //5、設定reduce相關屬性 /*job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);*/ //判斷輸出目錄是否存在,若存在則刪除 FileSystem fs = FileSystem.get(conf); if(fs.exists(new Path(args[1]))){ fs.delete(new Path(args[1]), true); } FileOutputFormat.setOutputPath(job, new Path(args[1])); //6、提交執行job int isok = job.waitForCompletion(true) ? 0 : 1; //退出 System.exit(isok); } catch (IOException | ClassNotFoundException | InterruptedException e) { e.printStackTrace(); } } }