程式人生 > 19-hadoop-fof好友推薦

19-hadoop-fof好友推薦

append compare mapping 好友 count group import context

好友推薦的案例需要兩個 job：第一個 job 計算好友關係度（兩個非直接好友之間的共同好友數），第二個 job 根據計算出的關係向每個使用者進行推薦。

1. fof 關係類

package com.wenbronk.friend;

import org.apache.hadoop.io.Text;

/**
 * 定義fof關系
 * @author root
 *
 */
public class Fof extends Text{

    public Fof() {
        super();
    }
    
    /**‘
     * 不論誰在前,返回一致的順序
     * @param a
     * @param b
     
*/ public Fof(String a, String b) { super(getFof(a, b)); } /** * 按字典順序排序, 保證兩個fof為同一組輸出 * @param a * @param b * @return */ public static String getFof(String a, String b) { int r = a.compareTo(b); if (r < 0) { return a + "
\t" + b; }else { return b + "\t" + a; } } }

2, user類

package com.wenbronk.friend;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class User implements WritableComparable<User> {

    /** The user's name; the grouping key of the recommendation job. */
    private String uname;

    /**
     * Common-friend count attached to this user. The "frieds" spelling is kept because the
     * accessors are public API.
     */
    private int friedsCount;

    public String getUname() {
        return uname;
    }

    public void setUname(String uname) {
        this.uname = uname;
    }

    public int getFriedsCount() {
        return friedsCount;
    }

    public void setFriedsCount(int friedsCount) {
        this.friedsCount = friedsCount;
    }

    /** Empty instance; Hadoop creates keys/values via the no-arg constructor before readFields. */
    public User() {
        super();
    }

    public User(String uname, int friedsCount) {
        super();
        this.uname = uname;
        this.friedsCount = friedsCount;
    }

    /** Reads the fields in the same order {@link #write} writes them: name, then count. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.uname = in.readUTF();
        this.friedsCount = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(uname);
        out.writeInt(friedsCount);
    }

    /** Orders by name, then by count in ascending order. */
    @Override
    public int compareTo(User o) {
        int result = this.uname.compareTo(o.getUname());
        if (result == 0) {
            return Integer.compare(this.friedsCount, o.getFriedsCount());
        }
        return result;
    }

    /** Two users are equal when both name and count match (consistent with compareTo). */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof User)) {
            return false;
        }
        User other = (User) obj;
        return friedsCount == other.friedsCount
                && (uname == null ? other.uname == null : uname.equals(other.uname));
    }

    /**
     * Hashes on the name only. Hadoop's default HashPartitioner routes keys by hashCode(), and
     * all records of one user must reach the same reducer because grouping is by name. Equal
     * objects still have equal hashes, so the equals/hashCode contract holds.
     */
    @Override
    public int hashCode() {
        return uname == null ? 0 : uname.hashCode();
    }

    @Override
    public String toString() {
        return uname + ":" + friedsCount;
    }
}

3, sort

package com.wenbronk.friend;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sort comparator for the recommendation job. It orders {@link User} keys by name and then by
 * common-friend count in ascending order.
 *
 * <p>NOTE(review): ascending order lists the weakest candidates (fewest common friends) first.
 * Confirm this is intended for recommendations.
 *
 * @author root
 */
public class FofSort extends WritableComparator {

    /** Registers User as the key type; {@code true} makes the parent deserialize keys for compare. */
    public FofSort() {
        super(User.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        User left = (User) a;
        User right = (User) b;

        int byName = left.getUname().compareTo(right.getUname());
        if (byName != 0) {
            return byName;
        }
        // Same user: secondary sort on the common-friend count.
        return Integer.compare(left.getFriedsCount(), right.getFriedsCount());
    }
}

4, group

package com.wenbronk.friend;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for the recommendation job. Two {@link User} keys fall into the same
 * reduce call when their names match; the common-friend count is ignored here.
 *
 * @author root
 */
public class FofGroup extends WritableComparator {

    /** Registers User as the key type; {@code true} makes the parent deserialize keys for compare. */
    public FofGroup() {
        super(User.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String leftName = ((User) a).getUname();
        String rightName = ((User) b).getUname();
        return leftName.compareTo(rightName);
    }
}

5, job

package com.wenbronk.friend;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 1個mapreduce找到所有的fof關系 第二個mapreduce執行排序
 * 
 * @author root
 */
public class RunJob {

    public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();
//        configuration.set("mapred.jar", "C:/Users/wenbr/Desktop/fof.jar");
        
        // 本地運行
        configuration.set("fs.default", "hdfs://wenbronk.hdfs.com:8020    ");
        configuration.set("yarn.resourcemanager", "hdfs://192.168.208.106");
        
        
        if (runFindFof(configuration)) {
            // 根據foffind進行排序
            run2(configuration);
        }
        
    }

