Spark RDD電影(根據使用者年齡段)分析——廣播機制
阿新 • • 發佈:2019-01-13
楔子
Spark 分析電影
使用廣播機制查詢18歲喜愛的電影Top10。主要是廣播機制的使用
思路
分為2部分:1 是廣播 符合年齡的userid ,2是 求Top10
第一部分
- 過濾年齡 選取其中符合年齡的userid
- 上述userid 抽取到list中,廣播出去
第二部分
3. 電影轉為key-value (電影id,電影名) 轉為map
4. 評分 轉為 key-value (使用者id,電影id),期間使用 第一部分的廣播變數過濾符合年齡的userid
5. 上述結果 變為 key-value (電影ID,1),然後在進行聚合操作 變為(電影id,觀看總次數),然後交換 key-value
6. 取Top10 查詢電影名列印
demo
/**
* 年齡18歲最喜愛的電影
*
* @param sparkSession
* @param userRdd
* @param ratRdd
* @param moviesRdd
*/
public static void age18PopularByRDD(SparkSession sparkSession, JavaRDD<String> userRdd, JavaRDD<String> ratRdd, JavaRDD<String> moviesRdd) {
// 使用者中挑選年齡18 歲的 key-value :(userid:age)
JavaPairRDD<String, String> userfilter = userRdd.mapToPair(t -> new Tuple2<String, String>(t.split("::")[0], t.split("::")[2])).filter(t -> t._2.equals("18"));
List<String> userList = userfilter.map(t -> t._1).collect();// 這個集合不能add
// Java建立ClassTag的方法https://blog.csdn.net/hhtop112408/article/details/78338716
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
final Broadcast<List<String>> broadcast = javaSparkContext.broadcast(userList);
// step 1 電影 (電影id,電影名字)
JavaPairRDD<String, String> movie_id_name = moviesRdd.mapToPair(t -> new Tuple2<String, String>(t.split("::")[0], t.split("::")[1]));
movie_id_name.cache();
Map<String, String> movie_map = movie_id_name.collectAsMap();
// step 2 評分(使用者id,電影id)
JavaPairRDD<String, String> movieFilter = ratRdd.mapToPair(t -> new Tuple2<String, String>(t.split("::")[0], t.split("::")[0])).filter(new Function<Tuple2<String, String>, Boolean>() {
@Override
public Boolean call(Tuple2<String, String> v1) throws Exception {
return broadcast.value().contains(v1._1);
}
});
/*
* movieFilter 變為 key-value (電影ID,1),然後在進行聚合操作 變為(電影id,觀看總次數),然後交換 key-value
* 變為(總次數,電影ID) 並降序排序。再次交換key-value
*/
JavaPairRDD<String, Integer> movie_count_mid = movieFilter.mapToPair(t -> new Tuple2<String, Integer>(t._2, 1)).reduceByKey((a, b) -> (a + b))
.mapToPair(t -> new Tuple2<Integer, String>(t._2, t._1)).sortByKey(false).mapToPair(t -> new Tuple2<String, Integer>(t._2, t._1));
// 列印電影名字 和喜愛人數
movie_count_mid.take(10).forEach(t -> System.out.println("喜愛人數" + t._2 + " 喜愛的電影名值" + movie_map.get(t._1) + "- "));
}
結果
喜愛人數1216 喜愛的電影名值Who's That Girl? (1987)-
喜愛人數1169 喜愛的電影名值null- #這個值不吹毛求疵了
喜愛人數1016 喜愛的電影名值Rudy (1993)-
喜愛人數990 喜愛的電影名值Gridlock'd (1997)-
喜愛人數971 喜愛的電影名值Lord of the Rings, The (1978)-
喜愛人數964 喜愛的電影名值Excess Baggage (1997)-
喜愛人數914 喜愛的電影名值First Kid (1996)-
喜愛人數903 喜愛的電影名值Absent Minded Professor, The (1961)-
喜愛人數900 喜愛的電影名值Swing Kids (1993)-
喜愛人數882 喜愛的電影名值Playing God (1997)