
Hadoop MapReduce Development -- Joining Two Input Datasets

Test data:

dept.txt

#deptno    dname    loc
30    sales    chicago
20    research    dallas
10    accounting    newyork

employee.txt

#empno    ename    job            mgr        hiredate    sal        comm    deptno    loc
7499    allen    salesman    7698    1981-02-20    1600    300    30    
7782    clark    managers    7639    1981-06-09    2450        10    
7654    martin    salesman    7698    1981-03-20    1250    1400    30    boston
7900    james    clerk    7698    1981-01-09    950        30    
7788    scott    analyst    7566    1981-09-01    3000    100    20    

Emplyee:

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Emplyee implements WritableComparable<Emplyee> {
    private String empno = "";
    private String ename = "";
    private String deptno = "";
    private String dname = "";
    private int flag = 0; // 0 = department record, 1 = employee record

    public Emplyee() {
    }

    public Emplyee(String empno, String ename, String deptno, String dname, int flag) {
        this.empno = empno;
        this.ename = ename;
        this.deptno = deptno;
        this.dname = dname;
        this.flag = flag;
    }

    public Emplyee(Emplyee e) {
        this.empno = e.getEmpno();
        this.ename = e.getEname();
        this.deptno = e.getDeptno();
        this.dname = e.getDname();
        this.flag = e.getFlag();
    }

    @Override
    public int compareTo(Emplyee o) {
        // Emplyee is only used as a map output value, so no real ordering is needed.
        return 0;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(empno);
        dataOutput.writeUTF(ename);
        dataOutput.writeUTF(deptno);
        dataOutput.writeUTF(dname);
        dataOutput.writeInt(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.empno = dataInput.readUTF();
        this.ename = dataInput.readUTF();
        this.deptno = dataInput.readUTF();
        this.dname = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }

    @Override
    public String toString() {
        return this.empno + "   " + this.ename + "  " + this.dname + "  " + this.deptno;
    }

    public String getEmpno() {
        return empno;
    }

    public String getEname() {
        return ename;
    }

    public String getDeptno() {
        return deptno;
    }

    public String getDname() {
        return dname;
    }

    public int getFlag() {
        return flag;
    }

    public void setEmpno(String empno) {
        this.empno = empno;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public void setDeptno(String deptno) {
        this.deptno = deptno;
    }

    public void setDname(String dname) {
        this.dname = dname;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }
}
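
Hadoop serializes map output values through the Writable contract, so write() and readFields() must handle the fields in exactly the same order. A minimal round-trip check (hypothetical test code, not part of the original job) can catch ordering mistakes early:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class EmplyeeRoundTrip {
    public static void main(String[] args) throws IOException {
        Emplyee original = new Emplyee("7499", "allen", "30", "sales", 1);

        // Serialize with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance with readFields()
        Emplyee copy = new Emplyee();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy); // expected: 7499   allen  sales  30
    }
}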

mapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class JoinOneMapper extends Mapper<LongWritable, Text, IntWritable, Emplyee> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString().trim();
        if(!line.startsWith("#")) {
            String[] arr = line.split("\t");
            if(arr.length == 3) {// department record (dept.txt)
                Emplyee e = new Emplyee();
                e.setDeptno(arr[0]);
                e.setDname(arr[1]);
                e.setFlag(0);

                context.write(new IntWritable(Integer.valueOf(arr[0])), e);
            } else if(arr.length >= 8) {// employee record (employee.txt); skip blank/malformed lines
                Emplyee e = new Emplyee();
                e.setEmpno(arr[0]);
                e.setEname(arr[1]);
                //e.setDeptno(arr[7]);
                e.setFlag(1);

                context.write(new IntWritable(Integer.valueOf(arr[7])), e);
            }
        }
    }
}
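
The mapper tells the two inputs apart by column count (a 3-column line comes from dept.txt) and keys every record by deptno, so records for the same department meet in the same reduce call. A standalone sketch of that routing logic on two sample lines (hypothetical demo code, assuming the tab-separated layout shown above):

public class MapperParseDemo {
    public static void main(String[] args) {
        String deptLine = "30\tsales\tchicago";
        String empLine  = "7499\tallen\tsalesman\t7698\t1981-02-20\t1600\t300\t30";

        for (String line : new String[]{deptLine, empLine}) {
            String[] arr = line.trim().split("\t");
            if (arr.length == 3) {
                // dept.txt record: key = deptno (arr[0]), flag = 0
                System.out.println("key=" + arr[0] + ", dname=" + arr[1] + ", flag=0");
            } else {
                // employee.txt record: key = deptno (arr[7]), flag = 1
                System.out.println("key=" + arr[7] + ", ename=" + arr[1] + ", flag=1");
            }
        }
    }
}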

reducer

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class JoinOneReducer extends Reducer<IntWritable, Emplyee, NullWritable, Text> {
    @Override
    protected void reduce(IntWritable key, Iterable<Emplyee> values, Context context) throws IOException, InterruptedException {
        Emplyee dept = null;
        List<Emplyee> list = new ArrayList<Emplyee>();
        for(Emplyee val : values) {
            if(val.getFlag() == 0) {// department record
                dept = new Emplyee(val);
            } else if(val.getFlag() == 1) {// employee record
                list.add(new Emplyee(val));
            }
        }

        // If this deptno has no matching department record, there is nothing to join.
        if(dept == null) {
            return;
        }

        // Loop over the employee list and emit one joined line per employee
        for(Emplyee emp : list) {
            emp.setDname(dept.getDname());
            emp.setDeptno(dept.getDeptno());
            context.write(NullWritable.get(), new Text(emp.toString()));
        }
    }
}

job

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Join two input datasets.
 * The output is roughly equivalent to:
 * select e.empno, e.ename, d.dname, d.deptno from emp e join dept d on e.deptno = d.deptno;
 *
 * Approach:
 * key: deptno
 * Option 1: value (Text): empno_ename_0 / deptno_dname_1
 * Option 2: value: a custom bean (fields: empno, ename, deptno, dname, flag) -- the option used by this job (Emplyee)
 */
public class JobMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if(args.length != 2) {
            System.err.println("Usage: Join<input path> <output path>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"Join job1");
        job.setJarByClass(JobMain.class);

        job.setMapperClass(JoinOneMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Emplyee.class);

        job.setReducerClass(JoinOneReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        Path outDirPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outDirPath)) {
            fs.delete(outDirPath, true);
        }
        FileOutputFormat.setOutputPath(job, outDirPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
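
The Javadoc above also mentions "Option 1": instead of a custom bean, tag each record as a plain Text value such as empno_ename_0 or deptno_dname_1. A minimal sketch of that mapper follows (the class name JoinTextMapper and the exact value format are assumptions, not from the original post):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class JoinTextMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.startsWith("#")) {
            return;
        }
        String[] arr = line.split("\t");
        if (arr.length == 3) {
            // department record, tagged with 1: deptno_dname_1
            context.write(new IntWritable(Integer.parseInt(arr[0])), new Text(arr[0] + "_" + arr[1] + "_1"));
        } else {
            // employee record, tagged with 0: empno_ename_0
            context.write(new IntWritable(Integer.parseInt(arr[7])), new Text(arr[0] + "_" + arr[1] + "_0"));
        }
    }
}

The matching reducer would split each value on "_" and branch on the trailing 0/1 tag, playing the same role as the flag field in JoinOneReducer. The job itself uses the Emplyee bean (Option 2), which produces the result below.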

Result:

7782   clark  accounting  10
7788   scott  research  20
7900   james  sales  30
7654   martin  sales  30
7499   allen  sales  30