1. 程式人生 > 其它 >MapReduce的程式碼編寫----學生資料和總分資料關聯(join)

MapReduce的程式碼編寫----學生資料和總分資料關聯(join)

MapReduce的程式碼編寫----學生資料和總分資料關聯(join)

程式碼
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Reduce-side join of student records with their total scores, keyed by student id.
 *
 * <p>Both inputs are read by the same mapper:
 * <ul>
 *   <li>{@link #STUDENT_INPUT}: comma-separated student rows; field 0 is the id, field 1 the
 *       name and field 4 the class.</li>
 *   <li>{@link #SCORE_INPUT}: tab-separated {@code id<TAB>totalScore} rows (presumably the
 *       default-separator output of an earlier score-summing job — TODO confirm). These rows
 *       must not contain commas, because the mapper uses "contains a comma" to recognise
 *       student rows.</li>
 * </ul>
 *
 * <p>The mapper appends a one-character source tag to every value ({@link #STUDENT_TAG} or
 * {@link #SCORE_TAG}) so the reducer can tell the two sources apart after the shuffle. The
 * reducer writes {@code id,name,clazz,totalScore} lines to {@link #JOIN_OUTPUT}.
 */
public class Demo4Join {
    /** HDFS directory holding the comma-separated student rows. */
    private static final String STUDENT_INPUT = "/student/input";
    /** HDFS directory holding the tab-separated {@code id<TAB>totalScore} rows. */
    private static final String SCORE_INPUT = "/student/score/output";
    /** HDFS output directory; deleted and recreated on every run. */
    private static final String JOIN_OUTPUT = "/student/join/output";

    /** Suffix marking a map output value that came from the student file. */
    private static final String STUDENT_TAG = "|";
    /** Suffix marking a map output value that came from the total-score file. */
    private static final String SCORE_TAG = "#";

    /** Counters for input rows that the mapper skipped because they could not be parsed. */
    enum BadRecord { STUDENT, SCORE }

    /** Map side: parses a row from either input and emits (id, tagged value). */
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        // Reused across map() calls (the usual Hadoop idiom): context.write serializes the
        // pair immediately, so both objects may be overwritten for the next record.
        private final LongWritable outKey = new LongWritable();
        private final Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Text has no split(), so work on a String copy; trim() also drops a stray "\r".
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return; // blank line: nothing to join
            }
            if (line.contains(",")) {
                // Student row: id,name,_,_,clazz[,...]
                String[] fields = line.split(",");
                if (fields.length < 5 || !setKey(fields[0])) {
                    context.getCounter(BadRecord.STUDENT).increment(1);
                    return;
                }
                outValue.set(fields[1] + "," + fields[4] + STUDENT_TAG);
            } else {
                // Total-score row: id<TAB>score (tab-separated)
                String[] fields = line.split("\t");
                if (fields.length < 2 || !setKey(fields[0])) {
                    context.getCounter(BadRecord.SCORE).increment(1);
                    return;
                }
                // The score is passed through as text; it is never used numerically.
                outValue.set(fields[1] + SCORE_TAG);
            }
            context.write(outKey, outValue);
        }

        /** Parses {@code idField} into {@link #outKey}; returns false if it is not a valid long. */
        private boolean setKey(String idField) {
            try {
                outKey.set(Long.parseLong(idField.trim()));
                return true;
            } catch (NumberFormatException e) {
                return false; // e.g. a header row; the caller counts it as a bad record
            }
        }
    }

    /**
     * Reduce side: merges the student half and the score half for one id into
     * {@code name,clazz,score}.
     *
     * <p>Every id is emitted, even when one side is missing; the missing half is left empty.
     * If an id has several rows from the same source, the last one seen wins.
     */
    public static class MyReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
        private final Text outValue = new Text();

        @Override
        protected void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String student = "";
            String score = "";
            for (Text value : values) {
                // toString() copies: Hadoop reuses the same Text instance on each iteration.
                String v = value.toString();
                // Strip exactly the trailing source tag so no marker or padding leaks into the output.
                if (v.endsWith(STUDENT_TAG)) {
                    student = v.substring(0, v.length() - STUDENT_TAG.length());
                } else if (v.endsWith(SCORE_TAG)) {
                    score = v.substring(0, v.length() - SCORE_TAG.length());
                }
            }
            outValue.set(student + "," + score);
            context.write(key, outValue);
        }
    }

    /**
     * Driver: configures the join job, clears any previous output, runs the job, and exits
     * with status 0 on success or 1 on failure.
     *
     * @param args unused; input and output locations are the class constants above
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Key/value separator written by TextOutputFormat (default is a tab). This is the
        // current name of the deprecated "mapred.textoutputformat.separator" key.
        conf.set("mapreduce.output.textoutputformat.separator", ",");
        Job job = Job.getInstance(conf);
        job.setJobName("Demo4Join");
        job.setJarByClass(Demo4Join.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Both inputs feed the same mapper; it tells them apart by row content.
        FileInputFormat.addInputPath(job, new Path(STUDENT_INPUT));
        FileInputFormat.addInputPath(job, new Path(SCORE_INPUT));

        // The job refuses to start if its output directory already exists, so remove the
        // previous run's output first. NOTE: this recursively deletes JOIN_OUTPUT.
        Path outPath = new Path(JOIN_OUTPUT);
        FileSystem fs = outPath.getFileSystem(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // Report job success/failure to the caller through the process exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
執行結果
1500100001,施笑槐,文科六班 ,406 
1500100002,呂金鵬,文科六班 ,440 
1500100003,單樂蕊,理科六班 ,359 
1500100004,葛德曜,理科三班 ,421 
1500100005,宣谷芹,理科五班 ,395 
1500100006,邊昂雄,理科二班 ,314 
1500100007,尚孤風,文科六班 ,418 
1500100008,符半雙,理科六班 ,363 
1500100009,沈德昌,理科一班 ,251 
1500100010,羿彥昌,理科六班 ,402 
1500100011,宰運華,理科三班 ,282 
1500100012,樑易槐,理科一班 ,459 
1500100013,逯君昊,文科二班 ,369 
1500100014,羿旭炎,理科五班 ,396 
1500100015,宦懷綠,理科一班 ,309 
1500100016,潘訪煙,文科一班 ,359 
1500100017,高芷天,理科五班 ,263 
1500100018,駱憐雪,文科六班 ,425 
1500100019,婁曦之,理科三班 ,433
...共6000行