Spark RDD實現電影流行度分析
阿新 • • 發佈:2019-01-13
楔子
學習《spark大資料商業實戰》第12章節,統計所有電影平均得分最高的前10部電影
資料說明
1:使用者檔案users.dat--------------------------------------- UserID::Gender::Age::OccupationID::Zip-code 使用者id 性別M是男性 年齡 職業 郵編 2:ratings.dat--------------------------------------------- UserID::MovieID::Rating::Timestamp 使用者ID 電影id評分資料 時間戳 ---------------------------------------------------------- 3:movies.dat MovieID::Title::Genres 電影ID 電影名 電影型別 4:職業Occupation.dat-------------------------------------- OccupationID::OccupationName 職業id 職業 推薦系統常用資料集 https://www.cnblogs.com/shenxiaolin/p/8337913.html
思路
分為3步驟:
- 把資料變為key-value ,eg (MovieID,(Rating,1))
- 通過reduceByKey 彙總,key是MovieID,但是values是(評分總和,點評人數合計) (此處是之前不曾遇到的思路)
- sortByKey(false) 倒序排列,在通過take取出前10位
demo
RDD方式
/**
* 1:RDD實現電影流行度 (1):所有電影中平均得分最高的Top10電影
*
*/
private static void rddForMovieTop10(JavaRDD<String> ratRdd) {
// step 1 把資料變為key-value ,eg (MovieID,(Rating,1))
JavaPairRDD<String, Tuple2<Long, Long>> mapToPair = ratRdd.mapToPair(new PairFunction<String, String, Tuple2<Long, Long>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Tuple2< Long, Long>> call(String t) throws Exception {
String[] split = t.split("::");// UserID::MovieID::Rating::Timestamp
Tuple2<Long, Long> tuple2 = new Tuple2<Long, Long>(Long.valueOf(split[2]), 1L);
return new Tuple2<String, Tuple2<Long, Long>>(split[1], tuple2);
}
});
// step 2 通過reduceByKey 彙總,key是MovieID,但是values是(評分總和,點評人數合計)
JavaPairRDD<String, Tuple2<Long, Long>> reduceByKey = mapToPair.reduceByKey(new Function2<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<Long, Long> v1, Tuple2<Long, Long> v2) throws Exception {
return new Tuple2<Long, Long>(v1._1 + v2._1, v1._2 + v2._2);
}
});
// step 3 sortByKey(false) 倒序排列
JavaPairRDD<Double, String> result = reduceByKey.mapToPair(new PairFunction<Tuple2<String, Tuple2<Long, Long>>, Double, String>() {
@Override
public Tuple2<Double, String> call(Tuple2<String, Tuple2<Long, Long>> v1) throws Exception {
// TODO 都是整數 做除法還是整數
double avg = v1._2._1 * 0.1 / v1._2._2;
return new Tuple2<Double, String>(avg, v1._1);
}
});
System.out.println("所有電影中平均得分最高的Top10電影 RDD方式");
result.sortByKey(false).take(10).forEach(t -> System.out.println(t));
}
SparkSQL方式
/**
* 1:RDD實現電影流行度 (1):所有電影中平均得分最高的Top10電影Bysql
*
*/
private static void rddForMovieTop10Bysql(Dataset<Row> ratDF) {
ratDF.createOrReplaceTempView("t_rat");
Dataset<Row> sql = sparkSession.sql("select * from ("//
+ "select avg(rat) rat_avg ,MovieID from t_rat group by MovieID order by rat_avg desc" //
+ " ) limit 20");
System.out.println("所有電影中平均得分最高的Top20電影 SQL方式");
sql.show();
}
結果如下