MapReduce之連線操作類應用
阿新 • • 發佈:2019-01-25
用MapReduce實現關係的自然連線
- 假設有關係R(A,B)和S(B,C),對二者進行自然連線操作
- 使用Map過程,把來自R的每個元組<a,b>轉換成一個鍵值對<b,<R,a>>,其中的鍵就是屬性B的值。把關係R的標識包含到值中,這樣做使得我們可以在Reduce階段,只把那些來自R的元組和來自S的元組進行匹配。類似地,使用Map過程,把來自S的每個元組<b,c>轉換成一個鍵值對<b,<S,c>>
- 所有具有相同B值的元組被發送到同一個Reduce程序中,Reduce程序的任務是,把來自關係R和S的、具有相同屬性B值的元組進行合併
- Reduce程序的輸出則是連線後的元組
自然連線過程
應用示例
在HDFS中有兩個檔案,一個記錄了學生的基本資訊,包含了姓名和學號資訊,名為student_info.txt(欄位之間以Tab字元分隔,程式以"\t"切分),內容為:
Jenny 00001
Hardy 00002
Bradley 00003
還有一個檔案記錄了學生的選課資訊表,包括了學號和課程名,名為student_class_info.txt(欄位之間同樣以Tab字元分隔),內容為:
00001 Chinese
00001 Math
00002 Music
00002 Math
00003 Physic
現在經join操作後,得出的結果為:
Jenny Chinese
Jenny Math
Hardy Music
Hardy Math
Bradley Physic
程式程式碼
JoinMapper
package com.test.join;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * Map side of a reduce-side join between the student file and the
 * student/class file.
 *
 * <p>Every input line is split on tab characters. The student id becomes the
 * output key, and the output value is a source tag followed by a tab and the
 * remaining attribute, so that the reducer can tell which file a value came
 * from:
 * <ul>
 *   <li>student file line {@code name \t id} becomes {@code <id, "student\tname">}</li>
 *   <li>class file line {@code id \t class} becomes {@code <id, "student_class\tclass">}</li>
 * </ul>
 *
 * <p>Lines with fewer than two fields and lines coming from a file that is
 * neither of the two known inputs are skipped and counted in job counters
 * instead of failing the task or re-emitting the previous record.
 */
public class JoinMapper extends Mapper<LongWritable, Text, Text, Text>{
    private static final String STUDENT_FILENAME = "student_info.txt";
    private static final String STUDENT_CLASS_FILENAME = "student_class_info.txt";
    private static final String STUDENT_FLAG = "student";
    private static final String STUDENT_CLASS_FLAG = "student_class";

    /** Counter group and names used to report skipped input. */
    private static final String COUNTER_GROUP = "JoinMapper";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";
    private static final String UNKNOWN_SOURCE_COUNTER = "UNKNOWN_SOURCE_RECORDS";

    // Reused across map() calls to avoid allocating two Text objects per record.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    /**
     * Tags one input line with its source file and emits it keyed by student id.
     *
     * @param key     the input key supplied by the input format (not used)
     * @param value   one tab-separated input line
     * @param context task context used to find the source file and emit output
     * @throws IOException          if writing the output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String filePath = fileSplit.getPath().toString();
        String[] fields = StringUtils.split(value.toString(), "\t");

        // Blank or truncated lines have no join attribute; skip rather than crash.
        if (fields.length < 2) {
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        // Decide which relation the record belongs to from its file path.
        if (filePath.contains(STUDENT_FILENAME)) {
            outKey.set(fields[1]);
            outValue.set(STUDENT_FLAG + "\t" + fields[0]);
        } else if (filePath.contains(STUDENT_CLASS_FILENAME)) {
            outKey.set(fields[0]);
            outValue.set(STUDENT_CLASS_FLAG + "\t" + fields[1]);
        } else {
            // Unknown file: outKey/outValue still hold the previous record,
            // so writing here would duplicate it. Skip instead.
            context.getCounter(COUNTER_GROUP, UNKNOWN_SOURCE_COUNTER).increment(1);
            return;
        }
        context.write(outKey, outValue);
    }
}
JoinReducer
package com.test.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reduce side of the student/class join.
 *
 * <p>Each incoming value has the form {@code tag \t attribute}, where the tag
 * is {@code "student"} (attribute = student name) or {@code "student_class"}
 * (attribute = class name). For one student id, the reducer emits one
 * {@code <student name, class name>} pair for every combination of a student
 * record and a class record sharing that id.
 *
 * <p>All per-key state is local to {@link #reduce}, so a key that has class
 * records but no student record produces no output rather than being emitted
 * under the name left over from a previously processed key.
 */
public class JoinReducer extends Reducer<Text, Text, Text, Text>{
    private static final String STUDENT_FLAG = "student";
    private static final String STUDENT_CLASS_FLAG = "student_class";

    // Reused across writes to avoid allocating two Text objects per output pair.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    /**
     * Joins the student and class records that share one student id.
     *
     * @param key     the join key (student id)
     * @param values  tagged values emitted by the mapper for this key
     * @param context task context used to emit joined pairs
     * @throws IOException          if writing an output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<String> stuNames = new ArrayList<>();
        List<String> stuClassNames = new ArrayList<>();

        // Separate the values by the relation they came from.
        for (Text val : values) {
            String[] fields = StringUtils.split(val.toString(), "\t");
            if (fields.length < 2) {
                // No attribute after the tag; nothing to join.
                continue;
            }
            String fileFlag = fields[0];
            if (STUDENT_FLAG.equals(fileFlag)) {
                stuNames.add(fields[1]);
            } else if (STUDENT_CLASS_FLAG.equals(fileFlag)) {
                stuClassNames.add(fields[1]);
            }
        }

        // Cartesian product of both sides; empty if either side is missing.
        for (String stuName : stuNames) {
            outKey.set(stuName);
            for (String stuClassName : stuClassNames) {
                outValue.set(stuClassName);
                context.write(outKey, outValue);
            }
        }
    }
}
JoinRunner
package com.test.join;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver that configures and submits the student/class join job.
 *
 * <p>Usage: {@code JoinRunner <input path> <output path>}. Generic Hadoop
 * options (for example {@code -D key=value}) are parsed by {@link ToolRunner}
 * and take effect because the job is built from {@link #getConf()}.
 */
public class JoinRunner extends Configured implements Tool{
    /** Exit code returned when the command line is incomplete. */
    private static final int EXIT_USAGE = 2;

    /**
     * Builds the join job and waits for it to finish.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job succeeded, 1 if it failed, 2 if arguments are missing
     * @throws Exception if the job cannot be created or submitted
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: JoinRunner <input path> <output path>");
            return EXIT_USAGE;
        }

        // Use the configuration injected by ToolRunner so generic options are
        // honoured; fall back to a fresh one when run() is called directly.
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }

        Job job = Job.getInstance(conf, "Join");
        job.setJarByClass(JoinRunner.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point; exits the JVM with the job's status code.
     *
     * @param args command-line arguments, including any generic Hadoop options
     * @throws Exception if the job cannot be run
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new JoinRunner(), args);
        System.exit(res);
    }
}
執行結果
Jenny Math
Jenny Chinese
Hardy Math
Hardy Music
Bradley Physic