1. 程式人生 > >案例3-使用hadoop-mapreduce來統計並進行好友推薦

案例3-使用hadoop-mapreduce來統計並進行好友推薦

常見的需求如QQ中的推薦好友,例如下圖:

我們想給如花推薦好友,途中相鄰連線的人之間是彼此直接好友的關係,那麼我們推薦的規則是同一對”好友的好友”(簡稱FOF)出現的次數,比如:如花的好友的好友有“小明”“李剛”“鳳姐”,而FOF關係如下:

                                        如花  小明   李剛  鳳姐

那麼對於如花來說,小明,李剛,鳳姐三者之間都是以如花為中心的FOF好友的好友關係。

接下來,我們使用mapreduce來計算應該給每個使用者推薦哪些好友?

在這個例子中,我們會有兩個mapTask和reduceTask:

第一組mapTask和reducetask:

                 根據直接好友關係的輸入檔案,計算出每組FOF關係及出現總次數

第二組mapTask和reducetask:

                根據第一組輸出的FOF及總次數,計算出給每個使用者推薦的好友順序。

程式碼:

package com.jeff.mr.friend;

import org.apache.hadoop.io.Text;

public class Fof extends Text{

	public Fof(){
		super();
	}
	
	public Fof(String a,String b){
		/**
		 * 主要:
		 *   此行程式碼保證了無論a和b,還是b和a的順序傳進來,這兩對組合在reduce端洗牌時都被分到一組去
		 */
		super(getFof(a, b));
	}

	/**
	 * 定義a和b,b和a傳進來的順序不同,是兩組不一樣的資料
	 * @param a
	 * @param b
	 * @return
	 */
	public static String getFof(String a,String b){
		int r =a.compareTo(b);
		if(r<0){
			return a+"\t"+b;
		}else{
			return b+"\t"+a;
		}
	}
}
package com.jeff.mr.friend;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

public class RunJob {

	public static void main(String[] args) {
		Configuration config =new Configuration();
		config.set("fs.defaultFS", "hdfs://node4:8020");
		config.set("yarn.resourcemanager.hostname", "node4");
//		config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
//		config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
		if(run1(config)){
			run2(config);
		}
	}
	
