Finding Common Friends with Multiple Chained MapReduce Jobs
阿新 • Published 2018-12-24
package bd1805day09;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ManyJob {
    // Chain multiple jobs to compute common friends
    /**
     * Sample input (user : comma-separated friend list):
     *   A:B,C,D,F,E,O
     *   B:A,C,E,K
     *   C:F,A,D,I
     * Step 1: invert the relation so that each friend maps to every user who lists them.
     * Step 2: for each pair of those users, emit that friend as a common friend and group by pair.
     * Two chained MapReduce jobs make this straightforward to implement.
     */
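    /*
     * Worked trace on the sample input above (partial, for illustration):
     *   Step 1 (invert):  friend A -> users B,C    (B and C both list A)
     *                     friend C -> users A,B
     *                     friend E -> users A,B
     *   Step 2 (pair up): B-C -> A                 (A is a common friend of B and C)
     *                     A-B -> C,E               (C and E are common friends of A and B)
     */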
    // First MapReduce
    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input line, e.g. A:B,C,D,F,E,O
            String line = value.toString();
            String[] user_friends = line.split(":");
            // Everything after the colon is the friend list
            String[] friends = user_friends[1].split(",");
            // Emit (friend, user) for each friend so the reducer can group users by friend
            for (String f : friends) {
                context.write(new Text(f), new Text(user_friends[0]));
            }
        }
    }
    static class MyReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // The values are all users who share this friend, e.g. A -> B,C,D
            // Concatenate them into a comma-separated list
            StringBuilder sb = new StringBuilder();
            for (Text v : values) {
                sb.append(v.toString()).append(","); // e.g. A  F,I,O,K,G,D,C,H,B
            }
            // Drop the trailing comma before writing
            context.write(key, new Text(sb.substring(0, sb.length() - 1)));
        }
    }
    // Second MapReduce
    static class MyMapper2 extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input line from job 1, e.g.  A \t F,I,O,K,G,D,C,H,B
            String line = value.toString();
            String[] friend_users = line.split("\t");
            String[] users = friend_users[1].split(",");
            // Pair the users two by two; emit only pairs in ascending order
            // so that a-b and b-a collapse into the single key a-b
            for (String ul : users) {       // left side of the pair
                for (String ur : users) {   // right side of the pair
                    // compareTo orders the whole strings; the original charAt(0)
                    // comparison only works for single-character user names
                    if (ul.compareTo(ur) < 0) {
                        String uu = ul + "-" + ur;
                        context.write(new Text(uu), new Text(friend_users[0])); // e.g. A-E  C
                    }
                }
            }
        }
    }
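    /*
     * Illustration: for a job-1 line such as  A \t B,C,D  the mapper above emits
     *   B-C -> A,  B-D -> A,  C-D -> A
     * Each unordered pair appears exactly once, because only the
     * lexicographically ascending ordering passes the compareTo check.
     */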
    static class MyReducer2 extends Reducer<Text, Text, Text, Text> {
        // Each key is a user pair; the values are all of their common friends
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text v : values) {
                sb.append(v.toString()).append(",");
            }
            // Drop the trailing comma, e.g.  A-E  C,D,B
            context.write(key, new Text(sb.substring(0, sb.length() - 1)));
        }
    }
    // Driver: build both jobs and submit them as a group
    public static void main(String[] args)
            throws IllegalArgumentException, IOException, URISyntaxException, InterruptedException {
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");

        // First job
        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(bd1805day09.ManyJob.class);
        job1.setMapperClass(MyMapper.class);
        job1.setReducerClass(MyReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, new Path("hdfs://hadoop02:9000/friendin"));
        // Get a handle on the HDFS file system
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop02:9000"), conf);
        Path path = new Path("/friendout_01");
        if (fs.exists(path)) {
            fs.delete(path, true); // clear a stale output directory
        }
        FileOutputFormat.setOutputPath(job1, path);

        // Second job, reading the first job's output
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(bd1805day09.ManyJob.class);
        job2.setMapperClass(MyMapper2.class);
        job2.setReducerClass(MyReducer2.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job2, new Path("/friendout_01"));
        Path path1 = new Path("/friendout_03");
        if (fs.exists(path1)) { // clear any previous output at this path
            fs.delete(path1, true);
        }
        FileOutputFormat.setOutputPath(job2, path1);

        // Group the jobs that must run together; the group name is arbitrary
        JobControl jc = new JobControl("wc_sort");
        ControlledJob ajob = new ControlledJob(job1.getConfiguration());
        ControlledJob bjob = new ControlledJob(job2.getConfiguration());
        // Declare the dependency between the jobs: job2 runs only after job1
        bjob.addDependingJob(ajob);
        // further dependencies would be added the same way, e.g. bjob.addDependingJob(cjob);
        jc.addJob(ajob);
        jc.addJob(bjob);
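        // Note: JobControl implements Runnable. Its run() method polls the
        // managed ControlledJobs and submits each one only once all of the
        // jobs it depends on have succeeded, so job2 is submitted only after
        // job1 finishes successfully.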
        // Start JobControl on its own thread to submit and monitor the jobs
        new Thread(jc).start();
        // Only stop that thread after every job has run:
        // allFinished() returns true once all jobs in jc have completed
        while (!jc.allFinished()) {
            Thread.sleep(500);
        }
        jc.stop();
    }
}
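As an aside, for a simple two-step pipeline like this one, JobControl is not strictly necessary: the same chaining can be done by submitting the jobs sequentially. A minimal sketch, reusing job1 and job2 as configured above (note that waitForCompletion additionally throws ClassNotFoundException, so main's throws clause would need to include it):

        // Sequential alternative to the JobControl block (illustrative sketch)
        if (!job1.waitForCompletion(true)) { // true: print progress to the console
            System.exit(1);                  // job1 failed, so do not start job2
        }
        System.exit(job2.waitForCompletion(true) ? 0 : 1);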