spark電影推薦系統的簡單測試
阿新 • • 發佈:2019-02-07
object Movie_Users_Analyzer_RDD { Logger.getLogger("org").setLevel(Level.ERROR); var masterUrl = "local[4]" var dataPath = "E:\\work_node\\資料\\spark_data\\ml-1m\\" //資料存放的目錄 def main(args: Array[String]): Unit = { if(args.length>0){ masterUrl = args(0) }else if(args.length>1){ dataPath= args(1) } val sc = new SparkContext(new SparkConf().setMaster(masterUrl).setAppName("Movie_Users_Analyzer")) val usersRDD = sc.textFile(dataPath+"users.dat") val moviesRDD = sc.textFile(dataPath+"movies.dat") val occupationsRDD= sc.textFile(dataPath+"occupations.dat") val ratingsRDD= sc.textFile(dataPath+"ratings.dat") /** * *電影 點評 系統 使用者 行為 分析 之一: 分析 具體 某部 電影 觀看 的 使用者 資訊, * 如 電影 ID 為 *1193 的 使用者 資訊( 使用者 的 ID、 Age、 Gender、 Occupation) */ val usersBasic:RDD[(String,(String,String,String))] = usersRDD.map(_.split("::")).map{user=>( //UserID::Gender::Age::Occupation user(3),(user(0),user(1),user(2)) )} for (elem<- usersBasic.collect().take(2)){ println("usersBasicRDD (職業ID,(使用者ID,性別,年齡)):"+elem) } val occupations:RDD[(String,String)] = occupationsRDD.map(_.split("::")).map(job=>(job(0),job(1))) for (elem <- occupations.collect().take(2)){ println("occupations (職業ID,職業名):"+elem) } val userInformation:RDD[(String,((String,String,String),String))] = usersBasic.join(occupations) userInformation.cache() for(elem<- userInformation.collect().take(2)){ println("userInformation (職業ID,(使用者ID,性別,年齡),職業名):"+elem) } val targetMovie:RDD[(String,String)] = ratingsRDD.map(_.split("::")).map(x=>(x(0),x(1))).filter(_._2.equals("1193")) println("------") for (elem<-targetMovie.collect().take(2)){ println("targetMovie (使用者ID,電影ID):"+elem) } val targetUsers:RDD[(String,((String,String,String),String))] = userInformation.map(x=>(x._2._1._1,x._2)) for(elem<-targetUsers.collect().take(2)){ println("targetUsers (使用者ID,((使用者ID,性別,年齡),職業名)):"+elem) } println("電影點評系統使用者行為分析, 統計觀看電影ID為 1193 的電影使用者資訊: 使用者的 ID、 性別、 年齡、 職業名") val userInformationForSpecificMovie:RDD[(String,(String,((String,String,String),String)))] = targetMovie.join(targetUsers) for(elem<-userInformationForSpecificMovie.collect().take(10)){ println("userInformationForSpecificMovie (使用者ID,(電影ID,(使用者ID,性別,年齡),職業名)):"+elem) } println("-------------------------------------") println("所有電影中口碑最好的電影:") val ratings = ratingsRDD.map(_.split("::")).map(x=>(x(0),x(1),x(2))).cache() //格式化出電影ID和評分 ratings.map(x=>(x._2,(x._3.toDouble,1))) //格式化成K-V .reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))//對 Value 進行 reduce 操作, 分別 得出 每部 電影 的 總的 評分 和 總的 點評 人數 .map(x=>(x._2._1.toDouble/x._2._2,x._1)) //求出電影的平均分 .sortByKey(false) //降序排序 .take(10) //取TOP10 .foreach(println) //迴圈輸出 val male = "M" val famale = "F" val genderRatings = ratings.map(x=>(x._1,(x._1,x._2,x._3))).join(usersRDD.map(_.split("::")).map(x=>(x(0),x(1))).cache()) genderRatings.take(2).foreach(println) val maleFliteredRatings:RDD[(String,String,String)] = genderRatings.filter(x=>x._2._2.equals("M")).map(x=>x._2._1) val femaleFliteredRatings:RDD[(String,String,String)] = genderRatings.filter(x=>x._2._2.equals("F")).map(x=>x._2._1) println("所有電影中最受男性喜愛的電影Top10:") maleFliteredRatings.map(x=>(x._2,(x._3.toDouble,1))) //格式化成k-v值 .reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)) //對 Value 進行 reduce 操作, 分別 得出 每部 電影 的 總的 評分 和 總的 點評 人數 .map(x => (x._2._1.toDouble/x._2._2,x._1)) //求電影平均分 .sortByKey(false).take(10).foreach(println) //降序排序 println("所有電影中最受女性喜愛的電影Top10:") femaleFliteredRatings.map(x=>(x._2,(x._3.toDouble,1))) //格式化成k-v值 .reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)) //對 Value 進行 reduce 操作, 分別 得出 每部 電影 的 總的 評分 和 總的 點評 人數 .map(x => (x._2._1.toDouble/x._2._2,x._1)) //求電影平均分 .sortByKey(false).take(10).foreach(println) //降序排序
資料來源:https://grouplens.org/datasets/movielens/