
Adding a combiner in MapReduce


A combiner is essentially a local reduce. It exists to lighten the network load: each map task first aggregates its own output locally, and only these partial results are sent to the reducer for the second round of processing.

With the combiner in place, the flow becomes:

[Figure: the MapReduce data flow with a combiner inserted between map and reduce]
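For example, here is a hypothetical trace through the classes below, assuming a.txt contains the word hello twice and b.txt contains it once:

map output for a.txt:   (hello:a.txt, "1"), (hello:a.txt, "1")
after MyCombiner:       (hello, "(a.txt 2)")
final reduce output:    hello   (a.txt 2) (b.txt 1)

Without the combiner, every single (hello:a.txt, "1") pair would cross the network to the reducer.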

A few points worth understanding about the combiner: it runs on the map side over a single map task's output, so its input key/value types must match the mapper's output types and its output types must match the reducer's input types. The framework treats it purely as an optimization and may invoke it zero, one, or several times, so it must never change the final result.
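Because the framework may apply the combiner any number of times, only operations that are commutative and associative, such as the count summation in this post, are safe to combine. A minimal sketch (hypothetical numbers, not part of the original example) of why a naive average cannot be combined:

public class AverageIsNotCombinable {
    public static void main(String[] args) {
        // Map task 1 sees the values {1, 2}; map task 2 sees {6}.
        double trueMean = (1 + 2 + 6) / 3.0;                 // 3.0
        // If each combiner emitted a local mean, the reducer would
        // average the local means {1.5, 6.0} instead:
        double combinedMean = ((1 + 2) / 2.0 + 6.0) / 2.0;   // 3.75 -- wrong
        // Summation is safe: (1 + 2) + 6 == 1 + 2 + 6 either way.
        System.out.println(trueMean + " vs " + combinedMean);
    }
}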

Mapper code:

package com.nenu.mprd.test;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MyMap extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // Split the line into words
        String line = value.toString();
        String[] words = line.split(" ");
        // Get the name of the file this split belongs to.
        // Note: the cast assumes a FileInputFormat-style split; it would fail
        // for input formats such as CombineFileInputFormat.
        FileSplit filesplit = (FileSplit) context.getInputSplit();
        String fileName = filesplit.getPath().getName().trim();
        for (String word : words) {
            // Key is "word:filename", value is the count "1"
            String outkey = word.trim() + ":" + fileName;
            System.out.println("map:" + outkey);
            context.write(new Text(outkey), new Text("1"));
        }
    }
}
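For example, the line "hello world" read from a.txt makes this mapper emit the pairs (hello:a.txt, "1") and (world:a.txt, "1"); the per-file aggregation is left entirely to the combiner.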

Combiner code:

package com.nenu.mprd.test;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Aggregate the (k, v) pairs emitted by a single map task, grouped by key.
        // Aggregation across different map tasks can only happen in the reducer.
        String[] words = key.toString().split(":"); // key is "word:filename"
        Text outKey = new Text(words[0].trim());    // the word becomes the output key
        int count = 0;
        for (Text v : values) {
            count += Integer.parseInt(v.toString());
        }
        // Output value is "(filename count)"
        Text outValue = new Text("(" + words[1].trim() + " " + count + ")");
        System.out.println("MyCombiner key:" + outKey + " value:" + outValue);
        context.write(outKey, outValue);
    }
}
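One caveat about this combiner: Hadoop treats the combiner as an optional optimization, so it may skip it entirely or re-apply it to its own output during multi-pass spill merges. Because this class changes the value format from "1" to "(filename count)", the final output depends on the combiner running exactly once per key, and re-applying it would make Integer.parseInt throw. A more robust pattern keeps the combiner's input and output shapes identical; here is a sketch (not part of the original post, and MyMap would have to emit IntWritable values for it):

package com.nenu.mprd.test;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Safe to run zero, one, or many times: the output has the same
// (Text, IntWritable) shape as the input, so re-combining is harmless.
public class SafeCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum)); // partial count, same shape as input
    }
}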

Reducer code:

package com.nenu.mprd.test;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReduce extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        System.out.println("reduce key:" + key);
        // Concatenate the per-file "(filename count)" values for this word
        StringBuilder out = new StringBuilder();
        for (Text v : values) {
            out.append(v.toString()).append(" ");
        }
        System.out.println("reduce value:" + out);
        context.write(key, new Text(out.toString()));
    }
}
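For example, for the key hello with the combined values {"(a.txt 2)", "(b.txt 1)"}, this reducer writes the line: hello  (a.txt 2) (b.txt 1)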

Job (driver) code:

package com.nenu.mprd.test;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        MyJob myJob = new MyJob();
        ToolRunner.run(myJob, args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.64.141:9000");

        // Delete the output directory if it already exists, so the job can be rerun.
        // If this is packaged as a jar, the hard-coded paths should become arguments.
        FileSystem filesystem = FileSystem.get(new URI("hdfs://192.168.64.141:9000"), conf, "root");
        Path deletePath = new Path("/hadoop/wordcount/city/out");
        if (filesystem.exists(deletePath)) {
            filesystem.delete(deletePath, true); // true = delete recursively
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(MyJob.class);
        job.setMapperClass(MyMap.class);

        // Set the combiner. If the combiner logic were identical to the reducer's,
        // the reducer class itself could be reused here instead of a separate class.
        job.setCombinerClass(MyCombiner.class);

        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/hadoop/wordcount/city"));
        FileOutputFormat.setOutputPath(job, new Path("/hadoop/wordcount/city/out"));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
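To confirm that the combiner actually ran, and to see how much it shrank the shuffle, the task counters can be read after waitForCompletion returns. A minimal sketch, assuming the job object above (add the import org.apache.hadoop.mapreduce.TaskCounter):

long combineIn  = job.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
long combineOut = job.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
System.out.println("combiner: " + combineIn + " records in -> " + combineOut + " records out");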
