Hadoop MapReduce Development: Joining Two Input Datasets
Posted by 阿新 on 2019-01-12
Test data:
dept.txt
#deptno dname loc
30 sales chicago
20 research dallas
10 accounting newyork
employee.txt
#empno ename job mgr hiredate sal comm deptno loc
7499 allen salesman 7698 1981-02-20 1600 300 30
7782 clark managers 7639 1981-06-09 2450 10
7654 martin salesman 7698 1981-03-20 1250 1400 30 boston
7900 james clerk 7698 1981-01-09 950 30
7788 scott analyst 7566 1981-09-01 3000 100 20
Emplyee:
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Emplyee implements WritableComparable<Emplyee> {
    private String empno = "";
    private String ename = "";
    private String deptno = "";
    private String dname = "";
    private int flag = 0; // 0 = department record, 1 = employee record

    public Emplyee() {
    }

    public Emplyee(String empno, String ename, String deptno, String dname, int flag) {
        this.empno = empno;
        this.ename = ename;
        this.deptno = deptno;
        this.dname = dname;
        this.flag = flag;
    }

    // Copy constructor: the framework reuses value objects in the reducer, so we copy them.
    public Emplyee(Emplyee e) {
        this.empno = e.getEmpno();
        this.ename = e.getEname();
        this.deptno = e.getDeptno();
        this.dname = e.getDname();
        this.flag = e.getFlag();
    }

    @Override
    public int compareTo(Emplyee o) {
        // Emplyee is only used as a map output value, so no real ordering is needed.
        return 0;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(empno);
        dataOutput.writeUTF(ename);
        dataOutput.writeUTF(deptno);
        dataOutput.writeUTF(dname);
        dataOutput.writeInt(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.empno = dataInput.readUTF();
        this.ename = dataInput.readUTF();
        this.deptno = dataInput.readUTF();
        this.dname = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }

    @Override
    public String toString() {
        return this.empno + " " + this.ename + " " + this.dname + " " + this.deptno;
    }

    public String getEmpno() { return empno; }
    public String getEname() { return ename; }
    public String getDeptno() { return deptno; }
    public String getDname() { return dname; }
    public int getFlag() { return flag; }

    public void setEmpno(String empno) { this.empno = empno; }
    public void setEname(String ename) { this.ename = ename; }
    public void setDeptno(String deptno) { this.deptno = deptno; }
    public void setDname(String dname) { this.dname = dname; }
    public void setFlag(int flag) { this.flag = flag; }
}
Mapper:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class JoinOneMapper extends Mapper<LongWritable, Text, IntWritable, Emplyee> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (!line.startsWith("#")) {
            String[] arr = line.split("\t");
            if (arr.length == 3) { // department record: deptno dname loc
                Emplyee e = new Emplyee();
                e.setDeptno(arr[0]);
                e.setDname(arr[1]);
                e.setFlag(0);
                context.write(new IntWritable(Integer.valueOf(arr[0])), e);
            } else { // employee record: deptno is the 8th column (index 7)
                Emplyee e = new Emplyee();
                e.setEmpno(arr[0]);
                e.setEname(arr[1]);
                //e.setDeptno(arr[7]);
                e.setFlag(1);
                context.write(new IntWritable(Integer.valueOf(arr[7])), e);
            }
        }
    }
}
Reducer:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class JoinOneReducer extends Reducer<IntWritable, Emplyee, NullWritable, Text> {
    @Override
    protected void reduce(IntWritable key, Iterable<Emplyee> values, Context context)
            throws IOException, InterruptedException {
        Emplyee dept = null;
        List<Emplyee> list = new ArrayList<Emplyee>();
        for (Emplyee val : values) {
            if (val.getFlag() == 0) { // department record
                dept = new Emplyee(val);
            } else if (val.getFlag() == 1) { // employee record
                list.add(new Emplyee(val));
            }
        }
        // Guard against a deptno that has employees but no matching department record.
        if (dept == null) {
            return;
        }
        // Loop over the employee list and emit one joined line per employee.
        for (Emplyee emp : list) {
            emp.setDname(dept.getDname());
            emp.setDeptno(dept.getDeptno());
            context.write(NullWritable.get(), new Text(emp.toString()));
        }
    }
}
Job:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Join two input datasets.
 * The output corresponds roughly to:
 * select e.empno, e.ename, d.dname, d.deptno from emp e join dept d on e.deptno = d.deptno;
 *
 * Idea:
 * key: deptno
 * Approach 1: value (Text): empno_ename_0 / deptno_dname_1 (a sketch follows the driver code below)
 *
 * Approach 2: custom entity bean (fields: empno, ename, deptno, dname, flag) -- used here
 */
public class JobMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("Usage: Join <input path> <output path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Join job1");
        job.setJarByClass(JobMain.class);
        job.setMapperClass(JoinOneMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Emplyee.class);
        job.setReducerClass(JoinOneReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Delete the output directory if it already exists, so reruns do not fail.
        Path outDirPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outDirPath)) {
            fs.delete(outDirPath, true);
        }
        FileOutputFormat.setOutputPath(job, outDirPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
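The driver comment above mentions Approach 1, which keys on deptno but tags each record inside a plain Text value instead of a custom Writable. The post only implements Approach 2, so the following is a minimal sketch of what Approach 1 could look like; the class names JoinTextMapper and JoinTextReducer are made up here, and the tag convention (suffix 0 = employee, 1 = department) follows the comment.

// JoinTextMapper.java (hypothetical Approach 1 mapper, one class per source file)
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class JoinTextMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty() || line.startsWith("#")) {
            return; // skip header/comment lines
        }
        String[] arr = line.split("\t");
        if (arr.length == 3) { // department record: deptno dname loc
            context.write(new IntWritable(Integer.parseInt(arr[0])),
                    new Text(arr[0] + "_" + arr[1] + "_1"));
        } else {               // employee record: deptno is the 8th column (index 7)
            context.write(new IntWritable(Integer.parseInt(arr[7])),
                    new Text(arr[0] + "_" + arr[1] + "_0"));
        }
    }
}

// JoinTextReducer.java (hypothetical Approach 1 reducer)
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class JoinTextReducer extends Reducer<IntWritable, Text, NullWritable, Text> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String dname = null;
        List<String> employees = new ArrayList<String>(); // "empno ename" pairs
        for (Text val : values) {
            String[] parts = val.toString().split("_");
            if ("1".equals(parts[2])) { // department record
                dname = parts[1];
            } else {                    // employee record
                employees.add(parts[0] + " " + parts[1]);
            }
        }
        if (dname == null) {
            return; // no matching department for this deptno
        }
        for (String emp : employees) {
            context.write(NullWritable.get(), new Text(emp + " " + dname + " " + key.get()));
        }
    }
}

Compared with the Emplyee bean, this version avoids writing a custom Writable but pushes string parsing into the reducer; the driver would only need setMapOutputValueClass switched to Text and the mapper/reducer classes swapped in.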
Result:
7782 clark accounting 10
7788 scott research 20
7900 james sales 30
7654 martin sales 30
7499 allen sales 30