1. 程式人生 > 其它 >04.Mapreduce例項——單表join

04.Mapreduce例項——單表join

04Mapreduce例項——單表join

實驗原理

以本實驗的buyer1(buyer_id,friends_id)表為例來闡述單表連線的實驗原理。單表連線,連線的是左表的buyer_id列和右表的friends_id列,且左表和右表是同一個表。因此,在map階段將讀入資料分割成buyer_id和friends_id之後,會將buyer_id設定成key,friends_id設定成value,直接輸出並將其作為左表;再將同一對buyer_id和friends_id中的friends_id設定成key,buyer_id設定成value進行輸出,作為右表。為了區分輸出中的左右表,需要在輸出的value中再加上左右表的資訊,比如在value的String最開始處加上字元1表示左表,加上字元2表示右表。這樣在map的結果中就形成了左表和右表,然後在shuffle過程中完成連線。reduce接收到連線的結果,其中每個key的value-list就包含了"buyer_idfriends_id--friends_idbuyer_id"關係。取出每個key的value-list進行解析,將左表中的buyer_id放入一個數組,右表中的friends_id放入一個數組,然後對兩個陣列求笛卡爾積就是最後的結果了。

實驗步驟

  1. 開啟Hadoop服務

Start-all.sh

  1. 建立目錄

mkdir-p/data/mapreduce7

  1. 將buyer1檔案上傳到該目錄下
  2. 上傳hadoop2lib檔案並解壓

unzip hadoop2lib.zip

  1. 在hdfs上新建/mymapreduce7/in目錄,然後將Linux本地/data/mapreduce7目錄下的buyer1檔案匯入到hdfs的/mymapreduce7/in目錄中。

hadoop fs -mkdir -p /mymapreduce7/in

hadoop fs -put /data/mapreduce7/buyer1 /mymapreduce7/in

  1. IDEA中編寫Java程式碼
  2. package mapreduce4;
    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class DanJoin {
    public static class Map extends Mapper<Object,Text,Text,Text>{
    public void map(Object key,Text value,Context context)
    throws IOException,InterruptedException{
    String line = value.toString();
    String[] arr = line.split(",");
    String mapkey=arr[0];
    String mapvalue=arr[1];
    String relationtype=new String();
    relationtype="1";
    context.write(new Text(mapkey),new Text(relationtype+"+"+mapvalue));
    //System.out.println(relationtype+"+"+mapvalue);
    relationtype="2";
    context.write(new Text(mapvalue),new Text(relationtype+"+"+mapkey));
    //System.out.println(relationtype+"+"+mapvalue);
    }
    }
    public static class Reduce extends Reducer<Text, Text, Text, Text>{
    public void reduce(Text key,Iterable<Text> values,Context context)
    throws IOException,InterruptedException{
    int buyernum=0;
    String[] buyer=new String[20];
    int friendsnum=0;
    String[] friends=new String[20];
    Iterator ite=values.iterator();
    while(ite.hasNext()){
    String record=ite.next().toString();
    int len=record.length();
    int i=2;
    if(0==len){
    continue;
    }
    char relationtype=record.charAt(0);
    if('1'==relationtype){
    buyer [buyernum]=record.substring(i);
    buyernum++;
    }
    if('2'==relationtype){
    friends[friendsnum]=record.substring(i);
    friendsnum++;
    }
    }
    if(0!=buyernum&&0!=friendsnum){
    for(int m=0;m<buyernum;m++){
    for(int n=0;n<friendsnum;n++){
    if(buyer[m]!=friends[n]){
    context.write(new Text(buyer[m]),new Text(friends[n]));
    }
    }
    }
    }
    }
    }
    public static void main(String[] args) throws Exception{

    Configuration conf=new Configuration();
    String[] otherArgs=new String[2];
    otherArgs[0]="hdfs://192.168.149.10:9000/mymapreduce7/in/buyer1";
    otherArgs[1]="hdfs://192.168.149.10:9000/mymapreduce7/out";
    Job job=new Job(conf," Table join");
    job.setJarByClass(DanJoin.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath
    (job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true)?0:1);

    }
    }
  1. 將hadoop2lib目錄中的jar包,拷貝到hadoop2lib目錄下。
  2. 拷貝log4j.properties檔案
  3. 執行結果