Big Data Techniques: Finding Common Blog Friends (Case Study)
阿新 • Published 2018-07-03
7.9 Case Study: Finding Common Blog Friends
1) Requirements:
Below is a blog's friend-list data. Before the colon is a user; after the colon are all of that user's friends (the friendships in this data are one-directional).
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

Save this data as friends.txt. (The user-to-friend relationship here is many-to-many, much like students and courses linked through a grades table in a database.)
Find every pair of users who have friends in common, and list who those common friends are.
2) Requirement analysis:
First work out whose friend each of A, B, C, … is; that is, invert each list so that every friend maps to the users who listed them. This inverted view is what the first MapReduce job produces, and the second job builds the pairs from it (a local sanity-check sketch follows the two outputs below).
Output of the first job:
A	I,K,C,B,G,F,H,O,D,
B	A,F,J,E,
C	A,E,B,H,F,G,K,
D	G,C,K,A,L,F,E,H,
E	G,M,L,H,A,F,B,D,
F	L,M,D,C,G,A,
G	M,
H	O,
I	O,C,
J	O,
K	B,
L	D,E,
M	E,F,
O	A,H,I,J,F,
Output of the second job:
A-B	E C
A-C	D F
A-D	E F
A-E	D B C
A-F	O B C D E
A-G	F E C D
A-H	E C D O
A-I	O
A-J	O B
A-K	D C
A-L	F E D
A-M	E F
B-C	A
B-D	A E
B-E	C
B-F	E A C
B-G	C E A
B-H	A E C
B-I	A
B-K	C A
B-L	E
B-M	E
B-O	A
C-D	A F
C-E	D
C-F	D A
C-G	D F A
C-H	D A
C-I	A
C-K	A D
C-L	D F
C-M	F
C-O	I A
D-E	L
D-F	A E
D-G	E A F
D-H	A E
D-I	A
D-K	A
D-L	E F
D-M	F E
D-O	A
E-F	D M C B
E-G	C D
E-H	C D
E-J	B
E-K	C D
E-L	D
F-G	D C A E
F-H	A D O E C
F-I	O A
F-J	B O
F-K	D C A
F-L	E D
F-M	E
F-O	A
G-H	D C E A
G-I	A
G-K	D A C
G-L	D F E
G-M	E F
G-O	A
H-I	O A
H-J	O
H-K	A C D
H-L	D E
H-M	E
H-O	A
I-J	O
I-K	A
I-O	A
K-L	D
K-O	A
L-M	E F
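The two outputs above can be sanity-checked without a cluster. Below is a minimal plain-Java sketch (the class name CommonFriendsLocalCheck is made up for illustration) that runs both passes in memory over the same fourteen input lines; its pair list should match the second output above, up to ordering.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CommonFriendsLocalCheck {

    public static void main(String[] args) {
        String[] lines = {
                "A:B,C,D,F,E,O", "B:A,C,E,K", "C:F,A,D,I", "D:A,E,F,L",
                "E:B,C,D,M,L", "F:A,B,C,D,E,O,M", "G:A,C,D,E,F",
                "H:A,C,D,E,O", "I:A,O", "J:B,O", "K:A,C,D",
                "L:D,E,F", "M:E,F,G", "O:A,H,I,J"
        };

        // Pass 1: invert the lists -> for each friend, who has that friend
        Map<String, List<String>> friendToPersons = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(":");
            for (String friend : parts[1].split(",")) {
                friendToPersons.computeIfAbsent(friend, k -> new ArrayList<>())
                        .add(parts[0]);
            }
        }

        // Pass 2: every sorted pair of people sharing a friend gets that friend
        Map<String, List<String>> pairToFriends = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : friendToPersons.entrySet()) {
            List<String> persons = e.getValue();
            persons.sort(null); // natural (alphabetical) order, as in the mapper
            for (int i = 0; i < persons.size() - 1; i++) {
                for (int j = i + 1; j < persons.size(); j++) {
                    pairToFriends
                            .computeIfAbsent(persons.get(i) + "-" + persons.get(j),
                                    k -> new ArrayList<>())
                            .add(e.getKey());
                }
            }
        }

        pairToFriends.forEach((pair, fs) -> System.out.println(pair + "\t" + fs));
    }
}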
3) Code implementation:
(1) Mapper for the first job
package com.xyg.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OneShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1 Read one line, e.g. A:B,C,D,F,E,O
        String line = value.toString();

        // 2 Split on the colon
        String[] fields = line.split(":");

        // 3 Extract the person and their friends
        String person = fields[0];
        String[] friends = fields[1].split(",");

        // 4 Emit <friend, person> for each friend
        for (String friend : friends) {
            context.write(new Text(friend), new Text(person));
        }
    }
}
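To see concretely what this mapper emits, here is a small test sketch using the (now retired) Apache MRUnit library; the dependency and this test class are an assumption for illustration, not part of the original post. For the input line I:A,O the mapper should emit <A, I> and <O, I>.

package com.xyg.mapreduce.friends;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class OneShareFriendsMapperTest {

    @Test
    public void emitsFriendPersonPairs() throws Exception {
        // Assumes MRUnit (org.apache.mrunit:mrunit) on the test classpath
        MapDriver.newMapDriver(new OneShareFriendsMapper())
                .withInput(new LongWritable(0), new Text("I:A,O"))
                .withOutput(new Text("A"), new Text("I"))
                .withOutput(new Text("O"), new Text("I"))
                .runTest();
    }
}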
(2) Reducer for the first job
package com.xyg.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();

        // 1 Concatenate every person who has this key as a friend
        for (Text person : values) {
            sb.append(person).append(",");
        }

        // 2 Emit <friend, person,person,...>
        context.write(key, new Text(sb.toString()));
    }
}
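As a concrete trace: the key C arrives at this reducer with the grouped values A, E, B, H, F, G, K (exactly the users whose lists contain C; the shuffle makes no promise about their order), and the reducer emits the line "C	A,E,B,H,F,G,K," shown in the first output above.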
(3) Driver for the first job
package com.xyg.mapreduce.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OneShareFriendsDriver {

    public static void main(String[] args) throws Exception {
        // 1 Get the job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Set the jar via the driver class
        job.setJarByClass(OneShareFriendsDriver.class);

        // 3 Set the mapper and reducer classes
        job.setMapperClass(OneShareFriendsMapper.class);
        job.setReducerClass(OneShareFriendsReducer.class);

        // 4 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 6 Set the input and output directories
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job and exit 0 on success, 1 on failure
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
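Assuming the classes are packaged into a jar (the name friends.jar and the HDFS paths below are placeholders, not from the original post), the first job would be launched like this:

hadoop jar friends.jar com.xyg.mapreduce.friends.OneShareFriendsDriver /friends/input /friends/oneshare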
(4) Mapper for the second job
package com.xyg.mapreduce.friends;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line from the first job, e.g. A	I,K,C,B,G,F,H,O,D,
        // i.e. <friend>	<person,person,person,...>
        String line = value.toString();
        String[] friendPersons = line.split("\t");

        String friend = friendPersons[0];
        String[] persons = friendPersons[1].split(",");

        // Sort so each pair is always emitted in the same order (B-C, never C-B)
        Arrays.sort(persons);

        for (int i = 0; i < persons.length - 1; i++) {
            for (int j = i + 1; j < persons.length; j++) {
                // Emit <person-person, friend>, so all common friends of the
                // same pair meet in a single reduce call
                context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));
            }
        }
    }
}
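The Arrays.sort call is what keeps the pair keys canonical; without it, the friends of A and B could be split between an A-B key and a B-A key and never meet in one reduce call. A tiny standalone demo (the class name PairOrderDemo is hypothetical):

import java.util.Arrays;

public class PairOrderDemo {

    public static void main(String[] args) {
        // Values as they might arrive from the shuffle, in no particular order
        String[] persons = {"I", "K", "C", "B"};
        Arrays.sort(persons); // -> B, C, I, K

        // The smaller name now always comes first, so the pair of B and C is
        // always keyed "B-C" and never "C-B"
        for (int i = 0; i < persons.length - 1; i++) {
            for (int j = i + 1; j < persons.length; j++) {
                System.out.println(persons[i] + "-" + persons[j]);
            }
        }
    }
}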
(5) Reducer for the second job
package com.xyg.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();

        // Concatenate every common friend of this person-person pair
        for (Text friend : values) {
            sb.append(friend).append(" ");
        }

        context.write(key, new Text(sb.toString()));
    }
}
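As a trace: the key A-B reaches this reducer with the values E and C, because A and B both appear in E's inverted list and in C's, and the reducer emits "A-B	E C", matching the final output above.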
(6) Driver for the second job
package com.xyg.mapreduce.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TwoShareFriendsDriver {

    public static void main(String[] args) throws Exception {
        // 1 Get the job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Set the jar via the driver class
        job.setJarByClass(TwoShareFriendsDriver.class);

        // 3 Set the mapper and reducer classes
        job.setMapperClass(TwoShareFriendsMapper.class);
        job.setReducerClass(TwoShareFriendsReducer.class);

        // 4 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 6 Set the input and output directories
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job and exit 0 on success, 1 on failure
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
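The two jobs run back to back, with the second job reading the first job's output directory (again, the jar name and paths are placeholders):

hadoop jar friends.jar com.xyg.mapreduce.friends.OneShareFriendsDriver /friends/input /friends/oneshare
hadoop jar friends.jar com.xyg.mapreduce.friends.TwoShareFriendsDriver /friends/oneshare /friends/twoshare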