1. 程式人生 > >spark練習——影評案例

spark練習——影評案例

ase list highlight package 喜歡 rgs ngs man dfs

第一次寫博客,新人上路,歡迎大家多多指教!!!

---------------------------------------------------------------------分割線---------------------------------------------------------------------

現有如此三份數據:
1、users.dat 數據格式為: 2::M::56::16::70072
對應字段為:UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
對應字段中文解釋:用戶 id,性別,年齡,職業,郵政編碼


2、movies.dat 數據格式為: 2::Jumanji (1995)::Adventure|Children‘s|Fantasy
對應字段為:MovieID BigInt, Title String, Genres String
對應字段中文解釋:電影 ID,電影名字,電影類型
3、ratings.dat 數據格式為: 1::1193::5::978300760
對應字段為:UserID BigInt, MovieID BigInt, Rating Double, Timestamped String
對應字段中文解釋:用戶 ID,電影 ID,評分,評分時間戳

需求:

1、求被評分次數最多的 10 部電影,並給出評分次數(電影名,評分次數)

2、分別求男性,女性當中評分最高的 10 部電影(性別,電影名,影評分)
3、分別求男性,女性看過最多的 10 部電影(性別,電影名)
4、年齡段在“18-24”的男人,最喜歡看 10 部電影
5、求 movieid = 2116 這部電影各年齡段(因為年齡就只有 7 個,就按這個 7 個分就好了)
的平均影評(年齡段,影評分)
6、求最喜歡看電影(影評次數最多)的那位女性評最高分的 10 部電影的平均影評分(觀影
者,電影名,影評分)
7、求好片(評分>=4.0)最多的那個年份的最好看的 10 部電影
8、求 1997 年上映的電影中,評分最高的 10 部 Comedy 類電影
9、該影評庫中各種類型電影中評價最高的 5 部電影(類型,電影名,平均影評分)

10、各年評分最高的電影類型(年份,類型,影評分)

先建立一個Utils類,主要用於初始化配置信息以及解析原始數據

package movie_rating

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Utils {
    //初始化SparkConf對象
    private[movie_rating] val conf = new SparkConf().setAppName("FileReview").setMaster("local")
    //初始化sc對象
    private[movie_rating]  val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    //讀取hdfs上的數據
    private[movie_rating] val movie = sc.textFile("hdfs://myha01/mydata/film_review/movies.dat")
    private[movie_rating] val ratings = sc.textFile("hdfs://myha01/mydata/film_review/ratings.dat")
    private[movie_rating] val users = sc.textFile("hdfs://myha01/mydata/film_review/users.dat")

    //將原始數據轉為RDD格式
    private[movie_rating] val movieRdd: RDD[(String, String, String)] = movie.map(_.split("::")).map(m => (m(0), m(1), m(2)))
    private[movie_rating] val ratingsRdd: RDD[(String, String, String, String)] = ratings.map(_.split("::")).map(r => (r(0), r(1), r(2), r(3)))
    private[movie_rating] val usersRdd: RDD[(String, String, String, String, String)] = users.map(_.split("::")).map(u => (u(0), u(1), u(2), u(3), u(4)))

}

第一問:

package movie_rating

import org.apache.spark.rdd.RDD

/**
  * Utils.usersRdd:對應字段中文解釋:用戶 id,性別,年齡,職業,郵政編碼
  * Utils.movieRdd:對應字段中文解釋:電影 ID,電影名字,電影類型
  * Utils.ratingsRdd:對應字段中文解釋:用戶 ID,電影 ID,評分,評分時間戳
  */
object Demand01 {
    /**
      * 1、求被評分次數最多的 10 部電影,並給出評分次數(電影名,評分次數)
      */
    def main(args: Array[String]): Unit = {

        //獲取電影id與對應的評分次數
        val movieID_rating: RDD[(String, Int)] = Utils.ratingsRdd.map(x => (x._2, 1))
        val movieID_times: RDD[(String, Int)] = movieID_rating.reduceByKey(_ + _).sortBy(_._2, false)
        //獲得電影id和電影名
        val movieID_name: RDD[(String, String)] = Utils.movieRdd.map(x => (x._1, x._2))
        //關聯movieID_times和movieID_name,獲得電影id,電影名,評分次數
        val result: RDD[(String, Int)] = movieID_times.join(movieID_name).sortBy(_._2._1, false).map(x => (x._2._2, x._2._1))
        //輸出結果
        result.take(10).foreach(println(_))
    }
}

