Hadoop 2.6: Implementing an Item-Based Recommendation System with MapReduce
I. Item-based recommendation
1. Cosine similarity
The similarity of two score vectors x and y is measured by the cosine of the angle between them: cos = (x1*y1 + x2*y2 + ...) / (sqrt(x1^2 + x2^2 + ...) * sqrt(y1^2 + y2^2 + ...)).
If the two vectors are identical, the angle between them is 0 degrees and cos = 1, the highest possible similarity.
If the two vectors are perpendicular, the angle is 90 degrees and cos = 0, the lowest similarity.
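Before the MapReduce version, a minimal standalone sketch may help; the class and method names below are hypothetical, but the vectors use the same "userID_score" component format as the rest of this post:

public class CosineSketch {
    // A minimal sketch (hypothetical helper, not part of the pipeline below):
    // cosine similarity of two sparse vectors stored as "label_value" components.
    public static double cosine(String[] v1, String[] v2) {
        double dot = 0, norm1 = 0, norm2 = 0;
        for (String c1 : v1) {
            double x = Double.parseDouble(c1.split("_")[1]);
            norm1 += x * x; // accumulate |v1|^2
            for (String c2 : v2) {
                // components multiply only when their labels (user IDs) match
                if (c1.split("_")[0].equals(c2.split("_")[0]))
                    dot += x * Double.parseDouble(c2.split("_")[1]);
            }
        }
        for (String c2 : v2) {
            double y = Double.parseDouble(c2.split("_")[1]);
            norm2 += y * y; // accumulate |v2|^2
        }
        return dot / (Math.sqrt(norm1) * Math.sqrt(norm2));
    }

    public static void main(String[] args) {
        // items 1 and 4 from the sample data below; prints 0.99 (rounded)
        System.out.printf("%.2f%n",
            cosine(new String[]{"A_2","C_5"}, new String[]{"A_3","C_5"}));
    }
}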
2. Item-based collaborative filtering
Idea: recommend to users the items that are similar to the items they liked before.
Steps:
(1) Build the user-item score matrix from the raw action log (Step 1 below).
(2) Compute the cosine similarity between every pair of items to get the item-item similarity matrix (Step 2).
(3) Transpose the score matrix (Step 3).
(4) Multiply the similarity matrix by the transposed score matrix to get each item's predicted score per user, i.e. the recommendation list (Step 4).
(5) Remove the items each user has already acted on (Step 5).
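To preview the arithmetic with the data from the next section: user A scores item 2 with 10 and item 6 with 5, and Step 2 will compute sim(5,2) = 0.29 and sim(5,6) = 0.71, so item 5's predicted score for user A is 0.29×10 + 0.71×5 = 6.45, exactly the 5_6.45 entry in A's final recommendation list produced by Step 5.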
II. Input
Upload useraction.txt to the /input directory on HDFS (create the directory first if it does not exist):
hadoop fs -mkdir /input
hadoop fs -put useraction.txt /input
The file contents:
A,1,1
C,3,5
B,2,3
B,5,3
B,6,5
A,2,10
C,3,10
C,4,5
C,1,5
A,1,1
A,6,5
A,4,3
Each line records one user action: the first column is the user ID, the second the item ID, and the third the event score. Duplicate (user, item) pairs, such as the two A,1,1 lines, are summed in Step 1.
III. Implementation
Step 1: compute the user-item score matrix from the list of user actions.
package hadoop2;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Step1 {

    /***
     * input: /input/useraction.txt, lines of userID,itemID,score:
     *   A,1,1  C,3,5  B,2,3  B,5,3  B,6,5  A,2,10  C,3,10  C,4,5  C,1,5  A,1,1  A,6,5  A,4,3
     * output: (itemID, userID_score) pairs:
     *   ("1","A_1") ("3","C_5") ("2","B_3") ("5","B_3") ("6","B_5") ("2","A_10")
     *   ("3","C_10") ("4","C_5") ("1","C_5") ("1","A_1") ("6","A_5") ("4","A_3")
     *
     * That is, map turns (userID, itemID, score) into (itemID, userID_score).
     * @author chenjie
     */
    public static class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String values[] = value.toString().split(",");
            String userID = values[0];
            String itemID = values[1];
            String score = values[2];
            outKey.set(itemID);
            outValue.set(userID + "_" + score);
            context.write(outKey, outValue);
            System.out.println("(\"" + itemID + "\",\"" + userID + "_" + score + "\")");
        }
    }

    /***
     * input: (itemID, [userID_score...]):
     *   ("1",["A_1","C_5","A_1"]) ("2",["A_10","B_3"]) ("3",["C_10","C_5"])
     *   ("4",["A_3","C_5"]) ("5",["B_3"]) ("6",["A_5","B_5"])
     * output: itemID	userID_sumScore,... :
     *   1	A_2,C_5
     *   2	A_10,B_3
     *   3	C_15
     *   4	A_3,C_5
     *   5	B_3
     *   6	A_5,B_5
     *
     * That is, reduce sums the scores that share both itemID and userID.
     * For ("1",["A_1","C_5","A_1"]): item 1, user A scores 1+1=2, stored in the map
     * as (A,2); item 1, user C totals 5, stored as (C,5). All of item 1's entries
     * are then emitted as key: 1, value: A_2,C_5; likewise item 2 as key: 2,
     * value: A_10,B_3; and so on.
     * @author chenjie
     */
    public static class Reducer1 extends Reducer<Text, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String itemID = key.toString();
            StringBuilder log = new StringBuilder(); // debug log of the grouped input
            log.append("(\"" + itemID + "\",[");
            Map<String, Integer> map = new HashMap<String, Integer>(); // userID -> summed score
            for (Text value : values) {
                log.append("\"" + value + "\",");
                String userID = value.toString().split("_")[0];
                String score = value.toString().split("_")[1];
                if (map.get(userID) == null) {
                    map.put(userID, Integer.valueOf(score));
                } else {
                    Integer preScore = map.get(userID);
                    map.put(userID, preScore + Integer.valueOf(score));
                }
            }
            if (log.toString().endsWith(","))
                log.deleteCharAt(log.length() - 1);
            log.append("])");
            System.out.println(log);

            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                String userID = entry.getKey();
                String score = String.valueOf(entry.getValue());
                sb.append(userID + "_" + score + ",");
            }
            String line = null;
            if (sb.toString().endsWith(",")) {
                line = sb.substring(0, sb.length() - 1); // strip the trailing comma
            }
            outKey.set(itemID);
            outValue.set(line);
            context.write(outKey, outValue);
        }
    }

    private static final String INPATH = "/input/useraction.txt"; // input path
    private static final String OUTPATH = "/output/tuijian1";    // output path
    private static final String HDFS = "hdfs://pc1:9000";        // HDFS namenode

    public int run() throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS);
        String[] otherArgs = {INPATH, OUTPATH}; // the input and output HDFS paths
        if (otherArgs.length != 2) {
            System.err.println("Usage: Step1 <in> <out>");
            System.exit(2);
        }
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "step1");       // set the job name
        job.setJarByClass(Step1.class);
        job.setMapperClass(Mapper1.class);      // set the Mapper class
        job.setReducerClass(Reducer1.class);    // set the Reducer class
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);      // output key type
        job.setOutputValueClass(Text.class);    // output value type
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   // input path
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // output path
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path(OUTPATH);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true); // remove a stale output directory so the job can run
        }
        return job.waitForCompletion(true) ? 1 : -1;
    }

    public static void main(String[] args) {
        try {
            new Step1().run();
        } catch (ClassNotFoundException | IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Step 2: compute the cosine similarity between every pair of rows of the score matrix, producing the item-item similarity matrix.
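As a worked example, row 1 (A_2,C_5) and row 2 (A_10,B_3) overlap only in user A, so cos = (2×10) / (√(2²+5²) × √(10²+3²)) = 20 / (5.39 × 10.44) ≈ 0.36, which appears as 2_0.36 in row 1 of the output below.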
package hadoop2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/***
 * Computes the cosine similarity of every pair of rows of the score matrix,
 * producing the item-item similarity matrix.
 *
 * input: itemID	userID_sumScore,... :
 *   1	A_2,C_5
 *   2	A_10,B_3
 *   3	C_15
 *   4	A_3,C_5
 *   5	B_3
 *   6	A_5,B_5
 * A copy of the input is also placed in the distributed cache, and every input
 * row is compared against every cached row.
 * output: itemID	itemID_cos,... :
 *   1	1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
 *   2	1_0.36,2_1.00,4_0.49,5_0.29,6_0.88
 *   3	4_0.86,3_1.00,1_0.93
 *   4	1_0.99,4_1.00,6_0.36,3_0.86,2_0.49
 *   5	2_0.29,5_1.00,6_0.71
 *   6	1_0.26,5_0.71,6_1.00,2_0.88,4_0.36
 * @author chenjie
 */
public class Step2 {

    /***
     * map emits one (itemID, itemID_cos) pair per nonzero similarity:
     *   1 1_1.00   1 2_0.36   1 3_0.93   1 4_0.99   1 6_0.26
     *   2 1_0.36   2 2_1.00   2 4_0.49   2 5_0.29   2 6_0.88
     *   3 1_0.93   3 3_1.00   3 4_0.86
     *   4 1_0.99   4 2_0.49   4 3_0.86   4 4_1.00   4 6_0.36
     *   5 2_0.29   5 5_1.00   5 6_0.71
     *   6 1_0.26   6 2_0.88   6 4_0.36   6 5_0.71   6 6_1.00
     * @author chenjie
     */
    public static class Mapper2 extends Mapper<LongWritable, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();
        private List<String> cacheList = new ArrayList<String>();
        private DecimalFormat df = new DecimalFormat("0.00");

        /***
         * Reads the cached copy of the score matrix into memory, one line per
         * string, collected into a list.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileReader fr = new FileReader("itemUserScore1");
            BufferedReader br = new BufferedReader(fr);
            String line = null;
            while ((line = br.readLine()) != null) {
                cacheList.add(line);
            }
            br.close();
            fr.close();
        }

        /***
         * Example used in the comments below: value = "1	A_2,C_5",
         * one cached line = "2	A_10,B_3".
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("map,key=" + key + ",value=" + value.toString());
            String[] rowAndline = value.toString().split("\t");
            // rowAndline : {"1","A_2,C_5"}
            String row_matrix1 = rowAndline[0];
            // row_matrix1 : "1"
            String[] column_value_array_matrix1 = rowAndline[1].split(",");
            // column_value_array_matrix1 : {"A_2","C_5"}

            // |x| = sqrt(x1^2 + x2^2 + ...)
            double denominator1 = 0; // norm of vector 1
            for (String column : column_value_array_matrix1) { // for each component of vector 1
                String score = column.split("_")[1];
                denominator1 += Double.valueOf(score) * Double.valueOf(score); // add the squared component
            }
            denominator1 = Math.sqrt(denominator1); // the square root gives the norm

            for (String line : cacheList) { // e.g. line = "2	A_10,B_3"
                String[] rowAndline2 = line.toString().split("\t");
                // rowAndline2 : {"2","A_10,B_3"}
                String row_matrix2 = rowAndline2[0];
                // row_matrix2 : "2"
                String[] column_value_array_matrix2 = rowAndline2[1].split(",");
                // column_value_array_matrix2 : {"A_10","B_3"}

                double denominator2 = 0; // norm of vector 2
                for (String column : column_value_array_matrix2) {
                    String score = column.split("_")[1];
                    denominator2 += Double.valueOf(score) * Double.valueOf(score);
                }
                denominator2 = Math.sqrt(denominator2);

                int numerator = 0; // accumulates the dot product
                for (String column_value_matrix1 : column_value_array_matrix1) { // for each component of vector 1: A_2, C_5
                    String column_matrix1 = column_value_matrix1.split("_")[0]; // user ID
                    String value_matrix1 = column_value_matrix1.split("_")[1];  // score
                    for (String column_value_matrix2 : column_value_array_matrix2) { // for each component of vector 2: A_10, B_3
                        String column_matrix2 = column_value_matrix2.split("_")[0]; // user ID
                        String value_matrix2 = column_value_matrix2.split("_")[1];  // score
                        // only when the column labels (user IDs) match are the two
                        // values components at the same position
                        if (column_matrix2.equals(column_matrix1)) {
                            numerator += Integer.valueOf(value_matrix1) * Integer.valueOf(value_matrix2); // e.g. numerator += 2 * 10
                        }
                    }
                }
                double cos = numerator / (denominator1 * denominator2); // the cosine
                if (cos == 0)
                    continue;
                outKey.set(row_matrix1); // key: row number of the left matrix
                outValue.set(row_matrix2 + "_" + df.format(cos)); // value: the other row's number and the cosine
                context.write(outKey, outValue);
                System.out.println(outKey + "\t" + outValue);
            }
        }
    }

    /***
     * input:
     *   ("1",["1_1.00","2_0.36","3_0.93","4_0.99","6_0.26"])
     *   ("2",["1_0.36","2_1.00","4_0.49","5_0.29","6_0.88"])
     *   ("3",["4_0.86","3_1.00","1_0.93"])
     *   ("4",["1_0.99","4_1.00","6_0.36","3_0.86","2_0.49"])
     *   ("5",["2_0.29","5_1.00","6_0.71"])
     *   ("6",["1_0.26","5_0.71","6_1.00","2_0.88","4_0.36"])
     * output:
     *   1	1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
     *   2	1_0.36,2_1.00,4_0.49,5_0.29,6_0.88
     *   3	4_0.86,3_1.00,1_0.93
     *   4	1_0.99,4_1.00,6_0.36,3_0.86,2_0.49
     *   5	2_0.29,5_1.00,6_0.71
     *   6	1_0.26,5_0.71,6_1.00,2_0.88,4_0.36
     * That is, reduce joins the components of each row back together, yielding
     * the final similarity matrix.
     * @author chenjie
     */
    public static class Reducer2 extends Reducer<Text, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // note: values can only be iterated once
            StringBuilder sb = new StringBuilder();
            for (Text text : values) {
                sb.append(text + ",");
            }
            String line = "";
            if (sb.toString().endsWith(",")) {
                line = sb.substring(0, sb.length() - 1); // strip the trailing comma
            }
            outKey.set(key);
            outValue.set(line);
            context.write(outKey, outValue);
        }
    }

    private static final String INPATH = "/output/tuijian1/part-r-00000"; // the Step 1 score matrix
    private static final String OUTPATH = "/output/tuijian2";
    private static final String CACHE = "/output/tuijian1/part-r-00000";
    private static final String HDFS = "hdfs://pc1:9000";

    public int run() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS);
        String[] otherArgs = {INPATH, OUTPATH}; // the input and output HDFS paths
        if (otherArgs.length != 2) {
            System.err.println("Usage: Step2 <in> <out>");
            System.exit(2);
        }
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "step2");      // set the job name
        job.setJarByClass(Step2.class);
        job.setMapperClass(Mapper2.class);     // set the Mapper class
        job.setReducerClass(Reducer2.class);   // set the Reducer class
        job.addCacheArchive(new URI(CACHE + "#itemUserScore1")); // cache the score matrix, symlinked as "itemUserScore1"
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);     // output key type
        job.setOutputValueClass(Text.class);   // output value type
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   // input path
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // output path
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path(OUTPATH);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        return job.waitForCompletion(true) ? 1 : -1;
    }

    public static void main(String[] args) {
        try {
            new Step2().run();
        } catch (ClassNotFoundException | IOException | InterruptedException | URISyntaxException e) {
            e.printStackTrace();
        }
    }
}
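A note on the distributed cache used here and again in Steps 4 and 5: the job ships a copy of an HDFS file to every task, and the URI fragment after the # (here #itemUserScore1) becomes the name of a symlink in the task's working directory, which is why setup() can open it with a plain new FileReader("itemUserScore1"). This post's code uses addCacheArchive throughout; addCacheFile is the variant intended for plain (non-archive) files such as part-r-00000.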
Step 3: transpose the score matrix (see my earlier post for the details of why and how to transpose). The multiplication in Step 4 matches the components of two rows by their column labels, so the score matrix must first be rewritten with one row per user before it can be multiplied against the row-per-item similarity matrix.
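For this post's data the transpose works like this: the Step 1 row 1	A_2,C_5 makes the mapper emit (A,"1_2") and (C,"1_5"); the reducer for key A then gathers A's components from every item row and writes the line A	6_5,4_3,2_10,1_2, which is user A's row of the transposed score matrix.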
package hadoop2;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* Matrix multiplication is done in two parts:
* 1. transpose the score matrix
* 2. multiply the similarity matrix by the transposed score matrix
* This class performs part 1: the transpose.
*
* input:
* 1 A_2,C_5
2 A_10,B_3
3 C_15
4 A_3,C_5
5 B_3
6 A_5,B_5
output:
A 6_5,4_3,2_10,1_2
B 6_5,5_3,2_3
C 4_5,3_15,1_5
* @author chenjie
*
*/
public class Step3 {
public static class Mapper3 extends Mapper<LongWritable,Text,Text,Text>
{
private Text outKey = new Text();
private Text outValue = new Text();
//For each input line; the examples in these comments use the sample matrix
//from the earlier post on matrix transposition. Take the first line:
//key : 1
//value : "1	1_0,2_3,3_-1,4_2,5_-3"
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String[] rowAndline = value.toString().split("\t");
//rowAndline : {"1","1_0,2_3,3_-1,4_2,5_-3"}
String row = rowAndline[0];
//row "1"
String[] lines = rowAndline[1].split(",");
//rowAndline[1] : "1_0,2_3,3_-1,4_2,5_-3"
//lines : {"1_0","2_3","3_-1","4_2","5_-3"}
for(String line : lines)//for each column; take the first as an example, line = "1_0"
{
String colunm = line.split("_")[0];
//colunm : 1
String valueStr = line.split("_")[1];
//valueStr : 0
outKey.set(colunm);
//the column number becomes the output row
outValue.set(row + "_" + valueStr);
//the original row number becomes the column label
context.write(outKey, outValue);
// emits (1,"1_0")
}
//After the loop, {"1_0","2_3","3_-1","4_2","5_-3"} has produced
//(1,"1_0") row 1, column 1 = 0; (2,"1_3") row 2, column 1 = 3; (3,"1_-1") (4,"1_2") (5,"1_-3")
/*
Target transposed matrix:
0 1 1 -2
3 3 1 2
-1 5 4 -1
2 -2 -1 1
-3 -1 2 2
*/
//which is exactly the first column of the transposed matrix
}
/*
All map calls together produce:
("1","1_0") ("2","1_3") ("3","1_-1") ("4","1_2") ("5","1_-3")
("1","2_1") ("2","2_3") ("3","2_5") ("4","2_-2") ("5","2_-1")
("1","3_0") ("2","3_1") ("3","3_4") ("4","3_-1") ("5","3_2")
("1","4_-2") ("2","4_2") ("3","4_-1") ("4","4_1") ("5","4_2")
*/
}
/*
The reduce task merges all key-value pairs produced by the maps into the stored
representation of the transposed matrix. Values sharing a key are grouped, e.g.
for key "1":
values : {"3_0","1_0","4_-2","2_1"}
Note: this is why each value carries a column label; the order of the values is
not necessarily the original column order of the matrix.
*/
public static class Reducer3 extends Reducer<Text,Text,Text,Text>
{
private Text outKey = new Text();
private Text outValue = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for(Text text : values)
{
sb.append(text + ",");
}
//sb : "3_0,1_0,4_-2,2_1,"
//note the trailing comma at the end
String line = "";
if(sb.toString().endsWith(","))
{
line = sb.substring(0,sb.length()-1);
}
//strip the trailing comma
//line : "3_0,1_0,4_-2,2_1"
outKey.set(key);
outValue.set(line);
//("1","3_0,1_0,4_-2,2_1")
context.write(outKey, outValue);
}
}
private static final String INPATH = "hdfs://pc1:9000/output/tuijian1/part-r-00000";//input path: the Step 1 score matrix
private static final String OUTPATH = "hdfs://pc1:9000/output/tuijian3";//output path
private static final String HDFS = "hdfs://pc1:9000";//HDFS namenode
public int run() throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS",HDFS);
String[] otherArgs = {INPATH,OUTPATH};
//the input and output HDFS paths
if (otherArgs.length != 2) {
System.err.println("Usage: Step3 <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "step3");//Job(Configuration conf, String jobName): set the job name
job.setJarByClass(Step3.class);
job.setMapperClass(Mapper3.class); //set the Mapper class
job.setReducerClass(Reducer3.class); //set the Reducer class
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class); //output key type
job.setOutputValueClass(Text.class);// output value type
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //set the input path
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//set the output path
FileSystem fs = FileSystem.get(conf);
Path outPath = new Path(OUTPATH);
if(fs.exists(outPath))
{
fs.delete(outPath, true);
}
return job.waitForCompletion(true) ? 1 : -1;
}
public static void main(String[] args)
{
try {
new Step3().run();
} catch (ClassNotFoundException | IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
Step 4: similarity matrix × transposed score matrix = recommendation list (see the earlier post for the matrix multiplication details).
package hadoop2;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* Matrix multiplication is done in two parts:
* 1. transpose the score matrix
* 2. multiply the similarity matrix by the transposed score matrix
* This class performs part 2: the multiplication.
* input:
1 1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
2 1_0.36,2_1.00,4_0.49,5_0.29,6_0.88
3 4_0.86,3_1.00,1_0.93
4 1_0.99,4_1.00,6_0.36,3_0.86,2_0.49
5 2_0.29,5_1.00,6_0.71
6 1_0.26,5_0.71,6_1.00,2_0.88,4_0.36
*
* cache:
* A 6_5,4_3,2_10,1_2
B 6_5,5_3,2_3
C 4_5,3_15,1_5
output:
1 A_9.87,B_2.38,C_23.90
2 A_16.59,B_8.27,C_4.25
3 C_23.95,A_4.44
4 B_3.27,C_22.85,A_11.68
5 A_6.45,B_7.42
6 C_3.10,A_15.40,B_9.77
For example, in map:
1	1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
×
A	6_5,4_3,2_10,1_2
=
1.00*2+0.36*10+0.99*3+0.26*5
=9.87
which emits (1,"A_9.87").
reduce then merges everything into the recommendation list.
* @author chenjie
*
*/
public class Step4 {
public static class Mapper4 extends Mapper<LongWritable,Text,Text,Text>
{
private Text outKey = new Text();
private Text outValue = new Text();
private List<String> cacheList = new ArrayList<String>();
private DecimalFormat df = new DecimalFormat("0.00");
/***
* Reads the cached file holding the right-hand (transposed score) matrix into
* memory, one line per string, collected into a list.
*/
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
FileReader fr = new FileReader("myfile");
BufferedReader br = new BufferedReader(fr);
String line = null;
while((line = br.readLine()) != null)
{
cacheList.add(line);
System.out.println("----------------------cache line :" + line);
}
fr.close();
br.close();
}
/* Logical form of the left matrix
* 1 2 -2 0
* 3 3 4 -3
* -2 0 2 3
* 5 3 -1 2
* -4 2 0 2
* Physical (stored) form of the left matrix
* 1 1_1,2_2,3_-2,4_0
* 2 1_3,2_3,3_4,4_-3
* 3 1_-2,2_0,3_2,4_3
* 4 1_5,2_3,3_-1,4_2
* 5 1_-4,2_2,3_0,4_2
*
* Physical form of the right matrix (already transposed)
* 1 3_0,1_0,4_-2,2_1
2 3_1,4_2,2_3,1_3
3 4_-1,1_-1,3_4,2_5
4 1_2,3_-1,4_1,2_-2
5 4_2,3_2,1_-3,2_-1
key: "1"
value: "1 1_1,2_2,3_-2,4_0"
* */
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
System.out.println("-------------------map,key=" + key + "value=" + value);
String[] rowAndline = value.toString().split("\t");
//get the row number
//rowAndline : {"1","1_1,2_2,3_-2,4_0"}
String row_matrix1 = rowAndline[0];
//row_matrix1 :"1"
String[] column_value_array_matrix1 = rowAndline[1].split(",");
//獲得各列
//rowAndline[1] : "1_1,2_2,3_-2,4_0"
//column_value_array_matrix1 : {"1_1","2_2","3_-2","4_0"}
for(String line : cacheList)// e.g. line = "3	4_-1,1_-1,3_4,2_5"
{
String[] rowAndline2 = line.toString().split("\t");
//rowAndline2 : {"3","4_-1,1_-1,3_4,2_5"}
String row_matrix2 = rowAndline2[0];
//row number of this line of the transposed matrix (i.e. the column number of the original right matrix)
String[] column_value_array_matrix2 = rowAndline2[1].split(",");
//rowAndline2[1] : "4_-1,1_-1,3_4,2_5"
//column_value_array_matrix2 : {"4_-1","1_-1","3_4","2_5"}
double result = 0;
//accumulates the dot product
for(String column_value_matrix1 : column_value_array_matrix1)//for each column (component) of this row of the left matrix: "1_1","2_2","3_-2","4_0"
{
String column_maxtrix1 = column_value_matrix1.split("_")[0];
//column number
String value_matrix1 = column_value_matrix1.split("_")[1];
//value in that column
for(String column_value_matrix2 : column_value_array_matrix2)//for each column (component) of this row of the right matrix: "4_-1","1_-1","3_4","2_5"
{
String column_maxtrix2 = column_value_matrix2.split("_")[0];
//column number
String value_matrix2 = column_value_matrix2.split("_")[1];
//value in that column
if(column_maxtrix2.equals(column_maxtrix1))//this is why the column labels matter: only when the labels match are the two values components at the same position
{
result += Double.valueOf(value_matrix1) * Double.valueOf(value_matrix2);
//result += 1 * (-1)
//result += 2 * 5
//result += -2 * 4
//result += 0 * (-1)
}
}
}
if(result == 0)
continue;
outKey.set(row_matrix1);//key: row number of the left matrix
outValue.set(row_matrix2 + "_" +df.format(result));//value: row number of the transposed right matrix (a column of the original matrix), followed by "_" and the value at that position
context.write(outKey, outValue);
//("1","3_1")
}
//("1","2_7")("1,"3_1")("1","2_4")("1","4_0")("1","5_9")
//("2","1_9")...
//....
}
}
public static class Reducer4 extends Reducer<Text,Text,Text,Text>
{
private Text outKey = new Text();
private Text outValue = new Text();
/**
* Combines the key-value pairs produced by map into the physical form of the
* result matrix. From
* ("1","2_7")("1","3_1")("1","2_4")("1","4_0")("1","5_9")
* ("2","1_9")...
* ...
* the pairs sharing a key are grouped:
* key : "1"
* values : {"2_7","3_1","2_4","4_0","5_9"}
*
*/
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for(Text text : values)
{
sb.append(text + ",");
}
// sb : "2_7,3_1,2_4,4_0,5_9,"
String line = "";
if(sb.toString().endsWith(","))
{
line = sb.substring(0,sb.length()-1);
}
//line :"2_7,3_1,2_4,4_0,5_9"
outKey.set(key);
outValue.set(line);
context.write(outKey, outValue);
// ("1","2_7,3_1,2_4,4_0,5_9")
}
}
private static final String INPATH = "hdfs://pc1:9000/output/tuijian2/part-r-00000";
private static final String OUTPATH = "hdfs://pc1:9000/output/tuijian4";
private static final String CACHE = "hdfs://pc1:9000/output/tuijian3/part-r-00000";
private static final String HDFS = "hdfs://pc1:9000";
public int run() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS",HDFS);
String[] otherArgs = {INPATH,OUTPATH};
//the input and output HDFS paths
if (otherArgs.length != 2) {
System.err.println("Usage: Step4 <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "step4");//set the job name
job.setJarByClass(Step4.class);
job.setMapperClass(Mapper4.class); //set the Mapper class
job.setReducerClass(Reducer4.class); //set the Reducer class
job.addCacheArchive(new URI(CACHE + "#myfile"));//cache the transposed score matrix, symlinked as "myfile"
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class); //output key type
job.setOutputValueClass(Text.class);// output value type
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //set the input path
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//set the output path
FileSystem fs = FileSystem.get(conf);
Path outPath = new Path(OUTPATH);
if(fs.exists(outPath))
{
fs.delete(outPath, true);
}
return job.waitForCompletion(true) ? 1 : -1;
}
public static void main(String[] args)
{
try {
new Step4().run();
} catch (ClassNotFoundException | IOException | InterruptedException | URISyntaxException e) {
e.printStackTrace();
}
}
}
Step 5: remove items the user has already acted on from the recommendation list.
package hadoop2;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/***
* Removes items the user has already acted on from the recommendation list; e.g. if user A has already bought an iphone7, iphone7 is deleted from A's recommendations.
* input: the recommendation list produced by Step 4
* 1 A_9.87,B_2.38,C_23.90
2 A_16.59,B_8.27,C_4.25
3 C_23.95,A_4.44
4 B_3.27,C_22.85,A_11.68
5 A_6.45,B_7.42
6 C_3.10,A_15.40,B_9.77
* cache: the user action records (the Step 1 score matrix)
* 1 A_2,C_5
2 A_10,B_3
3 C_15
4 A_3,C_5
5 B_3
6 A_5,B_5
map:
For example, item 1's recommendation row is
1	A_9.87,B_2.38,C_23.90
and item 1's action record is
1	A_2,C_5
Since user A already gave item 1 a score of 2 and user C a score of 5,
A and C must be removed from item 1's recommendation list, keeping only B.
The final recommendations are grouped per user, so the user becomes the key
and itemID_score the value:
(B,1_2.38)
reduce:
merges all items recommended to the same user into one output line
output:
A 5_6.45,3_4.44
B 4_3.27,1_2.38
C 6_3.10,2_4.25
* @author chenjie
*
*/
public class Step5 {
public static class Mapper5 extends Mapper<LongWritable,Text,Text,Text>
{
private Text outKey = new Text();
private Text outValue = new Text();
private List<String> cacheList = new ArrayList<String>();
private DecimalFormat df = new DecimalFormat("0.00");
/***
* Reads the cached action-record file (the Step 1 output) into memory, one line
* per string, collected into a list.
*/
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
FileReader fr = new FileReader("itemUserScore3");
BufferedReader br = new BufferedReader(fr);
String line = null;
while((line = br.readLine()) != null)
{
cacheList.add(line);
System.out.println("----------------------cache line :" + line);
}
fr.close();
br.close();
}
/**
* Example used in the comments below:
* item 1's recommendation row: 1	A_9.87,B_2.38,C_23.90
* item 1's action record:      1	A_2,C_5
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
System.out.println("-------------------map,key=" + key + "value=" + value);
String item_matrix1 = value.toString().split("\t")[0];
//推薦列表商品號 1
String[] user_score_array_matrix1 = value.toString().split("\t")[1].split(",");
//推薦列表 A_9.87,B_2.38,C_23.90
for(String line : cacheList)//商品的操作記錄列表
{
String item_matrix2 = line.toString().split("\t")[0];
//操作記錄商品號 1
String[] user_score_array_matrix2 = line.toString().split("\t")[1].split(",");
//操作記錄 A_2,C_5
if(item_matrix1.equals(item_matrix2))//如果推薦列表商品號==操作記錄商品號,證明是同一商品,才能操作
{
for(String user_score : user_score_array_matrix1)//對於推薦列表中每一個使用者 A_9.87,B_2.38,C_23.90
{
boolean flag = false;//預設操作過標誌位
String user_matrix1 = user_score.split("_")[0];
//使用者ID
String score_matrix1 = user_score.split("_")[1];
//推薦度
for(String user_score2 : user_score_array_matrix2)//對於操作記錄中的每一條記錄 A_2,C_5
{
String user_matrix2 = user_score2.split("_")[0];
//使用者ID
if(user_matrix1.equals(user_matrix2))//如果兩個ID相等 如A_9.87 和A_2 則證明使用者A操作過該商品
{
flag = true;
}
}
if(flag == false)//如果使用者A沒有操作過該物品
{
outKey.set(user_matrix1);//將使用者ID作為Key
outValue.set(item_matrix1 + "_" +score_matrix1 );//將商品ID_推薦度作為value
context.write(outKey, outValue);//寫入結果集
}
}
}
}
}
}
public static class Reducer5 extends Reducer<Text,Text,Text,Text>
{
private Text outKey = new Text();
private Text outValue = new Text();
/**
* Combines the key-value pairs produced by map. From
* ("1","2_7")("1","3_1")("1","2_4")("1","4_0")("1","5_9")
* ("2","1_9")...
* ...
* the pairs sharing a key (here a user ID) are grouped:
* key : "1"
* values : {"2_7","3_1","2_4","4_0","5_9"}
* and joined into one comma-separated output line.
*/
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for(Text text : values)
{
sb.append(text + ",");
}
// sb : "2_7,3_1,2_4,4_0,5_9,"
String line = "";
if(sb.toString().endsWith(","))
{
line = sb.substring(0,sb.length()-1);
}
//line :"2_7,3_1,2_4,4_0,5_9"
outKey.set(key);
outValue.set(line);
context.write(outKey, outValue);
// ("1","2_7,3_1,2_4,4_0,5_9")
}
}
private static final String INPATH = "hdfs://pc1:9000/output/tuijian4/part-r-00000";
private static final String OUTPATH = "hdfs://pc1:9000/output/tuijian5";
private static final String CACHE = "hdfs://pc1:9000/output/tuijian1/part-r-00000";
private static final String HDFS = "hdfs://pc1:9000";
public int run() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS",HDFS);
String[] otherArgs = {INPATH,OUTPATH};
//the input and output HDFS paths
if (otherArgs.length != 2) {
System.err.println("Usage: Step5 <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "step5");//set the job name
job.setJarByClass(Step5.class);
job.setMapperClass(Mapper5.class); //set the Mapper class
job.setReducerClass(Reducer5.class); //set the Reducer class
job.addCacheArchive(new URI(CACHE + "#itemUserScore3"));//cache the Step 1 action records, symlinked as "itemUserScore3"
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class); //output key type
job.setOutputValueClass(Text.class);// output value type
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //set the input path
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//set the output path
FileSystem fs = FileSystem.get(conf);
Path outPath = new Path(OUTPATH);
if(fs.exists(outPath))
{
fs.delete(outPath, true);
}
return job.waitForCompletion(true) ? 1 : -1;
}
public static void main(String[] args)
{
try {
new Step5().run();
} catch (ClassNotFoundException | IOException | InterruptedException | URISyntaxException e) {
e.printStackTrace();
}
}
}
Step 6: the driver.
Each step can be run on its own to inspect its result, or all five can be chained in sequence:
package hadoop2;
import java.io.IOException;
import java.net.URISyntaxException;
public class JobRunner {
public static void main(String[] args) {
int result1 = -1;
int result2 = -1;
int result3 = -1;
int result4 = -1;
int result5 = -1;
try {
result1 = new Step1().run();
}
catch (Exception e) {
result1 = -1;
}
if(result1 == 1)
{
System.out.println("Step1 run success");
try {
result2 = new Step2().run();
} catch (ClassNotFoundException | IOException | InterruptedException | URISyntaxException e) {
result2 = -1;
}
}
else
{
System.out.println("Step1 run failed");
}
if(result2 == 1)
{
System.out.println("Step2 run success");
try {
result3 = new Step3().run();
} catch (Exception e) {
result3 = -1;
}
}
else
{
System.out.println("Step2 run failed");
}
if(result3 == 1)
{
System.out.println("Step3 run success");
try {
result4 = new Step4().run();
} catch (Exception e) {
result4 = -1;
}
}
else
{
System.out.println("Step3 run failed");
}
if(result4 == 1)
{
System.out.println("Step4 run success");
try {
result5 = new Step5().run();
} catch (Exception e) {
result5 = -1;
}
}
else
{
System.out.println("Step4 run failed");
}
if(result5 == 1)
{
System.out.println("Step5 run success");
System.out.println("job finished ");
}
else
{
System.out.println("Step5 run failed");
}
}
}
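To run the whole chain, package the classes into a jar and submit it with the hadoop command. The jar name below is only an example; the final recommendation list is written by Step 5 to /output/tuijian5:
hadoop jar recommend.jar hadoop2.JobRunner
hadoop fs -cat /output/tuijian5/part-r-00000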
Step 7: screenshots of each step's results (screenshots omitted).