Best-Movie Recommendation with the ALS Algorithm (Java Version)

The complete, runnable example below uses Spark MLlib's ALS implementation to train matrix factorization models on the MovieLens 1M ratings, selects the model with the lowest validation RMSE, compares it against a mean-rating baseline, and prints the top 10 movie recommendations for user 100.
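The program reads three `::`-delimited files from a local copy of the MovieLens 1M data set (ml-1m). For reference, the layouts the parsers in the code assume are roughly:

    ratings.dat          UserID::MovieID::Rating::Timestamp   (e.g. 1::1193::5::978300760)
    movies.dat           MovieID::Title::Genres               (e.g. 1::Toy Story (1995)::Animation|Children's|Comedy)
    personalRatings.txt  your own ratings; only the first three ::-separated fields (UserID, MovieID, Rating) are read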

package spark;

import java.util.Arrays;
import java.util.List;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

public class SparkALSDemo {

    public static void main(String... args) throws Exception {
        Logger logger = Logger.getLogger(SparkALSDemo.class);

        // Set the log level and silence the Jetty container logs
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF);

        // Configure the runtime environment and create the SparkContext
        SparkConf sparkConf = new SparkConf().setAppName("MovieLensALS");
        sparkConf.setMaster("local[4]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Load the sample ratings and key each record by its timestamp modulo 10,
        // which splits the data into 10 buckets
        String movielensHomeDir = "F:/ml-1m";
        JavaRDD<Tuple2<Long, Rating>> ratings = jsc.textFile(movielensHomeDir + "/ratings.dat").map(line -> {
            String[] fields = line.split("::");
            return new Tuple2<Long, Rating>(Long.parseLong(fields[3]) % 10,
                    new Rating(Integer.parseInt(fields[0]), Integer.parseInt(fields[1]),
                            Double.parseDouble(fields[2])));
        });

        // Load the user's own ratings, produced by the rating tool (file personalRatings.txt)
        JavaRDD<String> data = jsc.textFile("F:/ml-1m/personalRatings.txt");
        JavaRDD<Rating> myRatingsRDD = data.map(s -> {
            String[] sarray = s.split("::");
            return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
                    Double.parseDouble(sarray[2]));
        });

        // Log a summary of the sample ratings
        logger.info("Got " + ratings.count() + " ratings from "
                + ratings.map(tuple -> tuple._2.user()).distinct().count() + " users on "
                + ratings.map(tuple -> tuple._2.product()).distinct().count() + " movies");

        // Training set: records whose key is in [0, 5], plus the personal ratings
        JavaRDD<Rating> training = ratings.filter(x -> x._1 < 6).map(tuple -> tuple._2).union(myRatingsRDD)
                .repartition(4).persist(StorageLevel.MEMORY_ONLY());
        // Validation set: records whose key is in [6, 7]
        JavaRDD<Rating> validation = ratings.filter(x -> x._1 >= 6 && x._1 < 8).map(tuple -> tuple._2)
                .repartition(4).persist(StorageLevel.MEMORY_ONLY());
        // Test set: records whose key is in [8, 9]
        JavaRDD<Rating> test = ratings.filter(x -> x._1 >= 8).map(tuple -> tuple._2)
                .persist(StorageLevel.MEMORY_ONLY());

        logger.info("Training: " + training.count() + " validation: " + validation.count()
                + " test: " + test.count());

        // Try several parameter settings, compute the RMSE on the validation set for each,
        // and keep the model with the smallest RMSE: the best model.
        // Note that the three lists are paired index-wise, so three combinations are tried.
        List<Integer> ranks = Arrays.asList(8, 10, 12);
        List<Double> lambdas = Arrays.asList(0.1, 2.5, 5.0);
        List<Integer> numIters = Arrays.asList(10, 15, 20);
        MatrixFactorizationModel bestModel = null;
        double bestValidationRmse = Double.MAX_VALUE;
        int bestRank = 0;
        double bestLambda = -1.0;
        int bestNumIter = -1;
        for (int i = 0; i < ranks.size(); i++) {
            MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(training), ranks.get(i),
                    numIters.get(i), lambdas.get(i));
            double validationRmse = SparkALSDemo.computeRMSEAverage(model, validation, validation.count());
            if (validationRmse < bestValidationRmse) {
                bestModel = model;
                bestValidationRmse = validationRmse;
                bestRank = ranks.get(i);
                bestLambda = lambdas.get(i);
                bestNumIter = numIters.get(i);
            }
        }

        double testRmse = SparkALSDemo.computeRMSEAverage(bestModel, test, test.count());
        logger.info("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
                + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".");

        // Build a baseline that always predicts the mean rating of the combined training and
        // validation sets, and measure how much the best model improves on it
        JavaRDD<Double> rdd = training.union(validation).map(d -> d.rating());
        double meanRating = rdd.reduce((a, b) -> a + b) / rdd.count();
        double baselineRmse = Math.sqrt(test.map(x -> (meanRating - x.rating()) * (meanRating - x.rating()))
                .reduce((a1, a2) -> a1 + a2) / test.count());
        double improvement = (baselineRmse - testRmse) / baselineRmse * 100;
        logger.info("The best model improves the baseline by " + String.format("%1.2f", improvement) + "%.");

        // Load the movie data
        JavaRDD<Tuple2<Integer, String>> movies = jsc.textFile(movielensHomeDir + "/movies.dat").map(line -> {
            String[] fields = line.split("::");
            return new Tuple2<Integer, String>(Integer.parseInt(fields[0]), fields[1]);
        });

        // Filter out the movies the user has already rated
        List<Integer> myRatedMovieIds = myRatingsRDD.map(d -> d.product()).collect();
        JavaRDD<Integer> candidates = movies.map(s -> s._1).filter(m -> !myRatedMovieIds.contains(m));

        // Predict the 10 movies user 100 is most likely to enjoy
        JavaRDD<Rating> rr = bestModel.predict(
                JavaPairRDD.fromJavaRDD(candidates.map(d -> new Tuple2<Integer, Integer>(100, d))))
                .sortBy(f -> f.rating(), false, 4);
        logger.info("Movies recommended for you:");
        rr.take(10).forEach(a -> logger.info("User " + a.user() + " - [" + a.product() + "] - [" + a.rating() + "]"));

        // jsc.stop();
    }

    /**
     * Compute the root mean squared error of the model's predictions on data.
     *
     * @param model the trained matrix factorization model
     * @param data  the actual ratings to compare against
     * @param n     the number of ratings in data
     * @return the RMSE over the n ratings
     */
    public static double computeRMSEAverage(MatrixFactorizationModel model, JavaRDD<Rating> data, long n) {
        JavaRDD<Rating> predictions = model.predict(JavaPairRDD.fromJavaRDD(
                data.map(d -> new Tuple2<Integer, Integer>(d.user(), d.product()))));
        JavaPairRDD<String, Double> pre = JavaPairRDD.fromJavaRDD(
                predictions.map(f -> new Tuple2<String, Double>(f.user() + "_" + f.product(), f.rating())));
        JavaPairRDD<String, Double> rea = JavaPairRDD.fromJavaRDD(
                data.map(f -> new Tuple2<String, Double>(f.user() + "_" + f.product(), f.rating())));
        // Join predictions with actual ratings on the "user_movie" key (an inner join, as in SQL)
        JavaRDD<Tuple2<Double, Double>> d = pre.join(rea).values();
        // Take the square root so the result is an RMSE, directly comparable to the baseline above
        return Math.sqrt(d.map(f -> Math.pow(f._1 - f._2, 2)).reduce((a, b) -> a + b) / n);
    }
}
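A note on the metric: computeRMSEAverage joins predicted and actual ratings on the (user, movie) key and computes RMSE = sqrt( Σ (predicted − actual)² / n ), the same quantity used for the mean-rating baseline, so the two numbers in the improvement calculation are directly comparable.

Because the master is hard-coded to local[4], the class can be launched straight from an IDE. A minimal sketch for running it from the command line instead, assuming the job has been packaged into a hypothetical spark-als-demo.jar with Spark MLlib on the classpath:

    spark-submit --class spark.SparkALSDemo spark-als-demo.jar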