第二問

package movie_rating

import org.apache.spark.rdd.RDD

/**
  * Utils.usersRdd:對應字段中文解釋:用戶 id,性別,年齡,職業,郵政編碼
  * Utils.movieRdd:對應字段中文解釋:電影 ID,電影名字,電影類型
  * Utils.ratingsRdd:對應字段中文解釋:用戶 ID,電影 ID,評分,評分時間戳
  */
object Demand02 {
    /**
      * 2、分別求男性,女性當中評分最高的 10 部電影(性別,電影名,影評分)
      */
    def main(args: Array[String]): Unit = {
        //(userID, sex)
        val userID_sex: RDD[(String, String)] = Utils.usersRdd.map(x => (x._1, x._2))
        //(userID, (movieID, rating))
        val userID_movieID_rating: RDD[(String, (String, String))] = Utils.ratingsRdd.map(x => (x._1, (x._2, x._3)))

        //(userID, (sex, (movieID, rating)))  ---> (sex, movieID, rating)
        val movieID_rating: RDD[(String, String, String)] = userID_sex.join(userID_movieID_rating).map(x => (x._2._1, x._2._2._1, x._2._2._2))

        //((sex, movieID), Iterable[(sex, movieID, rating)])  ---> (movieID, (sex, avg))
        val movieID_sex_avg: RDD[(String, (String, Double))] = movieID_rating.groupBy(x => (x._1, x._2)).map(x => {
            var sum, avg = 0d
            val list: List[(String, String, String)] = x._2.toList

            if (list.size > 50) {
                list.map(x => ( sum += x._3.toInt ))
                avg = sum * 1.0 / list.size
            }
            (x._1._2, (x._1._1, avg))
        })
        //(movieID, movieName)
        val movieID_movieName: RDD[(String, String)] = Utils.movieRdd.map(x => (x._1, x._2))
        //sex_movieID_avg與movie進行關聯 (movieID, ((sex, avg), movieName)) ---> (sex, movieName, avg)
        val sex_movieName_avg: RDD[(String, String, Double)] = movieID_sex_avg.join(movieID_movieName)
            .map(x => (x._2._1._1, x._2._2, x._2._1._2)).sortBy(x => (x._1, x._3), false)

        sex_movieName_avg.take(10).foreach(println(_))
        sex_movieName_avg.filter(_._1 == "F").take(10).foreach(println(_))

    }
}

第三問:

package movie_rating

import org.apache.spark.rdd.RDD

/**
  * Utils.usersRdd:對應字段中文解釋:用戶 id,性別,年齡,職業,郵政編碼
  * Utils.movieRdd:對應字段中文解釋:電影 ID,電影名字,電影類型
  * Utils.ratingsRdd:對應字段中文解釋:用戶 ID,電影 ID,評分,評分時間戳
  */
object Demand03 {
    /**
      * 3、分別求男性,女性看過最多的 10 部電影(性別,電影名)
      */
    def main(args: Array[String]): Unit = {
        //(userID, sex)
        val userID_sex: RDD[(String, String)] = Utils.usersRdd.map(x => (x._1, x._2))
        //(userID, movieID)
        val userID_movieID: RDD[(String, String)] = Utils.ratingsRdd.map(x => (x._1, x._2))
        //(movieID, name)
        val movieID_name: RDD[(String, String)] = Utils.movieRdd.map(x => (x._1, x._2))

        //(userID, (sex, movieID))  ---> (movieID, sex)
        val movieID_sex: RDD[(String, String)] = userID_sex.join(userID_movieID).map(x => (x._2._2, x._2._1))
        //關聯movieID_sex和movieID_name    (movieID, (sex, name))  ---> (movieID, sex, name)
        val movieID_sex_name: RDD[(String, String, String)] = movieID_sex.join(movieID_name)
            .map(x => (x._1, x._2._1, x._2._2))

        //((sex, name), Iterable[(movieID, sex, name)])  ---> (sex, name, times)
        val sex_name_times: RDD[(String, String, Int)] = movieID_sex_name.groupBy(x => (x._2, x._3)).map(x => (x._1._1, x._1._2, x._2.toList.size)).sortBy(x => (x._1, x._3), false)
        //輸出結果
        sex_name_times.take(10).foreach(println(_))
        sex_name_times.filter(_._1 == "F").take(10).foreach(println(_))
    }
}

