
MapReduce Applications: Join Operations

Implementing a natural join of relations with MapReduce

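  • Suppose there are two relations, R(A,B) and S(B,C), and we want to compute their natural join.
  • In the Map phase, each tuple <a,b> from R is converted into a key-value pair <b, <R,a>>, where the key is the value of attribute B. The relation name R is included in the value so that, in the Reduce phase, tuples from R are matched only against tuples from S. Likewise, the Map phase converts each tuple <b,c> from S into a key-value pair <b, <S,c>>.
  • All tuples with the same B value are sent to the same Reduce task. The Reduce task's job is to combine the tuples from R and S that share that B value.
  • The output of the Reduce task is the joined tuples: one tuple <a,b,c> for every pair of an R tuple and an S tuple with the same b.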
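The steps above can be simulated in plain Java without a cluster. This is a local illustration only, not Hadoop code: the class name `ReduceSideJoinSketch`, the `Tagged` helper and the encoding of tuples as two-element `String` arrays are assumptions made for this example, and a `TreeMap` stands in for the framework's shuffle, which groups and sorts the map output by key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReduceSideJoinSketch {

    // A tagged value: the source relation ("R" or "S") plus the non-key attribute.
    static final class Tagged {
        final String relation;
        final String value;

        Tagged(String relation, String value) {
            this.relation = relation;
            this.value = value;
        }
    }

    // Natural join of R(A,B) and S(B,C). R tuples are {a, b}; S tuples are {b, c}.
    static List<String[]> naturalJoin(List<String[]> r, List<String[]> s) {
        // Map + shuffle: emit <b, <R,a>> and <b, <S,c>>, grouped (and sorted) by b.
        Map<String, List<Tagged>> groups = new TreeMap<>();
        for (String[] t : r) {
            groups.computeIfAbsent(t[1], k -> new ArrayList<>()).add(new Tagged("R", t[0]));
        }
        for (String[] t : s) {
            groups.computeIfAbsent(t[0], k -> new ArrayList<>()).add(new Tagged("S", t[1]));
        }

        // Reduce: for each b, pair every R value with every S value.
        List<String[]> out = new ArrayList<>();
        for (Map.Entry<String, List<Tagged>> e : groups.entrySet()) {
            List<String> as = new ArrayList<>();
            List<String> cs = new ArrayList<>();
            for (Tagged tv : e.getValue()) {
                if (tv.relation.equals("R")) {
                    as.add(tv.value);
                } else {
                    cs.add(tv.value);
                }
            }
            for (String a : as) {
                for (String c : cs) {
                    out.add(new String[] {a, e.getKey(), c});
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> r = Arrays.asList(new String[] {"a1", "b1"}, new String[] {"a2", "b2"});
        List<String[]> s = Arrays.asList(
                new String[] {"b1", "c1"}, new String[] {"b1", "c2"}, new String[] {"b3", "c3"});
        for (String[] t : naturalJoin(r, s)) {
            System.out.println(String.join(",", t));
        }
    }
}
```

Running `main` prints `a1,b1,c1` and `a1,b1,c2`. The keys `b2` (only in R) and `b3` (only in S) produce no output, which is the inner-join behaviour of a natural join.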

The natural join process

[Figure: the natural join process]

Example application

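There are two files in HDFS. The first, named student_info.txt, holds basic student information, namely each student's name and student ID. Fields are separated by a tab character, as the code below expects. Its contents are: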

Jenny   00001
Hardy   00002
Bradley 00003

The second, named student_class_info.txt, records course enrollments, namely student ID and course name. Its contents are:

00001   Chinese
00001   Math
00002   Music
00002   Math
00003   Physic

After the join, the result is:

Jenny   Chinese
Jenny   Math
Hardy   Music
Hardy   Math
Bradley Physic

Program code

JoinMapper

package com.test.join;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    private static final String STUDENT_FILENAME = "student_info.txt";
    private static final String STUDENT_CLASS_FILENAME = "student_class_info.txt";
    private static final String STUDENT_FLAG = "student";
    private static final String STUDENT_CLASS_FLAG = "student_class";

    private FileSplit fileSplit;
    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        fileSplit = (FileSplit) context.getInputSplit();
        String filePath = fileSplit.getPath().toString();

        String line = value.toString();
        String[] fields = StringUtils.split(line, "\t");

        // Determine which file the record comes from and tag it with that source
        if (filePath.contains(STUDENT_FILENAME)) {
            // student_info.txt: name \t studentId -> key = studentId
            outKey.set(fields[1]);
            outValue.set(STUDENT_FLAG + "\t" + fields[0]);
        } else if (filePath.contains(STUDENT_CLASS_FILENAME)) {
            // student_class_info.txt: studentId \t course -> key = studentId
            outKey.set(fields[0]);
            outValue.set(STUDENT_CLASS_FLAG + "\t" + fields[1]);
        } else {
            // Record from an unexpected file: skip it instead of emitting a stale key/value
            return;
        }

        context.write(outKey, outValue);
    }
}
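The mapper's tagging rule can be checked locally by pulling it into a plain function. The sketch below is a hypothetical helper, not part of the original job: `JoinMapperLogicSketch`, `tag` and the example paths are assumptions, and it uses `String.split` instead of commons-lang `StringUtils.split` (the two differ on empty tokens, which do not occur in this data). Note that the order of the two `contains` checks does not matter, because neither file name is a substring of the other.

```java
import java.util.AbstractMap;
import java.util.Map;

public class JoinMapperLogicSketch {
    static final String STUDENT_FILENAME = "student_info.txt";
    static final String STUDENT_CLASS_FILENAME = "student_class_info.txt";

    // Returns <studentId, "flag\tpayload">, or null for records from any other file.
    static Map.Entry<String, String> tag(String filePath, String line) {
        String[] fields = line.split("\t");
        if (filePath.contains(STUDENT_FILENAME)) {
            // name \t id  ->  key = id, value = "student\tname"
            return new AbstractMap.SimpleEntry<>(fields[1], "student\t" + fields[0]);
        } else if (filePath.contains(STUDENT_CLASS_FILENAME)) {
            // id \t course  ->  key = id, value = "student_class\tcourse"
            return new AbstractMap.SimpleEntry<>(fields[0], "student_class\t" + fields[1]);
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical HDFS paths, used only to exercise the file-name check
        System.out.println(tag("hdfs:///join/student_info.txt", "Jenny\t00001"));
        System.out.println(tag("hdfs:///join/student_class_info.txt", "00001\tMath"));
    }
}
```

Both calls produce the key `00001`, which is what routes Jenny's name and her courses to the same reduce call.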

JoinReducer

package com.test.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer<Text, Text, Text, Text>{
    private static final String STUDENT_FLAG = "student";
    private static final String STUDENT_CLASS_FLAG = "student_class";

    private String fileFlag = null;
    private String stuName = null;
    private List<String> stuClassNames;

    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
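        // Reset per-key state so a name from the previous key never leaks into this one
        stuName = null;
        stuClassNames = new ArrayList<>();

        for (Text val : values) {
            String[] fields = StringUtils.split(val.toString(),"\t");
            fileFlag = fields[0];
            // Determine which file the record came from and parse it according to that file's format
            if (fileFlag.equals(STUDENT_FLAG)) {
                stuName = fields[1];
                outKey.set(stuName);
            }
            else if (fileFlag.equals(STUDENT_CLASS_FLAG)) {
                stuClassNames.add(fields[1]);
            }
        }

        // No student record for this ID: nothing to join (inner-join semantics)
        if (stuName == null) {
            return;
        }

        // Cartesian product of the student name with every course
        for (String stuClassName : stuClassNames) {
            outValue.set(stuClassName);
            context.write(outKey, outValue);
        }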
    }

}
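The reducer has to buffer the course names because the values of a group arrive in no particular order: the student's name may come after some of the courses. The plain-Java sketch below mirrors the reducer's logic for a single key so this can be tried without Hadoop; the class `JoinReducerLogicSketch` and its `reduce` method are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JoinReducerLogicSketch {

    // One reduce call: values look like "student\tName" or "student_class\tCourse".
    static List<String> reduce(List<String> values) {
        String stuName = null;
        List<String> courses = new ArrayList<>();
        for (String v : values) {
            String[] fields = v.split("\t");
            if (fields[0].equals("student")) {
                stuName = fields[1];
            } else if (fields[0].equals("student_class")) {
                courses.add(fields[1]);
            }
        }

        List<String> out = new ArrayList<>();
        if (stuName == null) {
            return out; // no matching student record: the courses are dropped
        }
        // Cartesian product of {stuName} x courses
        for (String c : courses) {
            out.add(stuName + "\t" + c);
        }
        return out;
    }

    public static void main(String[] args) {
        // The name arrives between two courses; both courses are still joined.
        System.out.println(reduce(Arrays.asList("student_class\tMath", "student\tJenny", "student_class\tChinese")));
        // Courses with no student record produce no output.
        System.out.println(reduce(Arrays.asList("student_class\tArt", "student_class\tMath")));
    }
}
```

Buffering is cheap here because each student has only a few courses. For keys with very many values, a secondary sort that guarantees the student record arrives first would let the reducer stream the courses instead of holding them in memory.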

JoinRunner

package com.test.join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinRunner extends Configured implements Tool{

    @Override
    public int run(String[] args) throws Exception {
        // Use the Configuration prepared by ToolRunner so generic options (-D etc.) take effect
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "Join");
        job.setJarByClass(JoinRunner.class);

        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0:1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new JoinRunner(), args);
        System.exit(res);
    }
}

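Execution result

Records come out grouped by student ID in sorted key order (00001, 00002, 00003). The order of a student's courses is not guaranteed, because MapReduce does not sort the values within a group; that is why Math appears before Chinese here even though the input lists Chinese first: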

Jenny   Math
Jenny   Chinese
Hardy   Math
Hardy   Music
Bradley Physic