
Big Data Notes (9): Advanced Features of MapReduce (B)


2. Sorting

Object sorting

Employee data (Employee.java) ----> emitted as key2

Requirement: sort by department number and salary, both in ascending order.
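
The MapReduce framework sorts map output by key, so sorting by a whole object means making that object the key (key2). The key class must implement WritableComparable: write/readFields handle serialization across the shuffle, and compareTo defines the sort order, as Employee does below.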

Employee.java

package mr.object;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

// data: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements WritableComparable<Employee> {
    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

//    @Override
//    public int compareTo(Employee o) {
//        // single-column sort rule: order by salary
//        if (this.sal >= o.getSal()) {
//            return 1;
//        } else {
//            return -1;
//        }
//    }

    @Override
    public int compareTo(Employee o) {
        // two-column sort rule: department number first
        if (this.deptno > o.getDeptno()) {
            return 1;
        } else if (this.deptno < o.getDeptno()) {
            return -1;
        }
        // then salary; Integer.compare also handles equal salaries,
        // which the original ">=" test mis-ordered (it never returned 0)
        return Integer.compare(this.sal, o.getSal());
    }

    @Override
    public String toString() {
        return "[" + this.empno + "\t" + this.ename + "\t" + this.sal + "\t" + this.deptno + "]";
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // serialization: field order must match readFields
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // deserialization: same field order as write
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }

    public int getEmpno() { return empno; }
    public void setEmpno(int empno) { this.empno = empno; }
    public String getEname() { return ename; }
    public void setEname(String ename) { this.ename = ename; }
    public String getJob() { return job; }
    public void setJob(String job) { this.job = job; }
    public int getMgr() { return mgr; }
    public void setMgr(int mgr) { this.mgr = mgr; }
    public String getHiredate() { return hiredate; }
    public void setHiredate(String hiredate) { this.hiredate = hiredate; }
    public int getSal() { return sal; }
    public void setSal(int sal) { this.sal = sal; }
    public int getComm() { return comm; }
    public void setComm(int comm) { this.comm = comm; }
    public int getDeptno() { return deptno; }
    public void setDeptno(int deptno) { this.deptno = deptno; }
}

EmployeeSortMapper.java

package mr.object;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//                                                                   Key2
public class EmployeeSortMapper extends Mapper<LongWritable, Text, Employee, NullWritable> {

    @Override
    protected void map(LongWritable key1, Text value1,Context context)
            throws IOException, InterruptedException {
        // data: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();
        
        // split the line into fields
        String[] words = data.split(",");
        
        // create the Employee object
        Employee e = new Employee();
        
        // employee number
        e.setEmpno(Integer.parseInt(words[0]));
        // employee name
        e.setEname(words[1]);
        
        // job
        e.setJob(words[2]);
        
        // manager number: note that some employees have no manager
        try{
            e.setMgr(Integer.parseInt(words[3]));
        }catch(Exception ex){
            // no manager
            e.setMgr(0);
        }
        
        // hire date
        e.setHiredate(words[4]);
        
        // salary
        e.setSal(Integer.parseInt(words[5]));
        
        // commission
        try{
            e.setComm(Integer.parseInt(words[6]));
        }catch(Exception ex){
            // no commission
            e.setComm(0);
        }
        
        // department number
        e.setDeptno(Integer.parseInt(words[7]));
        
        
        // emit key2
        context.write(e, NullWritable.get());
    }
}
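
The driver below references EmployeeSortReducer, which the original post does not show. Here is a minimal sketch consistent with the driver's output types (LongWritable key, Employee value); using the salary as the output key is an assumption:

package mr.object;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class EmployeeSortReducer extends Reducer<Employee, NullWritable, LongWritable, Employee> {

    @Override
    protected void reduce(Employee k3, Iterable<NullWritable> v3, Context context)
            throws IOException, InterruptedException {
        // keys arrive already sorted by deptno, then sal;
        // emit each employee (the LongWritable key choice is illustrative)
        for (NullWritable v : v3) {
            context.write(new LongWritable(k3.getSal()), k3);
        }
    }
}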

EmployeeSortMain.java

package mr.object;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeSortMain {

    public static void main(String[] args) throws Exception {
        // job = map + reduce
        Job job = Job.getInstance(new Configuration());
        //job entry point
        job.setJarByClass(EmployeeSortMain.class);
        
        job.setMapperClass(EmployeeSortMapper.class);
        job.setMapOutputKeyClass(Employee.class);
        job.setMapOutputValueClass(NullWritable.class);
        
        job.setReducerClass(EmployeeSortReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Employee.class); 
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        //run the job
        job.waitForCompletion(true);
    }

}
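
Packaging the classes into a jar and running the job looks like this (the jar name and HDFS paths are illustrative):

hadoop jar sort.jar mr.object.EmployeeSortMain /input/emp.csv /output/sort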

Result: the output lists employees in ascending order of department number and, within each department, of salary.

3. Partitioning (Partitioner)

Partitioning is performed on the map output (k2, v2).

By default, a MapReduce job has only one partition (and therefore only one output file).

Purpose: partitioning improves query efficiency.

Partitions are built according to different conditions.
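
For comparison, Hadoop's default partitioner is HashPartitioner (in org.apache.hadoop.mapreduce.lib.partition), whose rule boils down to one line:

public int getPartition(K key, V value, int numReduceTasks) {
    // clear the sign bit so the result is non-negative, then mod by the reduce-task count
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

Keys with the same hash land in the same partition; the custom partitioner below replaces this rule with one based on the department number.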


Requirement: partition employees by department number, so that employees with the same department number end up in the same partition (output file).

EmpPartionMapper.java

package demo.partion;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// k2: department number   v2: Employee object
public class EmpPartionMapper extends Mapper<LongWritable, Text, LongWritable, Employee> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        
        // data: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();
        
        String[] words = data.split(",");
        
        Employee e = new Employee();
        
        e.setEmpno(Integer.parseInt(words[0]));

        e.setEname(words[1]);

        e.setJob(words[2]);
        
        // some employees have no manager
        try{
            e.setMgr(Integer.parseInt(words[3]));
        }catch(Exception ex){
            e.setMgr(0);
        }
        
        e.setHiredate(words[4]);
        
        e.setSal(Integer.parseInt(words[5]));
        
        // some employees have no commission
        try{
            e.setComm(Integer.parseInt(words[6]));
        }catch(Exception ex){
            e.setComm(0);
        }
        
        e.setDeptno(Integer.parseInt(words[7]));
        
        // emit: k2 is the department number, v2 is the Employee object
        context.write(new LongWritable(e.getDeptno()), e);
    }
}

EmpPartionReducer.java

package demo.partion;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// write employees of the same department to HDFS                              k4: department number   v4: Employee object
public class EmpPartionReducer extends Reducer<LongWritable, Employee, LongWritable, Employee>{

    @Override
    protected void reduce(LongWritable k3, Iterable<Employee> v3, Context context)
            throws IOException, InterruptedException {
        for (Employee e : v3) {
            context.write(k3, e);
        }
    }
    
}
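
This reducer is a pass-through: the shuffle has already grouped the employees by department key (k3), so it simply writes each one out. The Employee class here is presumably the same Writable bean shown in the sorting section (the post does not repeat it); used as a value rather than a key, it only needs Writable, not WritableComparable.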

MyEmployeePartitioner.java

package demo.partion;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;
// partition rule: partitions are built from the map output                    k2            v2
public class MyEmployeePartitioner extends Partitioner<LongWritable, Employee>{

    /*
     * numParts: the number of partitions (= number of reduce tasks)
     */
    @Override
    public int getPartition(LongWritable k2, Employee v2, int numParts) {
        // partition rule
        int deptno = v2.getDeptno();
        if (deptno == 10) {
            // goes to partition 1
            return 1 % numParts;
        }else if (deptno == 20) {
            // goes to partition 2
            return 2 % numParts;
        }else {
            // goes to partition 0 (3 % 3 == 0)
            return 3 % numParts;
        }
    }
}
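
Note that getPartition must return a value in the range [0, numParts). With three reduce tasks this works out to 1 % 3 = 1 for department 10, 2 % 3 = 2 for department 20, and 3 % 3 = 0 for everything else, so the three branches map to partitions 1, 2, and 0 respectively.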

EmpPartitionMain.java

package demo.partion;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmpPartitionMain {

    public static void main(String[] args) throws Exception {

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(EmpPartitionMain.class);
        
        job.setMapperClass(EmpPartionMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Employee.class);
        
        //set the partition rule
        job.setPartitionerClass(MyEmployeePartitioner.class);
        //set the number of partitions (reduce tasks)
        job.setNumReduceTasks(3);
        
        job.setReducerClass(EmpPartionReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Employee.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        job.waitForCompletion(true);

    }

}

Result: three partitions (three output files, one per reduce task) are created.

Partition 1 (part-r-00001): the employees of department 10.

Partition 2 (part-r-00002): the employees of department 20.

Partition 0 (part-r-00000): the employees of all other departments (e.g., 30).

4. Combining (Combiner)

1. A MapReduce job can run without a Combiner.
2. A Combiner is a special kind of Reducer that runs first on the mapper side, cutting down the amount of map output and thereby improving efficiency.
3. Caveats:
(1) In some cases a Combiner must not be used -----> e.g., computing an average (see the worked example below).
(2) Whether or not the Combiner is plugged in, the original program logic (the final result) must not change. (MapReduce programming example: implementing an inverted index.)
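
A quick worked example of the average pitfall: say one mapper emits the values {1, 2} and another emits {3, 4, 5}. The true average is (1 + 2 + 3 + 4 + 5) / 5 = 3, but if a Combiner pre-averages each mapper's output, the reducer sees 1.5 and 4 and computes (1.5 + 4) / 2 = 2.75. Summing is safe because addition is associative and commutative; an average of averages is not the overall average.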


WordCountMapper.java

package demo.combiner;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        
        //get the line of data, e.g.: I love beijing
        String data = v1.toString();
        
        //split into words
        String[] words = data.split(" ");
        
        //emit K2: the word   V2: a count of 1
        for (String w : words) {
            context.write(new Text(w), new LongWritable(1));
        }
        
    }
    
}

WordCountReducer.java

package demo.combiner;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    @Override
    protected void reduce(Text k3, Iterable<LongWritable> v3,
            Context context) throws IOException, InterruptedException {
        long total = 0;
        for (LongWritable l : v3) {
            total = total + l.get();
        }
        
        //emit K4 and V4
        context.write(k3, new LongWritable(total));
    }
    
}
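
Because this reducer only sums, and summing partial sums yields the same total, the very same class can double as the Combiner; that is exactly what the driver below does with job.setCombinerClass(WordCountReducer.class).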

WordCountMain.java: adding the Combiner

package demo.combiner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMain {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(WordCountMain.class);
    
        //Mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);//specify k2
        job.setMapOutputValueClass(LongWritable.class);//specify v2
        
        //Combiner
        job.setCombinerClass(WordCountReducer.class);
        
        //Reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //run the job
        job.waitForCompletion(true);
    }        
}
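
With the Combiner registered, each map task pre-aggregates its own counts, so the shuffle carries one (word, partialCount) pair per distinct word per mapper instead of one pair per word occurrence. The effect shows up in the job counters: "Combine input records" and "Combine output records" become non-zero, and "Reduce input records" drops accordingly.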
