MapReduce實現兩表join的一般做法(Reduce端join)
案例:(部門員工兩表的join查詢)
原始資料(欄位以Tab分隔;程式假設空值欄位(如comm)仍保留分隔符號,因此deptno固定位於員工表第8欄,即陣列索引7)
員工表(emp):
empno ename job mgr hiredate sal comm deptno loc
7499 allen salesman 7698 1981-02-20 1600 300 30
7782 clark manager 7639 1981-06-09 2450 10
7654 martin salesman 7698 1981-03-22 1250 1400 30 boston
7900 james clerk 7698 1981-03-20 950 30
7788 scott analyst 7566 1981-09-01 3000 100 20
部門表(dep):
deptno dname loc
30 sales chicago
20 research dallas
10 accounting newyork
實現的功能類似於下列SQL:select e.empno, e.ename, d.deptno, d.dname from emp e join dep d on e.deptno = d.deptno;
以join on條件中的欄位(deptno)作為map輸出的key。
部門與員工是一對多關係:一個部門對應多名員工。
解析:
最後輸出的結果包含兩張表共四個屬性(員工id,員工姓名,部門id(外來鍵),部門名稱)
我們可以將部門id作為map輸出的key,並將四個屬性封裝成一個JavaBean作為map輸出的value;這個自定義的JavaBean除了四個屬性外,還需要一個欄位flag,用來區分該筆資料來自員工表還是部門表。
1.JavaBean
/* *實現join的兩張表通用的一個bean,並且bean中加一個通用的標識flag,用於區分兩張表 *實現writableCompare介面(由於資料要在網路上傳輸必須序列化,hadoop處理的時候需要分組和排序) */ public class Bean implements WritableComparable<Bean> { // 兩個表共查詢的屬性 private String empno = " "; private String empname = " "; private String depno = " "; private String depname = " "; private int flag = 0; // 0:部門 1:員工 public Bean() { } public Bean(String empno, String empname, String depno, String depname, int flag) { super(); this.empno = empno; this.empname = empname; this.depno = depno; this.depname = depname; this.flag = flag; } public Bean(Bean bean) { this.empno = bean.getEmpno(); this.empname = bean.getEmpname(); this.depno = bean.getDepno(); this.depname = bean.getDepname(); this.flag = bean.getFlag(); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub // 寫資料 out.writeUTF(empno); out.writeUTF(empname); out.writeUTF(depno); out.writeUTF(depname); out.writeInt(flag); } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub // 讀資料 this.empno = in.readUTF(); this.empname = in.readUTF(); this.depno = in.readUTF(); this.depname = in.readUTF(); this.flag = in.readInt(); } @Override public String toString() { return "empno=" + empno + ", empname=" + empname + ", depno=" + depno + ", depname=" + depname; } @Override public int compareTo(Bean arg0) { // TODO Auto-generated method stub return 0; } public String getEmpno() { return empno; } public void setEmpno(String empno) { this.empno = empno; } public String getEmpname() { return empname; } public void setEmpname(String empname) { this.empname = empname; } public String getDepno() { return depno; } public void setDepno(String depno) { this.depno = depno; } public String getDepname() { return depname; } public void setDepname(String depname) { this.depname = depname; } public int getFlag() { return flag; } public void setFlag(int flag) { this.flag = flag; } }
2.Map類(map輸出的key為兩表共同的join欄位deptno,輸出的value為Bean物件;reduce時,相同key的資料會被分到同一組)
/**
 * Map side of the reduce-side join between the employee table and the department table.
 *
 * <p>Both tables are read by this one mapper, and both are mapped to the same {@link Bean} type.
 * Each line is split on tabs:
 * <ul>
 *   <li>a line with exactly 3 fields is a department row (deptno, dname, loc);</li>
 *   <li>any other line is an employee row whose deptno is field index 7.</li>
 * </ul>
 * The output key is the deptno, so a department and its employees meet in the same reduce call.
 *
 * <p>Lines that cannot be parsed are skipped and counted under the {@code StaffDepMap} counter
 * group rather than failing the task. These are blank lines, employee rows too short to contain a
 * deptno, and rows whose deptno is not an integer (for example a header line).
 */
public class StaffDepMap extends Mapper<LongWritable, Text, IntWritable, Bean> {

    // Position of deptno in an employee row: empno, ename, job, mgr, hiredate, sal, comm, deptno.
    private static final int EMP_DEPTNO_INDEX = 7;
    // Number of tab-separated fields that identifies a department row.
    private static final int DEP_FIELD_COUNT = 3;
    private static final String COUNTER_GROUP = "StaffDepMap";

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, IntWritable, Bean>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.trim().isEmpty()) {
            context.getCounter(COUNTER_GROUP, "BLANK_LINES").increment(1);
            return;
        }
        String[] str = line.split("\t");
        if (str.length == DEP_FIELD_COUNT) {
            // Department row.
            String deptnoField = str[0].trim();
            Integer deptno = parseDeptno(deptnoField);
            if (deptno == null) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            Bean dep = new Bean();
            dep.setDepno(deptnoField);
            dep.setDepname(str[1]);
            dep.setFlag(0);
            context.write(new IntWritable(deptno), dep);
        } else {
            // Employee row.
            if (str.length <= EMP_DEPTNO_INDEX) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            String deptnoField = str[EMP_DEPTNO_INDEX].trim();
            Integer deptno = parseDeptno(deptnoField);
            if (deptno == null) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            Bean emp = new Bean();
            emp.setEmpno(str[0]);
            emp.setEmpname(str[1]);
            emp.setDepno(deptnoField);
            emp.setFlag(1);
            context.write(new IntWritable(deptno), emp);
        }
    }

    /** Parses a deptno field, returning {@code null} when it is not an integer (e.g. a header). */
    private static Integer parseDeptno(String field) {
        try {
            return Integer.valueOf(field);
        } catch (NumberFormatException ignored) {
            // Not a data row; the caller counts and skips it.
            return null;
        }
    }
}
3.Reduce類(輸入的key即兩表共同的deptno,values則包含該部門的部門資料與所有員工資料)
/**
 * Reduce side of the join: for one deptno, attaches the department name to every employee.
 *
 * <p>{@code values} holds the department record (flag 0) and the employee records (flag 1) for the
 * key. No secondary sort is configured, so they arrive in no guaranteed order. Employees are
 * therefore buffered until every value has been seen. Each bean is copied because Hadoop reuses the
 * value object while iterating.
 *
 * <p>Inner-join semantics, as in the equivalent SQL:
 * <ul>
 *   <li>Employees whose deptno has no department record are not emitted. They are counted under
 *       {@code StaffDepRedu/EMPLOYEES_WITHOUT_DEPARTMENT}.</li>
 *   <li>A department with no employees produces no output.</li>
 * </ul>
 */
