1. 程式人生 > >MR計算模型二

MR計算模型二

mapreduce函式的編寫  1

map函式

繼承Mapper<Object, Object, Object, Object>

重寫public void map(Object key, Object value, Context context) throws IOException, InterruptedException 方法

map函式主要用於資料的清洗和原始處理

map函式的輸入輸出

map函式每執行一次,處理一條資料

map的輸入,key預設是行號的偏移量,value是一行的內容

context.write(Object, Object)方法輸出

map的輸出是reduce的輸入

mapreduce函式的編寫  2

reduce函式

繼承Reducer<Object, Object, Object, Object>

重寫public void reduce(Object key, Iterable<Object> values, Context context) throws IOException, InterruptedException 方法 reduce函式是主要的業務處理和資料探勘部分

reduce函式的輸入輸出

context.write(data, new IntWritable(1))方法輸出

reduce的輸入時map的輸出,但不是直接輸出,而是按照相同key彙總過後的集合

context.write(Object, Object)方法輸出

mapreduce函式的編寫  3

編寫job

 logger.warn("HelloHadoopSort已啟動");
        Configuration coreSiteConf = new Configuration();
		coreSiteConf.addResource(Resources.getResource("core-site.xml"));

        Job job = Job.getInstance(coreSiteConf, "HelloHadoopSort");
        job.setJarByClass(HelloHadoopSort.class);
        //設定Map和Reduce處理類
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        //設定map輸出型別
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/sort/input"));
        FileOutputFormat.setOutputPath(job, new Path("/sort/output"));
        boolean flag = job.waitForCompletion(true);
        logger.warn("HelloHadoopSort已完成,執行結果:" + flag);

WordCountMap類繼承了

org.apache.hadoop.mapreduce.Mapper,4個泛型型別分別是map函式輸入key的型別,輸入value的型別,輸出key的型別,輸出value的型別。

WordCountReduce類繼承了org.apache.hadoop.mapreduce.Reducer,4個泛型型別含義與map類相同。

map的輸出型別與reduce的輸入型別相同,而一般情況下,map的輸出型別與reduce的輸出型別相同,因此,reduce的輸入型別與輸出型別相同。

在map中,讀取一行內容,按照空格分組,得到一行中的每個單詞,把單詞做為key輸出,value的內容可以為空或任意內容。

在reduce中,獲取到某個單詞及所有集合,集合的尺寸即是該單詞出現的數量,把單詞及其數量輸出到hdfs中