第四問

package movie_rating

import org.apache.spark.rdd.RDD

/**
  * Utils.usersRdd:對應字段中文解釋:用戶 id,性別,年齡,職業,郵政編碼
  * Utils.movieRdd:對應字段中文解釋:電影 ID,電影名字,電影類型
  * Utils.ratingsRdd:對應字段中文解釋:用戶 ID,電影 ID,評分,評分時間戳
  */
object Demand04 {
    /**
      * 4、年齡段在“18-24”的男人,最喜歡看 10 部電影(輸出電影id和電影名字)
      */
    def main(args: Array[String]): Unit = {
        // 年齡段在“18-24”的男人的userID     (userID, (sex, age))
        val userID_sex_age: RDD[(String, (String, Int))] = Utils.usersRdd.map(x => (x._1, (x._2, x._3.toInt))).filter(x =>{
            x._2._2 >= 18 && x._2._2 <= 24 && x._2._1 == "M"
        } )

        //(userID, (movieID, rating))
        val userID_movieID_rating: RDD[(String, (String, Int))] = Utils.ratingsRdd.map(x => (x._1, (x._2, x._3.toInt)))

        //關聯userID與userID_movieID_rating    (userID, ((sex, age), (movieID, rating)))   ---> (movieID, rating)
        // --->(movieID, Iterable(movieID, rating))  ---> (movieID, avg)
        val movieID_avg : RDD[(String, Double)] = userID_sex_age.join(userID_movieID_rating).map(x => (x._2._2._1, x._2._2._2))
            .groupByKey().map(x => {
            var avg = 0d
            val len: Int = x._2.size
            if (len > 50){
                avg = 1.0 * x._2.sum / len
            }
            (x._1, avg)
        })

        //(movieID, name)
        val movieID_name: RDD[(String, String)] = Utils.movieRdd.map(x => (x._1, x._2))

        //關聯movieID_avg與movieID_name    (movieID, (avg, name))
        val name_avg: RDD[(String, Double)] = movieID_avg.join(movieID_name).map(x => (x._2._2, x._2._1)).sortBy(_._2, false)

        //輸出結果
        name_avg.take(10).foreach(println(_))
    }
}

第五問

package movie_rating

import org.apache.spark.rdd.RDD

/**
  * Utils.usersRdd:對應字段中文解釋:用戶 id,性別,年齡,職業,郵政編碼
  * Utils.movieRdd:對應字段中文解釋:電影 ID,電影名字,電影類型
  * Utils.ratingsRdd:對應字段中文解釋:用戶 ID,電影 ID,評分,評分時間戳
  */
object Demand05 {
    /**
      * 5、求 movieid = 2116 這部電影各年齡段(因為年齡就只有 7 個,就按這個 7 個分就好了)
      * 的平均影評(年齡段,影評分)
      */
    def main(args: Array[String]): Unit = {
        // 獲得movieID = 2116  (userID, rating)
        val userID_rating: RDD[(String, Int)] = Utils.ratingsRdd.filter(_._2 == "2116").map(x => (x._1, x._3.toInt))
        //(userID, age)
        val userID_age: RDD[(String, String)] = Utils.usersRdd.map(x => (x._1, x._3))
        //關聯userID_age和userID_rating   (userID, (age, rating)) --->(age, rating)  ---> (age, Iterable(rating))
        val age_avg: RDD[(String, Double)] = userID_age.join(userID_rating).map(x => (x._2._1, x._2._2)).groupByKey()
            .map(x => (x._1, x._2.sum * 1.0 / x._2.size))

        //輸出結果
        age_avg.sortByKey().foreach(println(_))
    }
}

