Hadoop離線_MapReduce求共同好友
阿新 • • 發佈:2021-01-15
需求: 求出哪些人兩兩之間有共同好友,及他倆的共同好友都有誰?
- 使用者及好友資料(每行格式為 使用者:好友1,好友2,…)
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
分析: 按題目要求分為兩大步驟
- 第一步: 找出哪些人兩兩之間有共同好友
對 A-O 中的每個人,輸出所有把他當作好友的使用者,格式如:F-D-O-I-H-B-K-G-C- A
左邊表示“哪些人”,右邊表示他們的共同好友(即 F、D、O、I、H、B、K、G、C 的好友列表中都有 A)
- 第二步: 求出他倆的共同好友都有誰
A-B C-E-
左邊表示“他倆”,右邊表示他們的共同好友是誰
程式碼演示:
步驟1:
建立第一步的包step1
在package中定義main、Mapper、Reducer三個類
- Mapper類:
package cn.itcast.demo1.step1;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Step 1 mapper: inverts each "user:friend1,friend2,..." line into (friend, user) records.
 *
 * <p>Example: the line {@code A:B,C,D} produces {@code (B, A)}, {@code (C, A)} and {@code (D, A)}, so
 * the reducer receives, for every friend, the list of users who have that friend.
 * Blank or malformed lines (no user before ':') are skipped instead of failing the task.
 */
public class Step1Mapper extends Mapper<LongWritable, Text, Text, Text> {
    /** Reused output key (the friend), following the usual Hadoop idiom of refilling Text per write. */
    private final Text friendKey = new Text();
    /** Reused output value (the user whose list contains the friend). */
    private final Text userValue = new Text();

    /**
     * Emits one (friend, user) record per friend on the input line.
     *
     * @param key     byte offset of the line (unused)
     * @param value   one input line, e.g. {@code A:B,C,D,E,O}
     * @param context sink for the (friend, user) records
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString().trim();
        int sep = line.indexOf(':');
        // No ':' (e.g. a blank line) or nothing before it: the original split(":")[1] would throw here.
        if (sep <= 0) {
            return;
        }
        userValue.set(line.substring(0, sep).trim());
        for (String friend : line.substring(sep + 1).split(",")) {
            String trimmed = friend.trim();
            // Tolerate "A:" or "A:B,,C" without emitting empty friend keys.
            if (trimmed.isEmpty()) {
                continue;
            }
            friendKey.set(trimmed);
            context.write(friendKey, userValue);
        }
    }
}
- Reducer類:
package cn.itcast.demo1.step1;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Step 1 reducer: for one friend, joins every user who has that friend.
 *
 * <p>Input: key = a friend (e.g. {@code B}), values = users whose friend list contains it
 * (e.g. {@code A, E}). Output: key = the users, each followed by "-" (e.g. {@code A-E-}),
 * value = the friend. Step 2 splits this key on "-" to enumerate user pairs.
 */
public class Step1Reducer extends Reducer<Text, Text, Text, Text> {
    /**
     * Joins the users for one friend and emits (joined users, friend).
     *
     * @param key     the friend shared by all users in {@code values}
     * @param values  the users who list {@code key} as a friend
     * @param context sink for the output record
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Single-threaded local buffer: StringBuilder avoids StringBuffer's needless synchronization.
        StringBuilder users = new StringBuilder();
        for (Text value : values) {
            // Copy the contents right away; the framework's value iterator may reuse the same Text object.
            users.append(value.toString()).append('-');
        }
        context.write(new Text(users.toString()), key);
    }
}
- 程式main函式入口
package cn.itcast.demo1.step1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for step 1: turns the user -> friends lists into friend -> users lists.
 *
 * <p>Usage: {@code Step1Main [inputPath [outputPath]]}. Any path not given on the command line
 * falls back to the original hard-coded local path, so running with no arguments behaves as before.
 */
public class Step1Main extends Configured implements Tool {
    /** Input file used when no input path argument is supplied. */
    private static final String DEFAULT_INPUT = "file:///G:\\friends.txt";
    /**
     * Output directory used when no output path argument is supplied. Hadoop's file output formats
     * normally reject an existing output directory, so delete it before re-running.
     */
    private static final String DEFAULT_OUTPUT = "file:///G:\\step1_output";

    /**
     * Configures and runs the step-1 job.
     *
     * @param args optional {@code [inputPath [outputPath]]}, after ToolRunner has removed generic options
     * @return 0 if the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(super.getConf(), "step1");
        // Tells Hadoop which jar holds the mapper/reducer; without it a real cluster may not find them.
        job.setJarByClass(Step1Main.class);

        // Input: plain text lines of the form "user:friend1,friend2,...".
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path(input));

        // Map: emit (friend, user); k2 and v2 are both Text.
        job.setMapperClass(Step1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce: emit (users joined with "-", friend); k3 and v3 are both Text.
        job.setReducerClass(Step1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Output: tab-separated text, which Step2Mapper splits on "\t".
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(output));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Entry point: runs the job through ToolRunner so generic Hadoop options are honored. */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new Step1Main(), args));
    }
}
執行完成後,得到第一步的資料(每行左邊的使用者列表與右邊的好友之間以 Tab 分隔,第二步的 Mapper 就是按 "\t" 切分):
F-D-O-I-H-B-K-G-C- A
E-A-J-F- B
K-A-B-E-F-G-H- C
G-K-C-A-E-L-F-H- D
G-F-M-B-H-A-L-D- E
M-D-L-A-C-G- F
M- G
O- H
C-O- I
O- J
B- K
E-D- L
F-E- M
J-I-H-A-F- O
步驟2:
建立第二步的包step2
在package中定義main、Mapper、Reducer三個類
- Mapper類
package cn.itcast.demo1.step2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Arrays;
/**
 * Step 2 mapper: expands each step-1 line into one record per pair of users sharing a friend.
 *
 * <p>Input line (step-1 output, key and value separated by a tab), e.g. {@code F-D-O-...-C-<TAB>A}.
 * Output: for every unordered pair of distinct users X &lt; Y on that line, key = {@code "X-Y"} and
 * value = the shared friend ({@code A}). Lines without a tab are skipped.
 */
