1. 程式人生 > >Spark RDD實現電影流行度分析

Spark RDD實現電影流行度分析

楔子

學習《spark大資料商業實戰》第12章節,統計所有電影平均得分最高的前10部電影

資料說明

資料下載 CSDN位置
或者在此處下載

詳細github程式碼

1:使用者檔案users.dat---------------------------------------
UserID::Gender::Age::OccupationID::Zip-code
使用者id   性別M是男性 年齡  職業     郵編

2:ratings.dat---------------------------------------------
UserID::MovieID::Rating::Timestamp
使用者ID   電影id評分資料 時間戳
----------------------------------------------------------
3:movies.dat
MovieID::Title::Genres
電影ID 電影名    電影型別

4:職業Occupation.dat--------------------------------------
OccupationID::OccupationName
職業id  職業
推薦系統常用資料集
https://www.cnblogs.com/shenxiaolin/p/8337913.html

思路

分為3步驟:

  1. 把資料變為key-value ,eg (MovieID,(Rating,1))
  2. 通過reduceByKey 彙總,key是MovieID,但是values是(評分總和,點評人數合計) (此處是之前不曾遇到的思路)
  3. sortByKey(false) 倒序排列,在通過take取出前10位

demo

RDD方式

/**
 * 1:RDD實現電影流行度 (1):所有電影中平均得分最高的Top10電影
 * 
 */

private static void rddForMovieTop10(JavaRDD<String> ratRdd)
{ // step 1 把資料變為key-value ,eg (MovieID,(Rating,1)) JavaPairRDD<String, Tuple2<Long, Long>> mapToPair = ratRdd.mapToPair(new PairFunction<String, String, Tuple2<Long, Long>>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Tuple2<
Long, Long>
> call(String t) throws Exception { String[] split = t.split("::");// UserID::MovieID::Rating::Timestamp Tuple2<Long, Long> tuple2 = new Tuple2<Long, Long>(Long.valueOf(split[2]), 1L); return new Tuple2<String, Tuple2<Long, Long>>(split[1], tuple2); } }); // step 2 通過reduceByKey 彙總,key是MovieID,但是values是(評分總和,點評人數合計) JavaPairRDD<String, Tuple2<Long, Long>> reduceByKey = mapToPair.reduceByKey(new Function2<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Long, Long> call(Tuple2<Long, Long> v1, Tuple2<Long, Long> v2) throws Exception { return new Tuple2<Long, Long>(v1._1 + v2._1, v1._2 + v2._2); } }); // step 3 sortByKey(false) 倒序排列 JavaPairRDD<Double, String> result = reduceByKey.mapToPair(new PairFunction<Tuple2<String, Tuple2<Long, Long>>, Double, String>() { @Override public Tuple2<Double, String> call(Tuple2<String, Tuple2<Long, Long>> v1) throws Exception { // TODO 都是整數 做除法還是整數 double avg = v1._2._1 * 0.1 / v1._2._2; return new Tuple2<Double, String>(avg, v1._1); } }); System.out.println("所有電影中平均得分最高的Top10電影 RDD方式"); result.sortByKey(false).take(10).forEach(t -> System.out.println(t)); }

SparkSQL方式

/**
 * 1:RDD實現電影流行度 (1):所有電影中平均得分最高的Top10電影Bysql
 * 
 */
private static void rddForMovieTop10Bysql(Dataset<Row> ratDF) {
	ratDF.createOrReplaceTempView("t_rat");
	Dataset<Row> sql = sparkSession.sql("select * from ("//
			+ "select avg(rat) rat_avg ,MovieID  from t_rat group by MovieID order by rat_avg desc" //
			+ " ) limit 20");
	System.out.println("所有電影中平均得分最高的Top20電影 SQL方式");
	sql.show();

}

結果如下

在這裡插入圖片描述
在這裡插入圖片描述