第六問

package movie_rating

import org.apache.spark.rdd.RDD

/**
  * Utils.usersRdd:對應字段中文解釋:用戶 id,性別,年齡,職業,郵政編碼
  * Utils.movieRdd:對應字段中文解釋:電影 ID,電影名字,電影類型
  * Utils.ratingsRdd:對應字段中文解釋:用戶 ID,電影 ID,評分,評分時間戳
  */
object Demand06 {
    /**
      * 6、求最喜歡看電影(影評次數最多)的那位女性評最高分的 10 部電影的平均影評分
      * (觀影者userID,電影名,影評分)
      */
    def main(args: Array[String]): Unit = {
        //(userID, Iterable(userID, movieID, rating, time_stamp))  ---> (userID, times)
        val userID_times: RDD[(String, Int)] = Utils.ratingsRdd.groupBy(_._1).map(x => (x._1, x._2.size))

        //(userID, (sex, times))找到最喜歡看電影(影評次數最多)的那位女性的userID
        val userID: String = Utils.usersRdd.map(x => (x._1, x._2)).join(userID_times).filter(_._2._1 == "F")
            .sortBy(_._2._2, false).map(_._1).first()

        //獲得userID用戶評分最高的10部電影的movieID
        val movieID: Array[(String, Int)] = Utils.ratingsRdd.filter(_._1 == userID).map(x => (x._2, x._3.toInt))
            .sortBy(_._2, false).take(10)

        //獲得該10部電影的平均影評分
        val movieID_rating: RDD[(String, String)] = Utils.ratingsRdd.map(x => (x._2, x._3))

        //關聯movieID和movieID_rating   (movieID, (rat1, rating))  ---> (movieID, Iterable(rating))  --> (movieID, avg)
        val movieID_avg = Utils.sc.makeRDD(movieID).join(movieID_rating).map(x => (x._1, x._2._2.toInt))
            .groupByKey().map(x => {
            var avg = 0d
            if (x._2.size >= 50) {
                avg = x._2.sum * 1.0 / x._2.size
            }
            (x._1, avg)
        })

        //(movieID, (name, avg))   ---> (UserID, name, avg)
        val userID_name_avg: RDD[(String, String, Double)] = Utils.movieRdd.map(x => (x._1, x._2))
            .join(movieID_avg).map(x => (userID, x._2._1, x._2._2)).sortBy(_._3, false)

        userID_name_avg.foreach(println(_))
    }
}

第七問

package movie_rating

import org.apache.spark.rdd.RDD

/**
  * Utils.usersRdd:對應字段中文解釋:用戶 id,性別,年齡,職業,郵政編碼
  * Utils.movieRdd:對應字段中文解釋:電影 ID,電影名字,電影類型
  * Utils.ratingsRdd:對應字段中文解釋:用戶 ID,電影 ID,評分,評分時間戳
  */
object Demand07 {
    /**
      * 7、求好片(評分>=4.0)最多的那個年份的最好看的 10 部電影(電影id, 電影名,平均評分)
      */
    def main(args: Array[String]): Unit = {
        //1、找到所有的好片的movieID
        //(movieID, rating) ---> (movieID, Iterable(rating))  ---> (movieID, avg)(avg >= 4.0)
        val movieID_avg :RDD[(String, Double)]= Utils.ratingsRdd.map(x => (x._2, x._3.toInt)).groupByKey().map(x =>{
            var avg = 0d
            if(x._2.size >= 50)
                avg = x._2.sum * 1.0 / x._2.size
            (x._1, avg)
        }).filter(_._2 >= 4.0)

        //(movieID, (name, year))
        val movieID_name_year: RDD[(String, (String, String))] = Utils.movieRdd.map(x => (x._1, (x._2, x._2.substring(x._2.length - 5, x._2.length - 1))))

        //2、找到好片最多的年代
        //關聯movieID_avg與movieID_name_year,(movieID, (avg, (name, year)))   --> (year, Iterable(movieID))
        val year_count: (String, Int) = movieID_avg.join(movieID_name_year).map(x => (x._2._2._2, x._1))
            .groupByKey().map(x => (x._1, x._2.size)).sortBy(_._2, false).first()

        //3、找到該年最好看的10部電影
        //(movieID, name) ---> (movieID, (name, avg))  ---> (movieID, name, avg)
        val movieID_name_avg = movieID_name_year.filter(_._2._2 == year_count._1).map( x => (x._1, x._2._1))
            .join(movieID_avg).map(x => (x._1, x._2._1, x._2._2)).sortBy(_._3, false).take(10)

        //輸出結果
        movieID_name_avg.foreach(println(_))

    }
}

