
Spark SQL Movie Analysis Case Study

Using Spark SQL to compute the Top N most popular movies.

1. Data Structure

users.dat
5220::M::25::7::91436
5221::F::56::1::96734
5222::M::25::12::94501
5223::M::56::10::11361
5224::M::35::1::10016

movies.dat
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action

ratings.dat
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291

README is the documentation file; it explains what each field means.
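Each file is "::"-delimited. As a small, hypothetical illustration of how one users.dat record maps onto the fields the README describes (UserID::Gender::Age::Occupation::Zip-code):

// One-off parse of a single users.dat line; field names follow the README.
val Array(userID, gender, age, occupation, zipCode) =
  "5220::M::25::7::91436".split("::")
// userID = "5220", gender = "M", age = "25", occupation = "7", zipCode = "91436"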

2. Computing the Top N

Computing the Top N actually needs nothing more than the data in ratings.dat.
Steps (a minimal sketch follows this list):

  1. Compute the average rating of each movie.
  2. Sort by that average, descending.
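As a minimal sketch of these two steps on toy data (this assumes sc is a live SparkContext and mirrors the RDD pipeline in section 3; the input pairs are made up for illustration):

// Pair each rating with a count of 1, reduce to (sum, count), then divide.
val toyRatings = sc.parallelize(Seq((1L, 5), (1L, 3), (2L, 5))) // (movieID, rating)
toyRatings.map { case (id, r) => (id, (r, 1)) }
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) } // (sum, count) per movie
  .mapValues { case (sum, count) => sum.toDouble / count }         // step 1: average rating
  .sortBy(_._2, ascending = false)                                 // step 2: sort descending
  .take(2)
  .foreach(println) // prints (2,5.0) then (1,4.0)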

3. Code

package main.scala.com.hlf.project.moguyun.movies

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Compute the movie Top N with RDDs and DataFrames.
 * Created by hlf on 2016/11/1.
 */
object MoviesTopN {

  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("moviesRatings")
      .setMaster("local")
      // Tunable parameters; the three size settings take values in bytes
      .set("spark.sql.shuffle.partitions", "4") // shuffle parallelism
      .set("spark.sql.files.maxPartitionBytes", (256 * 1024 * 1024).toString) // target size of each input partition
      .set("spark.sql.files.openCostInBytes", (4 * 1024 * 1024).toString) // small-file merge cost
      .set("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString) // max size of the broadcast side of a join
    val sc = new SparkContext(conf)
    sc.setLogLevel("OFF")

    val path = """D:\蘑菇雲\專案\data\moviesData\"""
    // Load the data
    val moviesRDD = sc.textFile(path + """movies.dat""") // loaded for completeness; not used below
    val usersRDD = sc.textFile(path + """users.dat""")
    val ratingsRDD = sc.textFile(path + """ratings.dat""")

    /* Top N by rating. Idea: compare the average rating of every movie, so only
       ratings.dat is needed. It carries the movie ID and the rating; to compute an
       average we also need a count, so each record becomes (movieID, (rating, 1)). */

    // RDD version
    val ratingsArr = ratingsRDD.map(line => line.split("""::"""))
    val ratingsRow = ratingsArr.map(line =>
      Ratings(line(0).toLong, line(1).toLong, line(2).toInt, line(3).trim))
    ratingsRow.map(row => (row.movieID, (row.rating, 1)))
      .reduceByKey((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)) // (sum, count) per movie
      .map(t => (t._2._1.toFloat / t._2._2.toFloat, t._1))     // (average, movieID)
      .map(t => (f"${t._1}%1.2f".toFloat, t._2)) // keep two decimal places; a bit roundabout, but it works
      .sortByKey(false)
      .take(20).foreach(println)

    // DataFrame version: build a schema and Rows, or rely on reflection over case classes
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val ratingsDF = ratingsRow.toDF
    ratingsDF.registerTempTable("ratingsTable")

    // SQL version
    //val sql = "select movieID, ROUND(CAST(sum(rating) AS DOUBLE)/count(rating), 4) as ratings from ratingsTable group by movieID order by ratings desc"
    val sql = "select movieID, ROUND(avg(rating), 3) as ratings from ratingsTable group by movieID order by ratings desc"
    sqlContext.sql(sql).show()

    // Built-in aggregate functions
    import org.apache.spark.sql.functions._
    ratingsDF.groupBy("movieID")
      .agg(avg(ratingsDF("rating")).as("ratings"))
      .sort($"ratings".desc)
      .show()

    /*
     * Age and gender breakdown for one movie:
     * join Users with Ratings.
     */
    val usersDF = usersRDD.map(line => line.split("""::"""))
      .map(line => Users(line(0).toLong, line(1).trim, line(2).toInt, line(3).trim, line(4).toLong))
      .toDF()
    usersDF.registerTempTable("usersTable")
    ratingsDF.filter(ratingsDF("movieID") === 24) // pick one movie
      //.join(usersDF, ratingsDF("userID") === usersDF("userID")) // expression join keeps both userID columns
      .join(usersDF, "userID") // joining on the column name keeps a single userID column
      .select(ratingsDF("movieID"), usersDF("gender"), usersDF("age"))
      .show()
  }
}

/**
 * Case class for movies.dat. Format:
 * MovieID::Title::Genres
 * 1::Toy Story (1995)::Animation|Children's|Comedy
 * 2::Jumanji (1995)::Adventure|Children's|Fantasy
 */
case class Movies(movieID: Long, title: String, genres: String)

/**
 * Case class for users.dat. Format:
 * UserID::Gender::Age::Occupation::Zip-code
 * 1::F::1::10::48067
 * 2::M::56::16::70072
 */
case class Users(userID: Long, gender: String, age: Int, occupation: String, zipCode: Long)

/**
 * Case class for ratings.dat. Format:
 * UserID::MovieID::Rating::Timestamp
 * 1::1193::5::978300760
 * 1::661::3::978302109
 */
case class Ratings(userID: Long, movieID: Long, rating: Int, timestamp: String)

The results are as follows:

(5.0,3172)
(5.0,3881)
(5.0,3656)
(5.0,3233)
(5.0,3382)
(5.0,3607)
(5.0,989)
(5.0,1830)
(5.0,787)
(5.0,3280)
(4.8,3245)
(4.75,53)
(4.67,2503)
(4.61,2905)
(4.56,2019)
(4.55,318)
(4.52,858)
(4.52,50)
(4.52,745)
(4.51,1148)
+-------+-------+
|movieID|ratings|
+-------+-------+
|   1830|    5.0|
|    787|    5.0|
|   3233|    5.0|
|   3382|    5.0|
|   3172|    5.0|
|   3607|    5.0|
|   3656|    5.0|
|   3881|    5.0|
|   3280|    5.0|
|    989|    5.0|
|   3245|    4.8|
|     53|   4.75|
|   2503|  4.667|
|   2905|  4.609|
|   2019|  4.561|
|    318|  4.555|
|    858|  4.525|
|    745|  4.521|
|     50|  4.517|
|    527|   4.51|
+-------+-------+
only showing top 20 rows

+-------+-----------------+
|movieID|          ratings|
+-------+-----------------+
|   1830|              5.0|
|    787|              5.0|
|   3233|              5.0|
|   3382|              5.0|
|   3172|              5.0|
|   3607|              5.0|
|   3656|              5.0|
|   3881|              5.0|
|   3280|              5.0|
|    989|              5.0|
|   3245|              4.8|
|     53|             4.75|
|   2503|4.666666666666667|
|   2905|4.608695652173913|
|   2019|4.560509554140127|
|    318|4.554557700942973|
|    858|4.524966261808367|
|    745| 4.52054794520548|
|     50|4.517106001121705|
|    527|4.510416666666667|
+-------+-----------------+
only showing top 20 rows

+-------+------+---+
|movieID|gender|age|
+-------+------+---+
|     24|     M| 18|
|     24|     M| 18|
|     24|     F| 25|
|     24|     M| 18|
|     24|     M| 25|
|     24|     M| 35|
|     24|     M| 25|
|     24|     M| 35|
|     24|     M| 18|
|     24|     F| 50|
|     24|     M| 25|
|     24|     M| 18|
|     24|     M| 18|
|     24|     F| 25|
|     24|     M| 25|
|     24|     M| 50|
|     24|     M| 35|
|     24|     F| 25|
|     24|     F|  1|
|     24|     M| 25|
+-------+------+---+
only showing top 20 rows

4. Optimization

Because the job uses DataFrames, and Catalyst has already optimized the query plan, there are not many knobs left to turn. The main four are listed here; a configuration sketch follows the list.
1. Parallelism. Tune this to your own cluster; the default is 200.
spark.sql.shuffle.partitions=200
2. The size of each partition. The default is 128 MB; it can be raised somewhat, e.g. to 256 MB.
spark.sql.files.maxPartitionBytes=256MB
3. Small-file merging. The default cost is 4 MB; raise it, otherwise every small file becomes its own task.
spark.sql.files.openCostInBytes=4MB
4. The broadcast threshold for joins and other two-table shuffles. This one is the most useful and the most frequently tuned.
spark.sql.autoBroadcastJoinThreshold defaults to 10 MB; raise it to 100 MB or even 1 GB.
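As a concrete sketch of the four knobs (the values are the ones suggested above, purely illustrative, not universal recommendations; in the Spark versions this article targets, the three size settings are parsed as plain byte counts, hence the explicit multiplications):

import org.apache.spark.SparkConf

val tunedConf = new SparkConf()
  .set("spark.sql.shuffle.partitions", "200")                                  // match to the cluster
  .set("spark.sql.files.maxPartitionBytes", (256 * 1024 * 1024).toString)      // 256 MB per partition
  .set("spark.sql.files.openCostInBytes", (4 * 1024 * 1024).toString)          // merge files cheaper than 4 MB
  .set("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)   // broadcast tables up to 100 MB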

Parts of the above come from a course shared by 王家林, chief Spark expert at [DT大資料夢工廠]. Thanks to him for sharing; for more, follow the [DT大資料夢工廠] WeChat public account DT_Spark.