public class StaffDepRedu extends
        Reducer<IntWritable, Bean, NullWritable, Text> {
    @Override
    protected void reduce(IntWritable key, Iterable<Bean> values,
            Reducer<IntWritable, Bean, NullWritable, Text>.Context context)
            throws IOException, InterruptedException {
        Bean dep = null;
        List<Bean> emps = new ArrayList<Bean>();
        for (Bean bean : values) {
            if (bean.getFlag() == 0) {
                // Department record: copy it, since the framework reuses `bean`.
                dep = new Bean(bean);
            } else {
                // Employee record: copy for the same reason.
                emps.add(new Bean(bean));
            }
        }
        if (dep == null) {
            // No matching department: drop these employees instead of dereferencing null.
            context.getCounter("StaffDepRedu", "EMPLOYEES_WITHOUT_DEPARTMENT")
                    .increment(emps.size());
            return;
        }
        // Fill in the department name on each employee and emit the joined row.
        for (Bean emp : emps) {
            emp.setDepname(dep.getDepname());
            context.write(NullWritable.get(), new Text(emp.toString()));
        }
    }
}
4.Job類(驅動程式)
/**
 * Driver for the employee/department reduce-side join.
 *
 * <p>Usage: {@code StaffDepMain <input path> <output path>}. The input path must provide both the
 * employee and the department rows, because a single mapper handles both tables. The process exits
 * with the job's outcome:
 * <ul>
 *   <li>0 on success;</li>
 *   <li>1 if the job fails;</li>
 *   <li>2 on a usage error.</li>
 * </ul>
 */
public class StaffDepMain {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: StaffDepMain <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        // NOTE(review): new Job(conf) is deprecated in Hadoop 2.x; prefer Job.getInstance(conf)
        // once the target Hadoop version is confirmed.
        Job job = new Job(conf);
        job.setJarByClass(StaffDepMain.class);

        job.setMapperClass(StaffDepMap.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Bean.class);

        job.setReducerClass(StaffDepRedu.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Report the job outcome through the exit code so calling scripts can detect failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}