
A simple MapReduce example walking through map, reduce, combiner, and partition end to end

Requirement: use MapReduce to count how often 笑 (laugh), 喜 (joy), 哭 (cry), and 怒 (anger) appear across the full TXT file of Dream of the Red Chamber (紅樓夢), use a combiner to cut shuffle IO, and use a partitioner to split the results across two output files.
Create a MapReduce project with the MapReduce plugin, so the required dependencies are imported automatically.

Main class (the driver):

package com.zhiyou100;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MyApp {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Use ":" instead of the default tab between each output key and value
        conf.set("mapreduce.output.textoutputformat.separator", ":");

        Path inputPath = new Path("hdfs://master:9000/mark/hlm-utf8.txt");
        Path outputPath = new Path("hdfs://master:9000/result/hml02");

        // Delete the output directory if it already exists; otherwise the job fails
        FileSystem fs = FileSystem.newInstance(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        fs.close();

        // A Job object describes one complete MapReduce run
        Job job = Job.getInstance(conf, "HLM");
        job.setJarByClass(MyApp.class);

        // Input directory
        FileInputFormat.addInputPath(job, inputPath);
        // Input format (TextInputFormat is the default, so this can be omitted)
        job.setInputFormatClass(TextInputFormat.class);

        // Custom mapper and its output <K2, V2> types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Custom partitioner (optional) and the number of reducers it routes to
        job.setPartitionerClass(MyPartition.class);
        job.setNumReduceTasks(2);

        // Custom reducer and combiner
        job.setReducerClass(MyReducer.class);
        job.setCombinerClass(MyCombiner.class);

        // Generic type parameters are erased at compile time, so the final
        // output <K3, V3> types must be declared explicitly (they can be
        // omitted only when they match the map output types)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Output directory and output format (TextOutputFormat is the default)
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Submit the job to the cluster and poll until it completes
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

mapper:

package com.zhiyou100;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    static {
        // Printed once per task JVM, so the logs reveal when each class is first loaded
        System.out.println("my_-mapper");
    }

    // Each hit is emitted as a count of 1, so the combiner and reducer can
    // simply sum the values
    private final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        // Split each input line into statements on common Chinese punctuation
        StringTokenizer st = new StringTokenizer(value.toString(), "《 》 、 ! , 。 ? :;  “ ” ‘ ’ ");
        while (st.hasMoreTokens()) {
            String text = st.nextToken().trim();
            // Custom counter: total number of statements scanned
            context.getCounter("ZY", "statement").increment(1);

            if (text.contains("笑")) {
                word.set("笑");
                context.write(word, one);
            }
            if (text.contains("喜")) {
                word.set("喜");
                context.write(word, one);
            }
            if (text.contains("哭")) {
                word.set("哭");
                context.write(word, one);
            }
            if (text.contains("怒")) {
                word.set("怒");
                context.write(word, one);
            }
        }
    }

}
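To see how the StringTokenizer delimiters carve a line into statements, here is a small standalone sketch (the sample sentence is invented):

import java.util.StringTokenizer;

public class TokenizerDemo {
    public static void main(String[] args) {
        // Same delimiter set as MyMapper: common Chinese punctuation plus spaces
        String line = "寶玉笑道,眾人都哭了。";
        StringTokenizer st = new StringTokenizer(line, "《 》 、 ! , 。 ? :;  “ ” ‘ ’ ");
        while (st.hasMoreTokens()) {
            System.out.println(st.nextToken()); // prints 寶玉笑道, then 眾人都哭了
        }
    }
}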

reducer:

package com.zhiyou100;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    static {
        // Printed once per task JVM, so the logs reveal when each class is first loaded
        System.out.println("my_-reducer");
    }

    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // Sum the partial counts emitted by the mappers (and pre-aggregated
        // by the combiner) for this key
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }

        result.set(sum);
        context.write(key, result);

    }
}
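Conceptually, the shuffle groups every value emitted for the same key before reduce is called; a plain-Java sketch of that grouping and summing (no Hadoop API involved):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShuffleSketch {
    public static void main(String[] args) {
        // Pretend map output: one (word, 1) pair per hit, in arrival order
        List<String> mapOutput = Arrays.asList("笑", "哭", "笑", "喜", "笑");
        // The shuffle groups by key; the reduce sums each group
        Map<String, Integer> counts = new HashMap<>();
        for (String word : mapOutput) {
            counts.merge(word, 1, Integer::sum);
        }
        counts.forEach((k, v) -> System.out.println(k + ":" + v)); // e.g. 笑:3 哭:1 喜:1
    }
}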

combiner:

package com.zhiyou100;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    static {
        // Printed once per task JVM, so the logs reveal when each class is first loaded
        System.out.println("my_-combiner");
    }

    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // Hadoop may apply a combiner zero, one, or several times to the map
        // output, so it has to sum the values rather than count them:
        // summing stays correct when re-applied to its own partial sums
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }

        result.set(sum);
        context.write(key, result);

    }
}
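Why summing (and not counting) matters: the total has to come out the same whether the values reach the reducer directly or via any number of combiner passes. A quick sketch:

import java.util.Arrays;
import java.util.List;

public class CombinerSketch {
    // Stand-in for the combine/reduce logic: sum a batch of values
    static int sum(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        // Five map outputs for one key, combined in two separate spills
        int spill1 = sum(Arrays.asList(1, 1, 1));              // combiner -> 3
        int spill2 = sum(Arrays.asList(1, 1));                 // combiner -> 2
        int viaCombiner = sum(Arrays.asList(spill1, spill2));  // reducer  -> 5
        int noCombiner = sum(Arrays.asList(1, 1, 1, 1, 1));    // reducer  -> 5
        System.out.println(viaCombiner == noCombiner);         // true: same total either way
    }
}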

partitioner:

package com.zhiyou100;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartition extends Partitioner<Text, IntWritable>{
    static {
        // Printed once per task JVM, so the logs reveal when each class is first loaded
        System.out.println("my_-partition");
    }

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Route 笑 and 喜 to reducer 0, and 哭 and 怒 to reducer 1; this
        // matches job.setNumReduceTasks(2) in the driver
        String k = key.toString();
        if (k.contains("笑") || k.contains("喜")) {
            return 0;
        } else {
            return 1;
        }
    }

}
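Since getPartition is a pure function, MyPartition can be sanity-checked without a cluster; a small sketch (assumes MyPartition and the Hadoop jars are on the classpath):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class PartitionDemo {
    public static void main(String[] args) {
        MyPartition p = new MyPartition();
        for (String k : new String[] { "笑", "喜", "哭", "怒" }) {
            // The 2 matches job.setNumReduceTasks(2) in the driver
            System.out.println(k + " -> partition " + p.getPartition(new Text(k), new IntWritable(1), 2));
        }
        // Expected: 笑 -> 0, 喜 -> 0, 哭 -> 1, 怒 -> 1
    }
}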

Output:
Two result files are produced under /result/hml02: part-r-00000 holds the 笑 and 喜 totals and part-r-00001 holds the 哭 and 怒 totals, one word:count pair per line (the ":" separator configured in the driver). (Screenshot of the result files not reproduced.)

The lines printed by the static initializer block defined in each class also show the order in which the job first loads the mapper, combiner, partitioner, and reducer:
my_-mapper -> my_-combiner -> my_-partition -> my_-reducer