A MapReduce implementation of Tencent's QQ common-friend recommendation system ("People You May Know")
阿新 · Published 2019-05-31
Let's implement a simplified version of Tencent's big-data QQ common-friend recommendation feature using MapReduce.
Test data: the part before the colon is a QQ user; the comma-separated part after it is that user's friend list.
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
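Each input line can be split into the user and the friend list, which is exactly what the mapper below does. A minimal parsing sketch (the class and method names here are my own, not from the Hadoop job):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper showing how one input line of the form
// "user:friend1,friend2,..." is split into a user and a friend list.
public class FriendLineParser {
    public static String user(String line) {
        return line.split(":")[0];
    }

    public static List<String> friends(String line) {
        return Arrays.asList(line.split(":")[1].split(","));
    }

    public static void main(String[] args) {
        String line = "A:B,C,D,F,E,O";
        System.out.println(user(line));    // A
        System.out.println(friends(line)); // [B, C, D, F, E, O]
    }
}
```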
This can be implemented in two MapReduce passes.
Step 1: the first map and reduce pass
1. The mapper emits each friend as the key and the owning user as the value.
For the first line: B->A C->A D->A F->A E->A O->A
For the second line: A->B C->B E->B K->B
2. The shuffle then groups records by key, so each reduce task receives every user who lists a given friend.
For friend B, for example, the grouped records are B->A, B->E, B->F, B->J,
which the reducer sees as: B->A E F J
3. The reducer sorts this user list and emits every pair of users, with the shared friend as the value:
A-E B
A-F B
A-J B
E-F B
E-J B
F-J B
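The sort-then-pair step of the reducer can be sketched outside Hadoop like this (the class name `PairCombiner` is my own):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Standalone sketch of the step-1 reducer logic: given the users who
// all list the same friend, sort them and emit every pair once.
public class PairCombiner {
    public static List<String> pairs(List<String> users) {
        List<String> sorted = new ArrayList<>(users);
        Collections.sort(sorted); // ensures we emit A-E, never E-A
        List<String> out = new ArrayList<>();
        for (int i = 0; i < sorted.size() - 1; i++) {
            for (int j = i + 1; j < sorted.size(); j++) {
                out.add(sorted.get(i) + "-" + sorted.get(j));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Friend B is listed by users A, E, F, J in the sample data
        System.out.println(pairs(Arrays.asList("A", "E", "F", "J")));
        // [A-E, A-F, A-J, E-F, E-J, F-J]
    }
}
```

Sorting first is what guarantees each pair appears in one canonical order across all reduce groups, so the second pass can group on the pair string directly.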
Code implementation

package com.xuyu.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;

public class CommonFriendsOne {

    public static class CommonFriendsMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text k = new Text();
        Text v = new Text();

        // Input:  A:B,C,D,F,E,O
        // Output: B->A  C->A  D->A ...
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] userAndFriends = value.toString().split(":");
            String user = userAndFriends[0];
            String[] friends = userAndFriends[1].split(",");
            v.set(user);
            for (String f : friends) {
                k.set(f);
                // key: friend, value: user
                context.write(k, v);
            }
        }
    }

    public static class CommonFriendsOneReduce extends Reducer<Text, Text, Text, Text> {
        // One group: B->A E F J ...
        // Another:   C->B F E J ...
        // The user list must be sorted so each pair is emitted in one canonical order
        @Override
        protected void reduce(Text friend, Iterable<Text> users, Context context) throws IOException, InterruptedException {
            ArrayList<String> userList = new ArrayList<String>();
            for (Text user : users) {
                userList.add(user.toString());
            }
            // sort
            Collections.sort(userList);
            // emit every pair of users that share this friend
            for (int i = 0; i < userList.size() - 1; i++) {
                for (int j = i + 1; j < userList.size(); j++) {
                    context.write(new Text(userList.get(i) + "-" + userList.get(j)), friend);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 1. locate the jar containing this class
        job.setJarByClass(CommonFriendsOne.class);
        // 2. set the mapper and reducer classes for this job
        job.setMapperClass(CommonFriendsMapper.class);
        job.setReducerClass(CommonFriendsOneReduce.class);
        // 3. key/value types produced by the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // 4. key/value types produced by the reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\friends\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\friends\\output"));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : -1);
    }
}
Result
The part-r-00000 file (one tab-separated record per line):
B-C A
B-D A
B-F A
B-G A
B-H A
B-I A
B-K A
B-O A
C-D A
C-F A
C-G A
C-H A
C-I A
C-K A
C-O A
D-F A
D-G A
D-H A
D-I A
D-K A
D-O A
F-G A
F-H A
F-I A
F-K A
F-O A
G-H A
G-I A
G-K A
G-O A
H-I A
H-K A
H-O A
I-K A
I-O A
K-O A
A-E B
A-F B
A-J B
E-F B
E-J B
F-J B
A-B C
A-E C
A-F C
A-G C
A-H C
A-K C
B-E C
B-F C
B-G C
B-H C
B-K C
E-F C
E-G C
E-H C
E-K C
F-G C
F-H C
F-K C
G-H C
G-K C
H-K C
A-C D
A-E D
A-F D
A-G D
A-H D
A-K D
A-L D
C-E D
C-F D
C-G D
C-H D
C-K D
C-L D
E-F D
E-G D
E-H D
E-K D
E-L D
F-G D
F-H D
F-K D
F-L D
G-H D
G-K D
G-L D
H-K D
H-L D
K-L D
A-B E
A-D E
A-F E
A-G E
A-H E
A-L E
A-M E
B-D E
B-F E
B-G E
B-H E
B-L E
B-M E
D-F E
D-G E
D-H E
D-L E
D-M E
F-G E
F-H E
F-L E
F-M E
G-H E
G-L E
G-M E
H-L E
H-M E
L-M E
A-C F
A-D F
A-G F
A-L F
A-M F
C-D F
C-G F
C-L F
C-M F
D-G F
D-L F
D-M F
G-L F
G-M F
L-M F
C-O I
D-E L
E-F M
A-F O
A-H O
A-I O
A-J O
F-H O
F-I O
F-J O
H-I O
H-J O
I-J O
Step 2: run a second MapReduce pass over the output file of step 1.
The desired output looks like this: each line is a user pair and their common friends, so the first line below means users A and B share the two friends C and E.
A-B [C, E]
A-C [D, F]
A-D [E, F]
A-E [B, C, D]
A-F [B, C, D, E, O]
A-G [C, D, E, F]
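The transformation in this second pass is a straightforward group-by on the pair key. It can be sketched in plain Java as follows (the class name `GroupPairs` is my own):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the second pass: group the step-1 records ("pair<TAB>friend")
// by pair and sort each pair's friend list, as the second reducer does.
public class GroupPairs {
    public static Map<String, List<String>> group(List<String> step1Lines) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : step1Lines) {
            String[] kv = line.split("\t"); // kv[0] = pair, kv[1] = one common friend
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        for (List<String> friends : grouped.values()) {
            Collections.sort(friends);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("A-B\tE", "A-B\tC", "A-C\tD", "A-C\tF");
        System.out.println(group(lines)); // {A-B=[C, E], A-C=[D, F]}
    }
}
```

In Hadoop the grouping itself is done by the shuffle; the reducer only has to collect and sort the values.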
Code implementation
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
public class CommonFriendsOne2 {

    public static class CommonFriendsMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Step-1 output lines look like "A-B<TAB>C"
            String[] friendsAndUser = value.toString().split("\t");
            context.write(new Text(friendsAndUser[0]), new Text(friendsAndUser[1]));
        }
    }

    public static class CommonFriendsOneReduce extends Reducer<Text, Text, Text, Text> {
        // key is the user pair (e.g. A-B); the values are their common friends
        @Override
        protected void reduce(Text friends, Iterable<Text> users, Context context) throws IOException, InterruptedException {
            ArrayList<String> userList = new ArrayList<String>();
            for (Text user : users) {
                userList.add(user.toString());
            }
            // sort the common friends before emitting
            Collections.sort(userList);
            context.write(friends, new Text(userList.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 1. locate the jar containing this class
        job.setJarByClass(CommonFriendsOne2.class);
        // 2. set the mapper and reducer classes for this job
        job.setMapperClass(CommonFriendsMapper.class);
        job.setReducerClass(CommonFriendsOneReduce.class);
        // 3. key/value types produced by the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // 4. key/value types produced by the reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\friends\\output"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\friends\\output2"));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : -1);
    }
}
The final output is:
A-B [C, E]
A-C [D, F]
A-D [E, F]
A-E [B, C, D]
A-F [B, C, D, E, O]
A-G [C, D, E, F]
A-H [C, D, E, O]
A-I [O]
A-J [B, O]
A-K [C, D]
A-L [D, E, F]
A-M [E, F]
B-C [A]
B-D [A, E]
B-E [C]
B-F [A, C, E]
B-G [A, C, E]
B-H [A, C, E]
B-I [A]
B-K [A, C]
B-L [E]
B-M [E]
B-O [A]
C-D [A, F]
C-E [D]
C-F [A, D]
C-G [A, D, F]
C-H [A, D]
C-I [A]
C-K [A, D]
C-L [D, F]
C-M [F]
C-O [A, I]
D-E [L]
D-F [A, E]
D-G [A, E, F]
D-H [A, E]
D-I [A]
D-K [A]
D-L [E, F]
D-M [E, F]
D-O [A]
E-F [B, C, D, M]
E-G [C, D]
E-H [C, D]
E-J [B]
E-K [C, D]
E-L [D]
F-G [A, C, D, E]
F-H [A, C, D, E, O]
F-I [A, O]
F-J [B, O]
F-K [A, C, D]
F-L [D, E]
F-M [E]
F-O [A]
G-H [A, C, D, E]
G-I [A]
G-K [A, C, D]
G-L [D, E, F]
G-M [E, F]
G-O [A]
H-I [A, O]
H-J [O]
H-K [A, C, D]
H-L [D, E]
H-M [E]
H-O [A]
I-J [O]
I-K [A]
I-O [A]
K-L [D]
K-O [A]
L-M [E, F]
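As a sanity check, the same result can be computed directly in memory without Hadoop, by running both passes over the sample input and comparing a few pairs against the listing above (the class name `CommonFriendsCheck` is my own, not part of the post's jobs):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// End-to-end sanity check: invert friend lists (pass 1), then pair up
// users sharing each friend and collect per-pair friend sets (pass 2).
public class CommonFriendsCheck {
    public static Map<String, TreeSet<String>> commonFriends(String[] lines) {
        // Pass 1: friend -> sorted set of users who list that friend
        Map<String, TreeSet<String>> friendToUsers = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(":");
            for (String f : parts[1].split(",")) {
                friendToUsers.computeIfAbsent(f, k -> new TreeSet<>()).add(parts[0]);
            }
        }
        // Pass 2: user pair -> sorted set of common friends
        Map<String, TreeSet<String>> pairToFriends = new TreeMap<>();
        for (Map.Entry<String, TreeSet<String>> e : friendToUsers.entrySet()) {
            List<String> users = new ArrayList<>(e.getValue());
            for (int i = 0; i < users.size() - 1; i++) {
                for (int j = i + 1; j < users.size(); j++) {
                    pairToFriends.computeIfAbsent(users.get(i) + "-" + users.get(j),
                            k -> new TreeSet<>()).add(e.getKey());
                }
            }
        }
        return pairToFriends;
    }

    public static void main(String[] args) {
        String[] lines = {
            "A:B,C,D,F,E,O", "B:A,C,E,K", "C:F,A,D,I", "D:A,E,F,L",
            "E:B,C,D,M,L", "F:A,B,C,D,E,O,M", "G:A,C,D,E,F",
            "H:A,C,D,E,O", "I:A,O", "J:B,O", "K:A,C,D", "L:D,E,F",
            "M:E,F,G", "O:A,H,I,J"
        };
        Map<String, TreeSet<String>> result = commonFriends(lines);
        System.out.println("A-B " + result.get("A-B")); // A-B [C, E]
        System.out.println("E-F " + result.get("E-F")); // E-F [B, C, D, M]
    }
}
```

Both printed pairs agree with the job output above, which is a quick way to convince yourself the two-pass pipeline is correct before running it on a cluster.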