	public static boolean run1(Configuration config) {
		try {
			FileSystem fs =FileSystem.get(config);
			Job job =Job.getInstance(config);
			job.setJarByClass(RunJob.class);
			job.setJobName("friend");
			job.setMapperClass(FofMapper.class);
			job.setReducerClass(FofReducer.class);
			job.setMapOutputKeyClass(Fof.class);
			job.setMapOutputValueClass(IntWritable.class);
			
			job.setInputFormatClass(KeyValueTextInputFormat.class);
			
			FileInputFormat.addInputPath(job, new Path("/usr/input/friend"));
			
			Path outpath =new Path("/usr/output/f1");
			if(fs.exists(outpath)){
				fs.delete(outpath, true);
			}
			FileOutputFormat.setOutputPath(job, outpath);
			
			boolean f= job.waitForCompletion(true);
			return f;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}
	
	public static void run2(Configuration config) {
		try {
			FileSystem fs =FileSystem.get(config);
			Job job =Job.getInstance(config);
			job.setJarByClass(RunJob.class);
			
			job.setJobName("fof2");
			
			job.setMapperClass(SortMapper.class);
			job.setReducerClass(SortReducer.class);
			job.setSortComparatorClass(FoFSort.class);
			job.setGroupingComparatorClass(FoFGroup.class);
			job.setMapOutputKeyClass(User.class);
			job.setMapOutputValueClass(User.class);
			
			job.setInputFormatClass(KeyValueTextInputFormat.class);
			
			//設定MR執行的輸入檔案
			FileInputFormat.addInputPath(job, new Path("/usr/output/f1"));
			
			//該目錄表示MR執行之後的結果資料所在目錄,必須不能存在
			Path outputPath=new Path("/usr/output/f2");
			if(fs.exists(outputPath)){
				fs.delete(outputPath, true);
			}
			FileOutputFormat.setOutputPath(job, outputPath);
			
			boolean f =job.waitForCompletion(true);
			if(f){
				System.out.println("job 成功執行");
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		} 
	}
	
	/**
	 * 定義MapTask
	 * 第一階段洗牌,將資料輸出為:<FOF,出現次數>
	 * 
	 * @desc 輸入檔案中每一行的key是第一個字串就代表了使用者,之後的所有以\t隔開的字串表示了使用者對應的直接好友
	 * 
	 * @author jeffSheng
	 * 2018年9月23日
	 */
	static class FofMapper extends Mapper<Text, Text, Fof, IntWritable>{
		protected void map(Text key, Text value,
				Context context)
				throws IOException, InterruptedException {
			//輸入檔案中的每一行的key仍然是按照第一個製表符\t拆分
			String user =key.toString();
			//按照\t拆分為user的所有直接好友,他們彼此之間是fof關係(不重複計算)
			String[] friends =StringUtils.split(value.toString(), '\t');
			for (int i = 0; i < friends.length; i++) {
				String f1 = friends[i];
				/**
				 * 為了防止出現FOF關係中的兩個使用者其實是直接好友的關係,我們將<使用者,好友>這種關係也加入到輸出鍵值對中,以0來標識這種關係,在計算階段可以用來剔除去重
				 */
				Fof ofof =new Fof(user, f1);
				context.write(ofof, new IntWritable(0));
				for (int j = i+1; j < friends.length; j++) {
					String f2 = friends[j];
					Fof fof =new Fof(f1, f2);
					context.write(fof, new IntWritable(1));
				}
			}
		}
	}
	
	/**
	 * 定義ReduceTask(第一個REDUCE),計算FOF關係及出現的次數
	 * 第二階段洗牌:key為FOF,value是每個FOF出現的次數可能是0和1:0標識直接好友,1標識FOF關係
	 * 
	 * @author jeffSheng
	 * 2018年9月23日
	 */
	static class FofReducer extends Reducer<Fof, IntWritable, Fof, IntWritable>{
		protected void reduce(Fof fof, Iterable<IntWritable> iterable,
				Context context)
				throws IOException, InterruptedException {
			int sum =0;
			boolean f =true;
			/**
			 * 迭代IntWritable,每組FOF中的value值,0或者1,
			 * 是0則FOF為直接好友,終止當前組的計算迴圈,設定f為false不輸出結果
			 * 是1則累計次數sum,最後輸出FOF的次數
			 */
			for(IntWritable i: iterable){
				if(i.get()==0){
					f=false;
					break;
				}else{
					sum=sum+i.get();
				}
			}
			if(f){
				context.write(fof, new IntWritable(sum));
			}
		}
	}
	
	
	/**
	 * 1 以第一個Reduce分割槽輸出的結果檔案為第二個mapTask的輸入資料
	 * 2 輸入資料的key是FOF關係的第一個值,比如: 老王  如花  3,則key是老王,value是如花 3
	 * 3 mapTask輸出資料格式為<User, User>,第一個User是要推薦的使用者,第二個是推薦的好友是誰,一個FOF輸出相互推薦,context.write兩次
	 * @author jeffSheng
	 * 2018年9月23日
	 */
	static class SortMapper extends Mapper<Text, Text, User, User>{
		
		
		protected void map(Text key, Text value,Context context)
				throws IOException, InterruptedException {
			/**
			 * 以老王  如花  3為例,args為如花  3,other就是如花,key就是老王了,friendsCount就是3
			 */
			String[] args=StringUtils.split(value.toString(),'\t');
			String other=args[0];
			int friendsCount =Integer.parseInt(args[1]);
			/**
			 * key是比如老王  如花  3為例的老王,other是如花,那麼老王和如花是FOF關係,他們關係出現的次數是friendsCount
			 * 輸出資料的時候,推薦給老王一個使用者如花,推薦給如花一個使用者老王。
			 */
			context.write(new User(key.toString(),friendsCount), new User(other,friendsCount));
			context.write(new User(other,friendsCount), new User(key.toString(),friendsCount));
		}
	}
	
	
	static class SortReducer extends Reducer<User, User, Text, Text>{
		protected void reduce(User arg0, Iterable<User> arg1,
				Context arg2)
				throws IOException, InterruptedException {
			String user = arg0.getUname();
			StringBuffer sb = new StringBuffer();
			for(User u: arg1 ){
				sb.append(u.getUname()+":"+u.getFriendsCount());
				sb.append(",");
			}
			arg2.write(new Text(user), new Text(sb.toString()));
		}
	}
}
package com.jeff.mr.friend;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 自定義分組
 * @author jeffSheng
 * 2018年9月24日
 */
public class FoFGroup extends WritableComparator{

	public FoFGroup() {
		super(User.class,true);
	}
	
	/**
	 * 就根據姓名分組
	 */
	public int compare(WritableComparable a, WritableComparable b) {
		User u1 =(User) a;
		User u2=(User) b;
		
		return u1.getUname().compareTo(u2.getUname());
	}
}
package com.jeff.mr.friend;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 自定義排序
 * @author jeffSheng
 * 2018年9月24日
 */
public class FoFSort extends WritableComparator{

	public FoFSort() {
		super(User.class,true);
	}
	
	/**
	 * 推薦規則,先根據姓名字典排序,如果姓名相同則根據FOF出現次數排序
	 */
	public int compare(WritableComparable a, WritableComparable b) {
		User u1 =(User) a;
		User u2=(User) b;
		int result =u1.getUname().compareTo(u2.getUname());
		if(result==0){
			return -Integer.compare(u1.getFriendsCount(), u2.getFriendsCount());
		}
		return result;
	}
}
package com.jeff.mr.friend;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class User implements WritableComparable<User>{

	private String uname;
	private int friendsCount;
	
	public String getUname() {
		return uname;
	}
	public void setUname(String uname) {
		this.uname = uname;
	}
	public int getFriendsCount() {
		return friendsCount;
	}
	public void setFriendsCount(int friendsCount) {
		this.friendsCount = friendsCount;
	}

	public User() {
		// TODO Auto-generated constructor stub
	}
	
	public User(String uname,int friendsCount){
		this.uname=uname;
		this.friendsCount=friendsCount;
	}
	
	public void write(DataOutput out) throws IOException {
		out.writeUTF(uname);
		out.writeInt(friendsCount);
	}

	
	public void readFields(DataInput in) throws IOException {
		this.uname=in.readUTF();
		this.friendsCount=in.readInt();
	}

	public int compareTo(User o) {
		int result = this.uname.compareTo(o.getUname());
		if(result==0){
			return Integer.compare(this.friendsCount, o.getFriendsCount());
		}
		return result;
	}

	
	
}

上傳輸入檔案friend,表示的是直接好友關係表

 

檔案內容,就是文章開始那幅圖的數字化表示。

小明	老王	如花	林志玲
老王	小明	鳳姐
如花	小明	李剛	鳳姐
林志玲	小明	李剛	鳳姐	郭美美
李剛	如花	鳳姐	林志玲
郭美美	鳳姐	林志玲
鳳姐	如花	老王	林志玲	郭美美

執行Runjob.java:

我們開始執行時發現報錯,原因是我們設定的是node1作為config,但是node1並不是active二是standby,所以我們需要改成node1.

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby

因為node4是active狀態,所以要改成node4

      Configuration config =new Configuration();

      config.set("fs.defaultFS", "hdfs://node4:8020");

      config.set("yarn.resourcemanager.hostname", "node4");

任務1和任務2執行結束:

 

輸出檔案計算結果:

第一組MapTask和reduceTask計算結果:列出了每一組FOF出現的總次數

 

第二組MapTask和reduceTask計算結果:

如“給李剛推薦的順序就是如花(3)、老王(2)”,可以展示在QQ推薦面板上!