
MapReduce Single-Table Join

Data:
Goal: for each child, find all of their grandparents (paternal and maternal).

child parent
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Marry
Lucy Jesse
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma

Result:

Jone    Alice
Tom    Alice
Jone    Jesse
Tom    Jesse
Jone    Marry
Tom    Marry
Jone    Jesse
Tom    Jesse
Mark    Alice
Philip    Alice
Mark    Jesse
Philip    Jesse

Mapper:

One pitfall: a new Text has to be created for every context.write() call here; reusing the original Text object via its set() method did not work.

package _SingleTable;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @Author:Dapeng
 * @Discription:
 * @Date:Created in 10:11 AM 2018/11/8 0008
 */
public class SingleTableMap extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] wordArr = line.split("\\s+");
        // skip the header line "child parent"
        if (!"child".equals(wordArr[0])) {
            // tag "1:" = the value is a parent of the key person
            context.write(new Text(wordArr[0]), new Text("1:" + wordArr[1]));
            // tag "2:" = the value is a child of the key person
            context.write(new Text(wordArr[1]), new Text("2:" + wordArr[0]));
        }
    }
}

Reducer:

package _SingleTable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author:Dapeng
 * @Discription:
 * @Date:Created in 10:11 AM 2018/11/8 0008
 */
public class SingleTableReduce extends Reducer<Text,Text,Text,Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {


        // "1:" values are this person's parents, "2:" values are this person's children
        List<String> parents = new ArrayList<String>();
        List<String> childs = new ArrayList<String>();
        Text t1 = new Text();
        Text t2 = new Text();

        for(Text t : values){
            String str = t.toString();
            String[] s = str.split(":");

            if ("1".equals(s[0])) {
                parents.add(s[1]);
            } else if("2".equals(s[0])) {
                childs.add(s[1]);
            }

        }

        // cross product: pair every child of this person with every parent of this person,
        // i.e. emit (grandchild, grandparent)
        for(String p : parents){
            for(String c : childs){
                t1.set(p);
                t2.set(c);
                context.write(t2, t1);
            }
        }
    }
}
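
For illustration, take the key Lucy: the grouped values are "1:Marry", "1:Jesse", "2:Tom", "2:Jone" (in no guaranteed order), so parents = [Marry, Jesse] and childs = [Tom, Jone]. The cross product writes the (grandchild, grandparent) pairs

Jone    Marry
Tom    Marry
Jone    Jesse
Tom    Jesse

which is the Marry/Jesse portion of the result shown above.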
Main:

package _SingleTable;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @Author:Dapeng
 * @Discription:
 * @Date:Created in 10:11 AM 2018/11/8 0008
 */
public class SingleTableMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //0. create a job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"single_table");
        job.setJarByClass(SingleTableMain.class);
        //1. input file
        //TextInputFormat is used by default
        FileInputFormat.addInputPath(job,new Path("file:/D:/hadoopFile/singleTable/data.txt"));
        //2. mapper
        job.setMapperClass(SingleTableMap.class);
        //set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //3. shuffle

        //4. reduce
        job.setReducerClass(SingleTableReduce.class);
        //set the job output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //5. output
        FileOutputFormat.setOutputPath(job,new Path("file:/D:/hadoopFile/singleTable/out"));

        //6. run the job

        boolean result = job.waitForCompletion(true);
        System.out.println(result);
    }
}
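
One practical note: FileOutputFormat makes the job fail if the output directory already exists, so re-running SingleTableMain requires deleting D:/hadoopFile/singleTable/out first. A minimal sketch (not part of the original code; it needs an extra import of org.apache.hadoop.fs.FileSystem) that could replace step 5 above to clean up the old output automatically:

        //5. output: remove the previous output directory, if any, then set the output path
        Path outPath = new Path("file:/D:/hadoopFile/singleTable/out");
        FileSystem fs = outPath.getFileSystem(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);   //true = delete recursively
        }
        FileOutputFormat.setOutputPath(job, outPath);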