1. 程式人生 > 其它 >Mapreduce例項——單表join

Mapreduce例項——單表join

現有某電商的使用者好友資料檔案,名為 buyer1,buyer1中包含(buyer_id,friends_id)兩個欄位,內容是以"\t"分隔,編寫MapReduce進行單表連線,查詢出使用者的間接好友關係。例如:10001的好友是10002,而10002的好友是10005,那麼10001和10005就是間接好友關係。

buyer1(buyer_id,friends_id)
10001    10002
10002    10005
10003    10002
10004    10006
10005    10007
10006    10022
10007    10032
10009    10006
10010    10005
10011    10013
buyer1

mapreduce程式:

package mapreduce7;

import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //04.Mapreduce例項——單表join public class DanJoin { public static class Map extends Mapper<Object,Text,Text,Text>{ public void map(Object key,Text value,Context context)
throws IOException,InterruptedException{ String line = value.toString(); String[] arr = line.split("\t"); String mapkey=arr[0]; String mapvalue=arr[1]; String relationtype=new String(); relationtype="1"; context.write(new Text(mapkey),new Text(relationtype+"+"+mapvalue)); //System.out.println(relationtype+"+"+mapvalue); relationtype="2"; context.write(new Text(mapvalue),new Text(relationtype+"+"+mapkey)); //System.out.println(relationtype+"+"+mapvalue); } } public static class Reduce extends Reducer<Text, Text, Text, Text>{ public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{ int buyernum=0; String[] buyer=new String[20]; int friendsnum=0; String[] friends=new String[20]; Iterator ite=values.iterator(); while(ite.hasNext()){ String record=ite.next().toString(); int len=record.length(); int i=2; if(0==len){ continue; } char relationtype=record.charAt(0); if('1'==relationtype){ buyer [buyernum]=record.substring(i); buyernum++; } if('2'==relationtype){ friends[friendsnum]=record.substring(i); friendsnum++; } } if(0!=buyernum&&0!=friendsnum){ for(int m=0;m<buyernum;m++){ for(int n=0;n<friendsnum;n++){ if(buyer[m]!=friends[n]){ context.write(new Text(buyer[m]),new Text(friends[n])); } } } } } } public static void main(String[] args) throws Exception{ Configuration conf=new Configuration(); String[] otherArgs=new String[2]; otherArgs[0]="hdfs://192.168.51.100:8020/mymapreduce7/in/buyer1"; otherArgs[1]="hdfs://192.168.51.100:8020/mymapreduce7/out"; Job job=new Job(conf," Table join"); job.setJarByClass(DanJoin.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)?0:1); } }

結果:

原理:

以本實驗的buyer1(buyer_id,friends_id)表為例來闡述單表連線的實驗原理。單表連線,連線的是左表的buyer_id列和右表的friends_id列,且左表和右表是同一個表。因此,在map階段將讀入資料分割成buyer_id和friends_id之後,會將buyer_id設定成key,friends_id設定成value,直接輸出並將其作為左表;再將同一對buyer_id和friends_id中的friends_id設定成key,buyer_id設定成value進行輸出,作為右表。為了區分輸出中的左右表,需要在輸出的value中再加上左右表的資訊,比如在value的String最開始處加上字元1表示左表,加上字元2表示右表。這樣在map的結果中就形成了左表和右表,然後在shuffle過程中完成連線。reduce接收到連線的結果,其中每個key的value-list就包含了"buyer_idfriends_id--friends_idbuyer_id"關係。取出每個key的value-list進行解析,將左表中的buyer_id放入一個數組,右表中的friends_id放入一個數組,然後對兩個陣列求笛卡爾積就是最後的結果