    /**
     * 找到所有的fof關系
     * @throws IOException 
     */
    private static boolean runFindFof(Configuration conf) throws IOException {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("friend");
            
            job.setJarByClass(RunJob.class);
            job.setMapperClass(FofMapper.class);
            job.setReducerClass(FofReduce.class);
            job.setMapOutputKeyClass(Fof.class);
            job.setMapOutputValueClass(IntWritable.class);
            
//            job.setJar("C:/Users/wenbr/Desktop/friend.jar");
            
            job.setInputFormatClass(KeyValueTextInputFormat.class);
//            FileInputFormat.addInputPath(job, new Path("/usr/friend.txt"));
            FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\friend.txt"));
            
            Path path = new Path("/root/usr/fof/f1");
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);            
            return job.waitForCompletion(true);
        }catch(Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    static class FofMapper extends Mapper<Text, Text, Fof, IntWritable> {
        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, Fof, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // super.map(key, value, context);
            String user = key.toString();
            String[] frieds = StringUtils.split(value.toString(), \t);

            for (int i = 0; i < frieds.length; i++) {
                String f1 = frieds[i];
                // 去掉是直接好友的, 按組輸出, 如果組中有value=0 的, 整組數據舍棄
                context.write(new Fof(user, f1), new IntWritable(0));
                for (int j = i + 1; j < frieds.length; j++) {
                    String f2 = frieds[j];

                    Fof fof = new Fof(f1, f2);
                    context.write(fof, new IntWritable(1));
                }
            }
        }
    }
    static class FofReduce extends Reducer<Fof, IntWritable, Fof, IntWritable> {
        @Override
        protected void reduce(Fof arg0, Iterable<IntWritable> arg1,
                Reducer<Fof, IntWritable, Fof, IntWritable>.Context arg2) throws IOException, InterruptedException {
            boolean flag = false;
            int sum = 0;
            for (IntWritable count : arg1) {
                // 值有0的, 整組數據舍棄
                if (count.get() == 0) {
                    flag = true;
                    break;
                } else {
                    sum += count.get();
                }
            }
            
            if (!flag) {
                arg2.write(arg0, new IntWritable(sum));
            }
        }
    }
    
    /**
     * 向用戶推薦好友
     * @param config
     */
    public static void run2(Configuration config) {
        try {
            FileSystem fileSystem = FileSystem.get(config);
            Job job = Job.getInstance(config);
            
            job.setJobName("fof2");
            
            job.setMapperClass(SortMapper.class);
            job.setReducerClass(SortReduce.class);
            job.setSortComparatorClass(FofSort.class);
            job.setGroupingComparatorClass(FofGroup.class);
            
            job.setMapOutputKeyClass(User.class);
            job.setMapOutputValueClass(User.class);
            
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            
            // 設置MR執行的輸入文件
            FileInputFormat.addInputPath(job, new Path("/usr/output/f1"));
            
            // 設置輸出文件, 文件不可存在
            Path path = new Path("/root/usr/fof/f2");
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true);
            }
            
            FileOutputFormat.setOutputPath(job, path);
            
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job, 成功執行");
            }
            
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
    static class SortMapper extends Mapper<Text, Text, User, User> {
        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, User, User>.Context context)
                throws IOException, InterruptedException {
            String[] args = StringUtils.split(value.toString(), \t);
            String other = args[0];
            int friendsCount = Integer.parseInt(args[1]);
            // 輸出兩次, 同時給fof兩個用戶推薦好友
            context.write(new User(key.toString(), friendsCount), new User(other, friendsCount));
            context.write(new User(other, friendsCount), new User(key.toString(), friendsCount));
        }
    }
    static class SortReduce extends Reducer<User, User, Text, Text>{
        @Override
        protected void reduce(User arg0, Iterable<User> arg1, Reducer<User, User, Text, Text>.Context arg2)
                throws IOException, InterruptedException {
            String uname = arg0.getUname();
            StringBuilder stringBuilder = new StringBuilder();
            for (User user : arg1) {
                stringBuilder.append(user.getUname() + ": " + user.getFriedsCount());
                stringBuilder.append(", ");
            }
            arg2.write(new Text(uname), new Text(stringBuilder.toString()));
        }
    }

}

初始文檔（每行第一個名字為使用者，其後為其直接好友；欄位之間需以 Tab 分隔，因為 KeyValueTextInputFormat 以第一個 Tab 切分 key 與 value）

小明    老王    如花    林誌玲
老王    小明    鳳姐
如花    小明    李剛    鳳姐
林誌玲    小明    李剛    鳳姐    郭美美
李剛    如花    鳳姐    林誌玲
郭美美    鳳姐    林誌玲
鳳姐    如花    老王    林誌玲    郭美美

系列來自尚學堂視頻

19-hadoop-fof好友推薦