public class Step2Mapper extends Mapper<LongWritable, Text, Text, Text> {
    /**
     * Emits ("X-Y", friend) for every pair of users on the line.
     *
     * @param key     byte offset of the line (unused)
     * @param value   one line of step-1 output
     * @param context sink for the (pair, friend) records
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] parts = value.toString().split("\t");
        // A blank or tab-less line has no friend column; the original parts[1] access would throw.
        if (parts.length < 2) {
            return;
        }
        // Sorting makes the same pair always come out as "A-E", never "E-A"; otherwise the reducer would
        // treat them as two different keys. distinct() guards against a user listed twice in the raw
        // data, which would otherwise produce a self-pair such as "A-A" and duplicate pairs.
        String[] users = Arrays.stream(parts[0].split("-"))
                .filter(u -> !u.isEmpty())
                .distinct()
                .sorted()
                .toArray(String[]::new);
        // The shared friend is the same for every pair on this line, so build it once.
        Text friend = new Text(parts[1]);
        for (int i = 0; i < users.length - 1; i++) {
            for (int j = i + 1; j < users.length; j++) {
                context.write(new Text(users[i] + "-" + users[j]), friend);
            }
        }
    }
}
- Reducer類
package cn.itcast.demo1.step2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Step 2 reducer: collects all common friends of one user pair.
 *
 * <p>Input: key = a pair such as {@code A-B}, values = each friend they share.
 * Output: key = the pair, value = the friends, each followed by "-" (e.g. {@code C-E-}).
 */
public class Step2Reducer extends Reducer<Text, Text, Text, Text> {
    /**
     * Joins the shared friends of one pair and emits (pair, joined friends).
     *
     * @param key     the user pair, already in "X-Y" sorted form
     * @param values  the friends shared by the pair
     * @param context sink for the output record
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Single-threaded local buffer: StringBuilder avoids StringBuffer's needless synchronization.
        StringBuilder commonFriends = new StringBuilder();
        for (Text value : values) {
            // Copy the contents right away; the framework's value iterator may reuse the same Text object.
            commonFriends.append(value.toString()).append('-');
        }
        context.write(key, new Text(commonFriends.toString()));
    }
}
- main函式入口
package cn.itcast.demo1.step2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for step 2: turns step-1 output into user pair -> common friends.
 *
 * <p>Usage: {@code Step2Main [inputPath [outputPath]]}. Any path not given on the command line
 * falls back to the original hard-coded local path, so running with no arguments behaves as before.
 */
public class Step2Main extends Configured implements Tool {
    /** Input (step-1 output directory) used when no input path argument is supplied. */
    private static final String DEFAULT_INPUT = "file:///G:\\step1_output";
    /**
     * Output directory used when no output path argument is supplied. Hadoop's file output formats
     * normally reject an existing output directory, so delete it before re-running.
     */
    private static final String DEFAULT_OUTPUT = "file:///G:\\step2_output";

    /**
     * Configures and runs the step-2 job.
     *
     * @param args optional {@code [inputPath [outputPath]]}, after ToolRunner has removed generic options
     * @return 0 if the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(super.getConf(), "step2");
        // Tells Hadoop which jar holds the mapper/reducer; without it a real cluster may not find them.
        job.setJarByClass(Step2Main.class);

        // Input: step-1 lines of the form "users-joined-by-dash<TAB>friend".
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path(input));

        // Map: emit ("X-Y", friend) per user pair; k2 and v2 are both Text.
        job.setMapperClass(Step2Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce: emit ("X-Y", common friends joined with "-"); k3 and v3 are both Text.
        job.setReducerClass(Step2Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(output));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Entry point: runs the job through ToolRunner so generic Hadoop options are honored. */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new Step2Main(), args));
    }
}
第二步執行結果(左邊為兩個使用者,右邊為他們的共同好友):
A-B C-E-
A-C D-F-
A-D F-E-
A-E C-B-D-
A-F D-O-E-B-C-
A-G C-D-F-E-
A-H E-C-O-D-
A-I O-
A-J O-B-
A-K C-D-
A-L E-D-F-
A-M F-E-
B-C A-
B-D E-A-
B-E C-
B-F E-A-C-
B-G A-E-C-
B-H E-C-A-
B-I A-
B-K A-C-
B-L E-
B-M E-
B-O A-
C-D F-A-
C-E D-
C-F A-D-
C-G F-D-A-
C-H D-A-
C-I A-
C-K A-D-
C-L D-F-
C-M F-
C-O I-A-
D-E L-
D-F A-E-
D-G F-A-E-
D-H A-E-
D-I A-
D-K A-
D-L F-E-
D-M F-E-
D-O A-
E-F M-C-B-D-
E-G C-D-
E-H C-D-
E-J B-
E-K C-D-
E-L D-
F-G A-D-E-C-
F-H D-O-C-E-A-
F-I O-A-
F-J B-O-
F-K A-D-C-
F-L D-E-
F-M E-
F-O A-
G-H E-A-C-D-
G-I A-
G-K C-D-A-
G-L D-E-F-
G-M E-F-
G-O A-
H-I O-A-
H-J O-
H-K D-A-C-
H-L E-D-
H-M E-
H-O A-
I-J O-
I-K A-
I-O A-
K-L D-
K-O A-
L-M F-E-