第八問

package movie_rating

import org.apache.spark.rdd.RDD

/**
  * Utils.usersRdd:對應字段中文解釋:用戶 id,性別,年齡,職業,郵政編碼
  * Utils.movieRdd:對應字段中文解釋:電影 ID,電影名字,電影類型
  * Utils.ratingsRdd:對應字段中文解釋:用戶 ID,電影 ID,評分,評分時間戳
  */
object Demand08 {
    /**
      * 8、求 1997 年上映的電影中,評分最高的 10 部 Comedy 類電影(電影id,電影名字,類型,平均評分)
      */
    def main(args: Array[String]): Unit = {
        //(movieID, (name, year, type))
        val movieID_name_year_type: RDD[(String, (String, String, String))] = Utils.movieRdd
            .map(x => (x._1, (x._2, x._2.substring(x._2.length - 5, x._2.length - 1), x._3)))

        //找到所有1997年的comedy類型的電影 (movieID, (name, 1997, comedy))
        val movieID_name_1997_comedy: RDD[(String, (String, String, String))] = movieID_name_year_type.filter(x => {x._2._2 == "1997" && x._2._3.toLowerCase.contains("comedy")} )

        //(movieID, (rating, (name, 1997, comedy)))  ---> (movieID, (name, comedy, rating))
        val movieID_name_comedy_rating: RDD[(String, (String, String, String))] = Utils.ratingsRdd.map(x => (x._2, x._3))
            .join(movieID_name_1997_comedy).map(x => (x._1, (x._2._2._1, x._2._2._3, x._2._1)))


        //(movieID, Iterable(rating))  ---> (movieID, avg)
        val movieID_avg: Array[(String, Double)] = movieID_name_comedy_rating.map(x => (x._1, x._2._3.toInt))
            .groupByKey().map(x => {
            var avg = 0d
            if (x._2.size >= 50)
                avg = x._2.sum * 1.0 / x._2.size
            (x._1, avg)
        }).distinct().sortBy(_._2, false).take(10)

        //(movieID, (avg, (name, comedy, rating)))  ---> (movieID, name, comedy, avg)
        val movieID_name_comedy_avg: RDD[(String, String, String, Double)] = Utils.sc.makeRDD(movieID_avg)
            .join(movieID_name_comedy_rating).map(x => (x._1, x._2._2._1, x._2._2._2, x._2._1)).distinct().sortBy(_._4, false)

        //輸出結果
        movieID_name_comedy_avg.foreach(println(_))
    }
}

第九問

package movie_rating

import org.apache.spark.rdd.RDD

/**
  * Utils.usersRdd:對應字段中文解釋:用戶 id,性別,年齡,職業,郵政編碼
  * Utils.movieRdd:對應字段中文解釋:電影 ID,電影名字,電影類型
  * Utils.ratingsRdd:對應字段中文解釋:用戶 ID,電影 ID,評分,評分時間戳
  */
