1. 程式人生 > 推薦系統-01-電影推薦與結果評估

推薦系統-01-電影推薦與結果評估

標籤(Tags): order by、eid、ota、class、span、pro、ram、數據化、直接

import spark.sql
import org.apache.spark.sql.types._
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

// ---- Data pre-processing: case classes mirroring the ml-1m data files ----
// One movie record parsed from movies.dat ("movieId::title::genres").
case class Movie(movieId:Int, title:String, genres:Seq[String])
// One user record parsed from users.dat ("userId::gender::age::occupation::zip").
case class User(userId: Int, gender: String, age: Int, occupation: Int, zip: String)

// Parse one line of movies.dat ("movieId::title::genres").
// FIX: the genres field is pipe-separated ("Action|Comedy"); split it into
// individual genres (the original stored the raw string as a single element
// even though Movie.genres is declared Seq[String]).
def parseMovie(str: String): Movie = {
  val fields = str.split("::")
  assert(fields.size == 3)
  Movie(fields(0).toInt, fields(1), fields(2).split("\\|").toSeq)
}

// Parse one line of users.dat ("userId::gender::age::occupation::zip").
def parseUser(str: String): User = {
  val fields = str.split("::")
  assert(fields.size == 5)
  User(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt, fields(4))
}

// Parse one line of ratings.dat ("userId::movieId::rating::timestamp").
// ml-1m ratings are whole stars 1-5; Rating stores them as Double.
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toInt)
}

// Load the ratings file into an RDD (could equally be an HDFS path).
val ratingText = sc.textFile("file:/home/hadoop/ml-1m/ratings.dat")
ratingText.first()

// Parse the raw lines and cache the resulting RDD (it is reused many times below).
val ratingRDD = ratingText.map(parseRating).cache()

// Basic dataset statistics.
println("Total number of ratings : " + ratingRDD.count())
println("Total number of movies rated : " + ratingRDD.map(_.product).distinct().count())
println("Total number of users who rated movies:" + ratingRDD.map(_.user).distinct().count())

// Convert to DataFrames so the data can be queried with SQL.
val ratingDF = ratingRDD.toDF()
val movieDF = sc.textFile("file:/home/hadoop/ml-1m/movies.dat").map(parseMovie).toDF()
val userDF = sc.textFile("file:/home/hadoop/ml-1m/users.dat").map(parseUser).toDF()
ratingDF.printSchema()
movieDF.printSchema()
userDF.printSchema()

// Register temp tables so plain SQL can be run against the DataFrames.
ratingDF.registerTempTable("ratings")
movieDF.registerTempTable("movies")
userDF.registerTempTable("users")

// Per-movie max/min rating and distinct-rater count, most-rated movies first.
val result = sql("""select title, rmax, rmin, ucnt from (select product, max(rating) as rmax, min(rating) as rmin, count(distinct user) as ucnt from ratings group by product) ratingsCNT join movies on product=movieId order by ucnt desc""")
result.show()

// The ten users with the most ratings.
val mostActiveUser = sql("""select user, count(*) as cnt from ratings group by user order by cnt desc limit 10 """)
mostActiveUser.show()

// Movies that user 4169 rated above 4.
// FIX: the original wrote `var result = sql(...)` here, re-declaring the
// `result` val above — a duplicate-definition error anywhere outside the
// spark-shell REPL. Use a distinct val instead.
val user4169Liked = sql("""select title from ratings join movies on movieId=product where user=4169 and rating>4""")
user4169Liked.show()

// ---- ALS (alternating least squares) collaborative filtering ----
// Split the ratings 80/20 into training and test sets (seed 0L for reproducibility).
val split = ratingRDD.randomSplit(Array(0.8, 0.2), 0L)
val trainingSet = split(0).cache()
val testSet = split(1).cache()
trainingSet.count()
testSet.count()

// Train the model: rank is the number of latent features, with 10 iterations.
val model = new ALS().setRank(20).setIterations(10).run(trainingSet)

// Top-5 movie recommendations for user 4169.
val recomForTopUser = model.recommendProducts(4169, 5)

// movieId -> title lookup built from the movie DataFrame
// (a DataFrame has no direct map; go through .rdd).
val movieTitle = movieDF.rdd.map(row => (row(0), row(1))).collectAsMap()
// FIX: foreach returns Unit, so the original `val recomResult = ...` bound Unit.
recomForTopUser.map(rating => (movieTitle(rating.product), rating.rating)).foreach(println)

// Predict a rating for every (user, product) pair in the test set.
val testUserProduct = testSet.map { case Rating(user, product, rating) => (user, product) }
val testUserProductPredict = model.predict(testUserProduct)
testUserProductPredict.take(10).mkString("\n")

// Key both actual and predicted ratings by (user, product) so they can be joined.
val testSetPair = testSet.map { case Rating(user, product, rating) => ((user, product), rating) }
val predictionPair = testUserProductPredict.map { case Rating(user, product, rating) => ((user, product), rating) }

// Mean absolute error between actual and predicted ratings.
val joinTestPredict = testSetPair.join(predictionPair)
val mae = joinTestPredict.map { case ((user, product), (ratingT, ratingP)) =>
  Math.abs(ratingT - ratingP)
}.mean()

// False positives: the user hated the movie (<= 1) but we predicted they'd love it (>= 4).
// FIX: use short-circuit && rather than bitwise & on Booleans.
val fp = joinTestPredict.filter { case ((user, product), (ratingT, ratingP)) =>
  ratingT <= 1 && ratingP >= 4
}
fp.count()

import org.apache.spark.mllib.evaluation._

// (prediction, observation) pairs, the order RegressionMetrics expects.
val ratingTP = joinTestPredict.map { case ((user, product), (ratingT, ratingP)) => (ratingP, ratingT) }

// Cross-check the hand-computed MAE with Spark's RegressionMetrics.
val evaluator = new RegressionMetrics(ratingTP)
evaluator.meanAbsoluteError

推薦系統-01-電影推薦與結果評估