1. 程式人生 > >Spark RDD分析各種型別的最喜愛電影TopN技巧

Spark RDD分析各種型別的最喜愛電影TopN技巧

楔子

學習《spark大資料商業實戰》第12章節
通過RDD分析大資料電影點評系統各種型別的電影最喜愛電影TopN。本次分析最受男性(女性)喜愛的電影Top10

裡面複用了Spark RDD實現電影流行度分析

思路

  1. 因為要使用電影資料RDD,所以複用了Spark RDD實現電影流行度分析
  2. 根據性別過濾資料
  3. 要進行join 需要key-values
  4. join之後的資料(2828,((3793,3),M)) -->(使用者(電影id,評分)性別) 轉換為Spark RDD實現電影流行度分析需要的格式 (電影,評分,人數) --eg (MovieID,(Rating,1))

demo lambda方式

/**
 * 2:最受男性歡迎的電影 和最受女性歡迎的電影(RDD方式)使用lambda簡化
 * 
 * @param userDF
 * @param ratDF
 */
public static void popularByRDDSimpleness(SparkSession sparkSession, JavaRDD<String> userRdd, JavaRDD<String> ratRdd) {
	System.out.println("男性喜愛的10個電影 ByRDD");
	System.out.println(
new DateTime().toString("yyyy-MMM-dd HH:mm:ss:SSS")); // UserID::Gender JavaPairRDD<String, String> user_gender = userRdd.mapToPair(t -> new Tuple2<String, String>(t.split("::")[0], t.split("::")[1])); user_gender.cache(); // 評分變為 userid:(電影id,評分) JavaPairRDD<String, Tuple2<String,
Long>
> user_movie_rat = ratRdd .mapToPair(t -> new Tuple2<String, Tuple2<String, Long>>(t.split("::")[0], new Tuple2<String, Long>(t.split("::")[1], Long.valueOf(t.split("::")[2])))); user_movie_rat.cache(); JavaPairRDD<String, Tuple2<Tuple2<String, Long>, String>> user_pairRdd = user_movie_rat.join(user_gender); // user_pairRdd.take(10).forEach(t -> System.out.println(t)); // (2828,((3793,3),M)) // (2828,((2997,5),M)) // 從裡面過濾男性 JavaPairRDD<String, Tuple2<Tuple2<String, Long>, String>> filter = user_pairRdd.filter(t -> t._2._2.equals("M")); // 將上述過濾之後的結果 (userid,(電影id,評分),性別) 從新構造成 (MovieID,(Rating,1)) JavaPairRDD<String, Tuple2<Long, Long>> mapToPair = filter.mapToPair(t -> new Tuple2<String, Tuple2<Long, Long>>(t._2._1._1, new Tuple2<Long, Long>(t._2._1._2, 1L))); /** * 1 所有電影中平均得分最高的Top10電影 */ // step 1 把資料變為key-value ,eg (MovieID,(Rating,1)) mapToPair.cache(); // step 2 通過reduceByKey 彙總,key是MovieID,但是values是(評分總和,點評人數合計) JavaPairRDD<String, Tuple2<Long, Long>> reduceByKey = mapToPair.reduceByKey((a, b) -> new Tuple2<Long, Long>(a._1 + b._1, a._2 + b._2)); // step 3 sortByKey(false) 倒序排列 JavaPairRDD<Double, String> result = reduceByKey.mapToPair(v1 -> new Tuple2<Double, String>((v1._2._1 * 0.1 / v1._2._2), v1._1)); result.sortByKey(false).take(10).forEach(t -> System.out.println(t)); System.out.println(new DateTime().toString("yyyy-MMM-dd HH:mm:ss:SSS")); }

demo SparkSQL方式

/**
 * 2:最受男性歡迎的電影 和最受女性歡迎的電影(Sql方式)
 * 
 * @param userDF
 * @param ratDF
 */
public static void popularBySql(SparkSession sparkSession, Dataset<Row> userDF, Dataset<Row> ratDF) {
	System.out.println("男性喜愛的10個電影 BySQL");
	System.out.println(new DateTime().toString("yyyy-MMM-dd HH:mm:ss:SSS"));// 2019-一月-03 20:10:05:305
	userDF.createOrReplaceTempView("t_user");
	ratDF.createOrReplaceTempView("t_rat");
	// 選擇評論中是男性的評分
	Dataset<Row> sql = sparkSession.sql("select avg(rat) rat_avg ,MovieID from (" //
			+ "select r.* from t_rat r , t_user u where u.Gender='M' AND U.UserID = r.UserID )" + //
			"group by MovieID order by rat_avg desc limit 10");

	sql.show();
	System.out.println(new DateTime().toString("yyyy-MMM-dd HH:mm:ss:SSS"));
}

spark RDD方式

GitHub位置 方法是popularByRDD 程式碼太長 此處不羅列

對比RDD和SparkSQL

對比不是那麼充分,僅作為一個參考。由下圖發現 RDD方式 所需要的時間會短一點
在這裡插入圖片描述