1. 程式人生 > >Spark RDD電影(根據使用者年齡段)分析——廣播機制

Spark RDD電影(根據使用者年齡段)分析——廣播機制

楔子

Spark 分析電影
使用廣播機制查詢18歲喜愛的電影Top10。主要是廣播機制的使用

思路

分為2部分:1 是廣播 符合年齡的userid ,2是 求Top10


第一部分

  1. 過濾年齡 選取其中符合年齡的userid
  2. 上述userid 抽取到list中,廣播出去

第二部分
3. 電影轉為key-value (電影id,電影名) 轉為map
4. 評分 轉為 key-value (使用者id,電影id),期間使用 第一部分的廣播變數過濾符合年齡的userid
5. 上述結果 變為 key-value (電影ID,1),然後在進行聚合操作 變為(電影id,觀看總次數),然後交換 key-value
6. 取Top10 查詢電影名列印

demo

GitHub 位置

/**
 * 年齡18歲最喜愛的電影
 * 
 * @param sparkSession
 * @param userRdd
 * @param ratRdd
 * @param moviesRdd
 */
public static void age18PopularByRDD(SparkSession sparkSession, JavaRDD<String> userRdd, JavaRDD<String> ratRdd, JavaRDD<String> moviesRdd) {
	// 使用者中挑選年齡18 歲的 key-value :(userid:age)
JavaPairRDD<String, String> userfilter = userRdd.mapToPair(t -> new Tuple2<String, String>(t.split("::")[0], t.split("::")[2])).filter(t -> t._2.equals("18")); List<String> userList = userfilter.map(t -> t._1).collect();// 這個集合不能add // Java建立ClassTag的方法https://blog.csdn.net/hhtop112408/article/details/78338716
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext()); final Broadcast<List<String>> broadcast = javaSparkContext.broadcast(userList); // step 1 電影 (電影id,電影名字) JavaPairRDD<String, String> movie_id_name = moviesRdd.mapToPair(t -> new Tuple2<String, String>(t.split("::")[0], t.split("::")[1])); movie_id_name.cache(); Map<String, String> movie_map = movie_id_name.collectAsMap(); // step 2 評分(使用者id,電影id) JavaPairRDD<String, String> movieFilter = ratRdd.mapToPair(t -> new Tuple2<String, String>(t.split("::")[0], t.split("::")[0])).filter(new Function<Tuple2<String, String>, Boolean>() { @Override public Boolean call(Tuple2<String, String> v1) throws Exception { return broadcast.value().contains(v1._1); } }); /* * movieFilter 變為 key-value (電影ID,1),然後在進行聚合操作 變為(電影id,觀看總次數),然後交換 key-value * 變為(總次數,電影ID) 並降序排序。再次交換key-value */ JavaPairRDD<String, Integer> movie_count_mid = movieFilter.mapToPair(t -> new Tuple2<String, Integer>(t._2, 1)).reduceByKey((a, b) -> (a + b)) .mapToPair(t -> new Tuple2<Integer, String>(t._2, t._1)).sortByKey(false).mapToPair(t -> new Tuple2<String, Integer>(t._2, t._1)); // 列印電影名字 和喜愛人數 movie_count_mid.take(10).forEach(t -> System.out.println("喜愛人數" + t._2 + " 喜愛的電影名值" + movie_map.get(t._1) + "- ")); }

結果

喜愛人數1216  喜愛的電影名值Who's That Girl? (1987)- 
喜愛人數1169  喜愛的電影名值null-  #這個值不吹毛求疵了 
喜愛人數1016  喜愛的電影名值Rudy (1993)- 
喜愛人數990  喜愛的電影名值Gridlock'd (1997)- 
喜愛人數971  喜愛的電影名值Lord of the Rings, The (1978)- 
喜愛人數964  喜愛的電影名值Excess Baggage (1997)- 
喜愛人數914  喜愛的電影名值First Kid (1996)- 
喜愛人數903  喜愛的電影名值Absent Minded Professor, The (1961)- 
喜愛人數900  喜愛的電影名值Swing Kids (1993)- 
喜愛人數882  喜愛的電影名值Playing God (1997)