MapReduce簡單例項解析map、reduce、combiner、partition一條龍
阿新 • • 發佈:2019-02-12
需求:通過MapReduce對紅樓夢TXT檔案統計笑、喜、哭、怒在全書的數量,使用combiner減少IO,通過partition輸出到兩個檔案中。
通過MapReduce外掛建立MapReduce project,這樣需要的包都會自動匯入
主函式:
package com.zhiyou100;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat ;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class MyApp {

    /**
     * Driver: configures and submits the character-count job.
     * Input is the novel on HDFS; output goes to two files (one per reducer,
     * split by {@code MyPartition}), with key and value separated by ':'.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Use ':' between key and value in the text output.
        conf.set("mapreduce.output.textoutputformat.separator", ":");

        Path inputPath = new Path("hdfs://master:9000/mark/hlm-utf8.txt");
        Path outputPath = new Path("hdfs://master:9000/result/hml02");

        // Delete a stale output directory up front — the job fails if it exists.
        FileSystem fs = FileSystem.newInstance(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        fs.close();

        Job job = Job.getInstance(conf, "HLM");
        job.setJarByClass(MyApp.class);

        // Input: plain text, one line per record (TextInputFormat is the default).
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Map phase and its output <K,V> types.
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Two reduce tasks, fed through the custom partitioner.
        job.setPartitionerClass(MyPartition.class);
        job.setNumReduceTasks(2);

        // Reduce phase, with a map-side combiner to shrink shuffle I/O.
        job.setReducerClass(MyReducer.class);
        job.setCombinerClass(MyCombiner.class);

        // Final output <K,V> types (must be set since generics are erased).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Output: plain text records.
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Submit to the cluster and block, polling until completion.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
mapper:
package com.zhiyou100;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
static {
System.out.println("my_-mapper");
}
private IntWritable num = new IntWritable();
private Text word = new Text();
private int no = 0;
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer st = new StringTokenizer(value.toString(), "《 》 、 ! , 。 ? :; “ ” ‘ ’ ");
while (st.hasMoreElements()) {
String text = st.nextElement().toString().trim();
no += 1;
context.getCounter("ZY", "statement").increment(1);
if (text.contains("笑")) {
word.set("笑");
num.set(no);
context.write(word, num);
}
if (text.contains("喜")) {
word.set("喜");
num.set(no);
context.write(word, num);
}
if (text.contains("哭")) {
word.set("哭");
num.set(no);
context.write(word, num);
}
if (text.contains("怒")) {
word.set("怒");
num.set(no);
context.write(word, num);
}
}
}
}
reduce:
package com.zhiyou100;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    static {
        System.out.println("my_-reducer");
    }

    // Reused output writable to avoid per-key allocation.
    private final IntWritable result = new IntWritable();

    /**
     * Sums all partial values received for one character key and writes the
     * final (character, total) pair.
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int total = 0;
        java.util.Iterator<IntWritable> it = values.iterator();
        while (it.hasNext()) {
            total += it.next().get();
        }
        result.set(total);
        context.write(key, result);
    }
}
combiner:
package com.zhiyou100;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    static {
        System.out.println("my_-combiner");
    }

    // Reused output writable to avoid per-key allocation.
    private final IntWritable result = new IntWritable();

    /**
     * Map-side partial aggregation: sums the values for a key before shuffle.
     *
     * BUG FIX: the original used {@code sum += 1}, counting records instead of
     * summing values. The framework may apply a combiner zero, one, or many
     * times — including to its own output — so it must compute the same
     * associative, commutative function as the reducer. Counting is neither:
     * re-combining already-combined counts would collapse them back to the
     * number of partial records. Summing {@code val.get()} is idempotent under
     * repeated combining and matches {@code MyReducer}.
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
partition:
package com.zhiyou100;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPartition extends Partitioner<Text, IntWritable> {

    static {
        System.out.println("my_-partition");
    }

    /**
     * Routes keys containing "笑" or "喜" to reducer 0 and all other keys
     * (here "哭" and "怒") to reducer 1, so the job writes two output files.
     */
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String k = key.toString();
        boolean firstGroup = k.contains("笑") || k.contains("喜");
        return firstGroup ? 0 : 1;
    }
}
輸出結果:
通過每個類中定義的靜態程式碼塊(static 初始化塊)列印的日誌,也可以看出job在呼叫Mapper、Reducer及combiner、partition時類被載入的先後順序
my_-mapper -> my_-combiner -> my_-partition -> my_-reducer