object Demand09 {
    /**
      * 9、該影評庫中各種類型電影中評價最高的 5 部電影(類型,電影名,平均影評分)
      */
    def main(args: Array[String]): Unit = {

        //獲得所有電影的movieID,name,types      (movieID, (name, types))
        val movieID_name_types: RDD[(String, (String, String))] = Utils.movieRdd.map(x => (x._1, (x._2, x._3)))

        //獲得所有的movieID,rating (movieID, rating)
        val movieID_rating: RDD[(String, String)] = Utils.ratingsRdd.map(x => (x._2, x._3))

        //關聯movieID_name_types與movieID_rating   (movieID, ((name, types), rating))  ---> (types, name, rating)
        val types_name_rating: RDD[((String, String), Int)] = movieID_name_types.join(movieID_rating)
            .map(x => ((x._2._1._2, x._2._1._1), x._2._2.toInt))

        //((types, name), Iterable(rating))  ---> (types, name, avg)
        val types_name_avg: RDD[(String, String, Double)] = types_name_rating.groupByKey().map(x => {
            var avg = 0d
            if (x._2.size >= 50)
                avg = x._2.sum * 1.0 / x._2.size
            (x._1._1, x._1._2, avg)
        })

        //(types, name, avg)     劃分types:將Action|Adventure|Comedy|Sci-Fi拆開
        var tempArray: Array[(String, String, Double)] = Array(("", "", 0d))

        types_name_avg.collect().foreach(x => {
            //Action|Adventure|Comedy|Sci-Fi   ---> Arrays(Action, Adventure, Comedy, Sci-Fi)
            val types: Array[String] = x._1.split("\\|")
            //將所有的types_name_avg中的元素拆分後存於tempArray數組中
            tempArray = types.map((_, x._2, x._3)).union(tempArray)
        })

        //(type, name, avg)  包含所有類型電影的排序
        val type_name_avg = Utils.sc.makeRDD(tempArray).filter(_._3 > 0).sortBy(x => (x._1, x._3), false)

        //(type, Iterable(type, name, avg))  打印前五
        type_name_avg.groupBy(_._1).sortByKey().foreach(x => {
            var count = 0
            val list: List[(String, String, Double)] = x._2.toList
            while(count < list.size  && count < 5){
                println(list(count))
                count += 1
            }
            println()
        })
    }
}

第十問

package movie_rating

import org.apache.spark.rdd.RDD

/**
  * Utils.usersRdd:對應字段中文解釋:用戶 id,性別,年齡,職業,郵政編碼
  * Utils.movieRdd:對應字段中文解釋:電影 ID,電影名字,電影類型
  * Utils.ratingsRdd:對應字段中文解釋:用戶 ID,電影 ID,評分,評分時間戳
  */
object Demand10 {
    /**
      * 10、各年評分最高的電影類型(年份,類型,影評分)
      */
    def main(args: Array[String]): Unit = {
        //(movieID, year)
        val movieID_year: RDD[(String, String)] = Utils.movieRdd.map(x => (x._1, (x._2.substring(x._2.length - 5, x._2.length - 1))))

        //(movieID, rating)  ---> (movieID, Iterable(rating)) ---> (movieID, avg)
        val moviID_avg: RDD[(String, Double)] = Utils.ratingsRdd.map(x => (x._2, x._3.toDouble)).groupByKey()
            .map(x => (x._1, x._2.sum / x._2.size))

        //關聯movieID_year和moviID_avg   (movieID, (year, avg)) ---> (year, (movieID, avg))
        val year_mocvieID_avg: RDD[(String, (String, Double))] = movieID_year.join(moviID_avg)
            .distinct().map(x => (x._2._1, (x._1, x._2._2)))

        //(year, (movieID, avg))  ---> (year, Iterable((movieID, avg)))  ---> (movieID, (year, topavg))
        val year_movieID_topavg: RDD[(String, (String, Double))] = year_mocvieID_avg.groupByKey().map(x => {
            val list: List[(String, Double)] = x._2.toList.sortBy(-_._2)
            (list(0)._1, (x._1, list(0)._2))
        })

        //(movieID, (type, (year, topavg)) ---> (year, type, topavg)
        val year_type_topavg: RDD[(String, String, Double)] = Utils.movieRdd.map(x => (x._1, x._3))
            .join(year_movieID_topavg).map(x => (x._2._2._1, x._2._1, x._2._2._2)).sortBy(_._1, false)

        //輸出結果
        year_type_topavg.foreach(println(_))
    }
}

spark練習——影評案例