mapreduce中加入combiner
阿新 • • 發佈:2018-07-24
combine mage rim opened alt 不用 一次 apr configure
combiner相當於是一個本地的reduce,它的存在是為了減少網絡的負擔:在本地先進行一次計算,再將計算結果提交給reduce進行二次處理。需要注意的是,Hadoop不保證combiner的執行次數(可能是零次、一次或多次),因此combiner的輸入與輸出類型必須一致,且無論它執行幾次都不應改變作業的最終結果。
現在的流程為:
對於combiner我們有這些理解:
Mapper代碼展示:
package com.nenu.mprd.test; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text;View Codeimport org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class MyMap extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException { // TODO Auto-generated method stub //獲取到單詞 String line=value.toString(); String[] words=line.split(" "); //獲取到文件名 FileSplit filesplit = (FileSplit)context.getInputSplit(); String fileName = filesplit.getPath().getName().trim();//.substring(0,5). String outkey=null; for (String word : words) { //字母+:+文件名 outkey=word.trim()+":"+fileName; System.out.println("map:"+outkey); context.write(new Text(outkey), new Text("1")); } } }
Combiner代碼展示:
package com.nenu.mprd.test; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyCombiner extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException { Text n = null;//輸出key int count=0; Text m=null;//輸出value for(Text v :values){ //對同一個map輸出的k,v對進行按k進行一次匯總。不同map的k,v匯總必須要用reduce方法 String[] words=key.toString().split(":"); n=new Text(words[0].trim());//字母--key System.out.println("MyCombiner KEY:"+n); count+=Integer.parseInt(v.toString()); m=new Text("("+words[1].trim()+" "+count+")"); } System.out.println("MyCombiner value:"+m); context.write(n, m); } }View Code
Reduce代碼展示:
package com.nenu.mprd.test; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyReduce extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub System.out.println("reduce: key"+key); String out=""; for (Text Text : values) { //sum+=intWritable.get(); out+=Text.toString()+" "; } System.out.println("reduce value:"+out); context.write(key, new Text(out)); } }View Code
Job代碼展示:
package com.nenu.mprd.test; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MyJob extends Configured implements Tool{ public static void main(String[] args) throws Exception { MyJob myJob=new MyJob(); ToolRunner.run(myJob, null); } @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf=new Configuration(); conf.set("fs.defaultFS", "hdfs://192.168.64.141:9000"); //添加自動刪除hadoop下的文件 //如果導成架包則需要改變一些參數作為手動輸入 FileSystem filesystem =FileSystem.get(new URI("hdfs://192.168.64.141:9000"), conf, "root"); Path deletePath=new Path("/hadoop/wordcount/city/out"); if(filesystem.exists(deletePath)){ filesystem.delete(deletePath,true);//str: b: } Job job=Job.getInstance(conf); job.setJarByClass(MyJob.class); job.setMapperClass(MyMap.class); //設置combiner 如果combiner和reduce一樣則可以不用設置 job.setCombinerClass(MyCombiner.class); job.setReducerClass(MyReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path("/hadoop/wordcount/city")); FileOutputFormat.setOutputPath(job, new Path("/hadoop/wordcount/city/out")); job.waitForCompletion(true); return 0; } }View Code
mapreduce中加入combiner