Spark SQL movie-analysis case study
Using Spark SQL to find the TopN most popular movies
1. Data structure
users.dat
5220::M::25::7::91436
5221::F::56::1::96734
5222::M::25::12::94501
5223::M::56::10::11361
5224::M::35::1::10016
movies.dat
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
ratings.dat
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
The README file documents what each field means.
2. Computing the TopN
The TopN can actually be computed from the ratings.dat file alone.
Steps
- Compute each movie's average rating
- Sort by that average, descending
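The two steps above amount to a sum-and-count per movie followed by a sort. On plain Scala collections (the sample lines below are illustrative, not real data) the logic can be sketched as:

```scala
// Each ratings.dat line has the shape UserID::MovieID::Rating::Timestamp.
val sample = Seq("1::1193::5::978300760", "2::1193::4::978300000", "1::661::3::978302109")

val topN = sample
  .map(_.split("::"))
  .map(f => (f(1), f(2).toInt))           // (movieID, rating)
  .groupBy(_._1)                          // movieID -> all its (movieID, rating) pairs
  .map { case (id, rs) => (id, rs.map(_._2).sum.toDouble / rs.size) } // average rating
  .toSeq
  .sortBy(-_._2)                          // descending by average

topN.foreach(println)  // movie 1193 averages 4.5, movie 661 averages 3.0
```

The Spark version in the next section does the same thing, except the sum and count are carried as a `(rating, 1)` pair so they can be merged distributedly with `reduceByKey`.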
3. Code
package main.scala.com.hlf.project.moguyun.movies

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Compute the movie TopN with RDDs and DataFrames.
 * Created by hlf on 2016/11/1.
 */
object MoviesTopN {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("moviesRatings")
      .setMaster("local")
      // Tuning parameters. Note that the three size-valued settings below take bytes,
      // so e.g. 256 MB must be written as 268435456.
      .set("spark.sql.shuffle.partitions", "4")                 // shuffle parallelism (default 200)
      .set("spark.sql.files.maxPartitionBytes", "268435456")    // max bytes per file partition (256 MB)
      .set("spark.sql.files.openCostInBytes", "4194304")        // file-open cost, used to pack small files (4 MB)
      .set("spark.sql.autoBroadcastJoinThreshold", "104857600") // broadcast tables below this size in joins (100 MB)
    val sc = new SparkContext(conf)
    sc.setLogLevel("OFF")

    val path = """D:\蘑菇雲\專案\data\moviesData\"""
    // Load the data.
    val moviesRDD = sc.textFile(path + """movies.dat""")
    val usersRDD = sc.textFile(path + """users.dat""")
    val ratingsRDD = sc.textFile(path + """ratings.dat""")

    /* Compute the TopN highest-rated movies. Idea: compare the average rating of every
       movie, so only ratings.dat is needed. Each line carries a movie ID and a rating;
       extract those two, and because an average is needed, pair the rating with a
       count of 1, giving the shape (movieID, (rating, 1)). */
    // RDD version
    val ratingsArr = ratingsRDD.map(line => line.split("""::"""))
    val ratingsRow = ratingsArr.map(line => Ratings(line(0).toLong, line(1).toLong, line(2).toInt, line(3).trim))
    ratingsRow.map(row => (row.movieID, (row.rating, 1)))
      .reduceByKey((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)) // sum the ratings and the counts
      .map(t => (t._2._1.toFloat / t._2._2.toFloat, t._1))     // (average, movieID)
      .map(t => (f"${t._1}%1.2f".toFloat, t._2))               // keep two decimals; a bit roundabout, but it works
      .sortByKey(false)
      .take(20).foreach(println)

    /*
     * The DataFrame way: either build a schema plus Rows, or use reflection on a case class.
     */
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val ratingsDF = ratingsRow.toDF
    ratingsDF.registerTempTable("ratingsTable")

    // SQL version
    //val sql = "select movieID, ROUND(CAST(sum(rating) AS DOUBLE)/count('rating'), 4) as ratings from ratingsTable group by movieID order by ratings desc"
    val sql = "select movieID, ROUND(avg(rating), 3) as ratings from ratingsTable group by movieID order by ratings desc"
    sqlContext.sql(sql).show()

    // Built-in function version
    import org.apache.spark.sql.functions._
    ratingsDF.groupBy("movieID").agg(avg(ratingsDF("rating")).as("ratings")).sort($"ratings".desc).show()

    /*
     * Age and gender breakdown of one movie's raters:
     * join Users with Ratings.
     */
    val usersDF = usersRDD.map(line => line.split("""::""")).map(line => Users(line(0).toLong, line(1).trim, line(2).toInt, line(3).trim, line(4).toLong)).toDF()
    usersDF.registerTempTable("usersTable")
    ratingsDF.filter(ratingsDF("movieID") === 24) // pick one movie
      //.join(usersDF, ratingsDF("userID") === usersDF("userID"))
      .join(usersDF, "userID")
      .select(ratingsDF("movieID"), usersDF("gender"), usersDF("age"))
      .show()
  }
}
/**
 * movies.dat, modeled as a case class.
 * Format:
 * MovieID::Title::Genres
 * 1::Toy Story (1995)::Animation|Children's|Comedy
 * 2::Jumanji (1995)::Adventure|Children's|Fantasy
 * 3::Grumpier Old Men (1995)::Comedy|Romance
 * 4::Waiting to Exhale (1995)::Comedy|Drama
 */
case class Movies(movieID: Long, title: String, genres: String)

/**
 * users.dat, modeled as a case class.
 * Format:
 * UserID::Gender::Age::Occupation::Zip-code
 * 1::F::1::10::48067
 * 2::M::56::16::70072
 * 3::M::25::15::55117
 * 4::M::45::7::02460
 */
case class Users(userID: Long, gender: String, age: Int, occupation: String, zipCode: Long)

/**
 * ratings.dat, modeled as a case class.
 * Format:
 * UserID::MovieID::Rating::Timestamp
 * 1::1193::5::978300760
 * 1::661::3::978302109
 * 1::914::3::978301968
 * 1::3408::4::978300275
 */
case class Ratings(userID: Long, movieID: Long, rating: Int, timestamp: String)
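As the code comment notes, the string-formatting trick used to keep two decimals (`f"${t._1}%1.2f".toFloat`) is a bit roundabout. A numeric alternative (a sketch; `round2` is a hypothetical helper, not part of the program above):

```scala
// Round a Float to two decimal places with arithmetic instead of a format string.
def round2(x: Float): Float = math.round(x * 100) / 100f
```

For example, `round2(4.6666667f)` yields `4.67f`. One caveat of either approach: ties introduced by rounding change which of the equal-scoring movies survive a `take(20)`.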
The results:
(5.0,3172)
(5.0,3881)
(5.0,3656)
(5.0,3233)
(5.0,3382)
(5.0,3607)
(5.0,989)
(5.0,1830)
(5.0,787)
(5.0,3280)
(4.8,3245)
(4.75,53)
(4.67,2503)
(4.61,2905)
(4.56,2019)
(4.55,318)
(4.52,858)
(4.52,50)
(4.52,745)
(4.51,1148)
+-------+-------+
|movieID|ratings|
+-------+-------+
| 1830| 5.0|
| 787| 5.0|
| 3233| 5.0|
| 3382| 5.0|
| 3172| 5.0|
| 3607| 5.0|
| 3656| 5.0|
| 3881| 5.0|
| 3280| 5.0|
| 989| 5.0|
| 3245| 4.8|
| 53| 4.75|
| 2503| 4.667|
| 2905| 4.609|
| 2019| 4.561|
| 318| 4.555|
| 858| 4.525|
| 745| 4.521|
| 50| 4.517|
| 527| 4.51|
+-------+-------+
only showing top 20 rows
+-------+-----------------+
|movieID| ratings|
+-------+-----------------+
| 1830| 5.0|
| 787| 5.0|
| 3233| 5.0|
| 3382| 5.0|
| 3172| 5.0|
| 3607| 5.0|
| 3656| 5.0|
| 3881| 5.0|
| 3280| 5.0|
| 989| 5.0|
| 3245| 4.8|
| 53| 4.75|
| 2503|4.666666666666667|
| 2905|4.608695652173913|
| 2019|4.560509554140127|
| 318|4.554557700942973|
| 858|4.524966261808367|
| 745| 4.52054794520548|
| 50|4.517106001121705|
| 527|4.510416666666667|
+-------+-----------------+
only showing top 20 rows
+-------+------+---+
|movieID|gender|age|
+-------+------+---+
| 24| M| 18|
| 24| M| 18|
| 24| F| 25|
| 24| M| 18|
| 24| M| 25|
| 24| M| 35|
| 24| M| 25|
| 24| M| 35|
| 24| M| 18|
| 24| F| 50|
| 24| M| 25|
| 24| M| 18|
| 24| M| 18|
| 24| F| 25|
| 24| M| 25|
| 24| M| 50|
| 24| M| 35|
| 24| F| 25|
| 24| F| 1|
| 24| M| 25|
+-------+------+---+
only showing top 20 rows
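The per-viewer rows above can also be reduced to a summary. For example, counting movie 24's raters by gender (a sketch that assumes the `ratingsDF` and `usersDF` built in the code above are in scope):

```scala
// How many raters of movie 24 fall into each gender bucket?
ratingsDF.filter(ratingsDF("movieID") === 24)
  .join(usersDF, "userID")
  .groupBy("gender")
  .count()
  .show()
```

Swapping `groupBy("gender")` for `groupBy("age")` gives the corresponding age breakdown.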
4. Tuning
Because the work runs through DataFrames, Catalyst has already optimized the query plan, so there is not much left to tune by hand. Four settings matter here:
1. Shuffle parallelism, tuned to your cluster's configuration; the default is 200.
spark.sql.shuffle.partitions=200
2. The maximum size of each partition when reading files; the default is 128 MB and can be raised somewhat. The value is in bytes, e.g. 256 MB:
spark.sql.files.maxPartitionBytes=268435456
3. Small-file packing: the estimated byte cost of opening a file, default 4 MB. Raising it packs more small files into one partition, so that each small file does not become its own task:
spark.sql.files.openCostInBytes=4194304
4. Shuffles between two tables, e.g. joins. This is the most useful and most frequently used one: spark.sql.autoBroadcastJoinThreshold defaults to 10 MB; raising it to 100 MB or even 1 GB lets a larger "small" table be broadcast instead of shuffled.
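On a real cluster these settings are usually passed at submit time rather than hard-coded in the application; a sketch (the jar name is a hypothetical placeholder):

```shell
spark-submit \
  --class main.scala.com.hlf.project.moguyun.movies.MoviesTopN \
  --conf spark.sql.shuffle.partitions=200 \
  --conf spark.sql.files.maxPartitionBytes=268435456 \
  --conf spark.sql.files.openCostInBytes=4194304 \
  --conf spark.sql.autoBroadcastJoinThreshold=104857600 \
  movies-topn.jar
```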
Parts of the above come from a course shared by Wang Jialin, chief Spark expert at [DT大資料夢工廠]. Thanks to Mr. Wang for the material; for more, follow the [DT大資料夢工廠] WeChat account DT_Spark.