1. 程式人生 > 其它 >grouplens上的movielens資料集_大資料基礎【Task7】實踐

grouplens上的movielens資料集_大資料基礎【Task7】實踐

技術標籤:grouplens上的movielens資料集

  1. 計算每個content的CTR。

資料集下載:連結:https://pan.baidu.com/s/1YDvBWp35xKLg5zsysEjDGA 提取碼:rpgs

2.【選做】 使用Spark實現ALS矩陣分解演算法

movielen 資料集:http://files.grouplens.org/datasets/movielens/ml-100k.zip

基於ALS矩陣分解演算法的Spark推薦引擎實現

3.使用Spark分析Amazon DataSet(實現 Spark LR、Spark TFIDF)

資料集:http://jmcauley.ucsd.edu/data/amazon/

preprocess

* Spark LR

* Spark TFIDF

1.計算每個content的CTR。

CTRClick-Through-Rate)即點選通過率,是網際網路廣告常用的術語,指網路廣告(圖片廣告/文字廣告/關鍵詞廣告/排名廣告/視訊廣告等)的點選到達率,即該廣告的實際點選次數(嚴格的來說,可以是到達目標頁面的數量)除以廣告的展現量(Show content)。CTR是衡量網際網路廣告效果的一項重要指標。

程式碼(用純python寫的,這裡沒用到spark的內容):

import pandas as pd
df = pd.read_csv("file:///usr/local/lily/task7/content_list_id.txt", sep="t")
df['content_list_len']=df['content_list'].apply(lambda x:len(x.split(',')))
df['content_id_len']=df['content_id'].apply(lambda x:len(x.split(',')))
df['ctr'] = df['content_id_len'] / df['content_list_len']

2.【選做】 使用Spark實現ALS矩陣分解演算法

第一步:提取有效特徵

1. 首先,啟動spark-shell。

2. 載入使用者對影片的評級資料:

這裡的file://代表是本地資料,如果不加這個字首就會從hdfs上讀取資料。

// 載入評級資料
scala> val rawData = sc.textFile("file:///usr/local/lily/task4/u.data")
// 展示一條記錄
scala> rawData.first()

res1: String = 196	242	3	881250949

3. 切分記錄並返回新的RDD:

這裡取了前三個欄位,即user id、item id、rating。

// 格式化資料集
scala> val rawRatings = rawData.map(_.split("t").take(3))
// 展示一條記錄
scala> rawRatings.first()

res2: Array[String] = Array(196, 242, 3)

4. 接下來需要將評分矩陣RDD轉化為Rating格式的RDD:

// 匯入rating類
scala> import org.apache.spark.mllib.recommendation.Rating
// 將評分矩陣RDD中每行記錄轉換為Rating型別
scala> val ratings = rawRatings.map { case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble) }

scala> ratings.first()

res3: org.apache.spark.mllib.recommendation.Rating = Rating(196,242,3.0)

這是因為MLlib的ALS推薦系統演算法包只支援Rating格式的資料集。

第二步:訓練推薦模型

接下來可以進行ALS推薦系統模型訓練了。MLlib中的ALS演算法接收三個引數:

- rank:對應的是隱因子的個數,這個值設定越高越準,但是也會產生更多的計算量。一般將這個值設定為10-200;
- iterations:對應迭代次數,一般設定個10就夠了;
- lambda:該引數控制正則化過程,其值越高,正則化程度就越深。一般設定為0.01。

1. 首先,執行以下程式碼,啟動ALS訓練:

// 匯入ALS推薦系統演算法包
scala> import org.apache.spark.mllib.recommendation.ALS
// 啟動ALS矩陣分解
scala> val model = ALS.train(ratings, 50, 10, 0.01)

這步將會使用ALS矩陣分解演算法,對評分矩陣進行分解,且隱特徵個數設定為50,迭代10次,正則化引數設為了0.01。

相對其他步驟,訓練耗費的時間最多。

2. 返回型別為MatrixFactorizationModel物件,它將結果分別儲存到兩個(id,factor)RDD裡面,分別名為userFeatures和productFeatures。

也就是評分矩陣分解後的兩個子矩陣:

scala> model.userFeatures.first

res4: (Int, Array[Double]) = (1,Array(0.49355196952819824, -0.22464631497859955, 0.26314327120780945, -0.5041911005973816, 
-0.17332540452480316, -0.09947088360786438, -0.2455645054578781, -0.21306976675987244, 0.07296579331159592, -0.315679132938385, 
0.26139432191848755, 0.5458331108093262, 0.38469621539115906, 0.11748316884040833, -0.15589895844459534, 0.3157919943332672, 
-0.2610902488231659, -0.356866717338562, -0.3456942141056061, 0.5182305574417114, 0.008591878227889538, -0.07697590440511703, 
0.06339816749095917, -0.13890786468982697, 0.2576437294483185, -0.01784001663327217, 0.3721158802509308, -0.08425401151180267, 
-0.7575527429580688, -0.2158384621143341, 0.4707823097705841, 0.2881649434566498, -0.16873227059841156, -0.34343791007995605, 
0.1242368221282959, -0.19271430373191833, ...

上面展示了id為4的使用者的“隱因子向量”。請注意ALS實現的操作都是延遲性的轉換操作。

第三步:使用ALS推薦模型

1. 預測使用者789對物品123的評分:

scala> val predictedRating = model.predict(789,123)

predictedRating: Double = 3.3851580857036034

2. 為使用者789推薦前10個物品:

scala> val userId = 789
userId: Int = 789

scala> val K = 10
K: Int = 10

// 獲取推薦列表
scala> val topKRecs = model.recommendProducts(userId, K)

topKRecs: Array[org.apache.spark.mllib.recommendation.Rating] = Array(Rating(789,56,5.721679562476089), 
Rating(789,134,5.678340203670223), Rating(789,340,5.503186535270071), Rating(789,663,5.402873018755409), 
Rating(789,346,5.268079473217043), Rating(789,246,5.258068771694328), Rating(789,298,5.174696541949913), 
Rating(789,347,5.099838202988733), Rating(789,654,5.030737723453546), Rating(789,211,5.019128792654539))

// 列印推薦列表
scala> println(topKRecs.mkString("n"))

Rating(789,56,5.721679562476089)
Rating(789,134,5.678340203670223)
Rating(789,340,5.503186535270071)
Rating(789,663,5.402873018755409)
Rating(789,346,5.268079473217043)
Rating(789,246,5.258068771694328)
Rating(789,298,5.174696541949913)
Rating(789,347,5.099838202988733)
Rating(789,654,5.030737723453546)
Rating(789,211,5.019128792654539)

3. 初步檢驗推薦效果

獲取到各個使用者的推薦列表後,想必大家都想先看看使用者評分最高的電影,和給他推薦的電影是不是有相似。

3.1 建立電影id - 電影名字典:

// 匯入電影資料集
scala> val movies = sc.textFile("File:///usr/local/lily/task4/u.item")
// 建立電影id - 電影名字典
scala> val titles = movies.map(line => line.split("|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap()

titles: scala.collection.Map[Int,String] = Map(137 -> Big Night (1996), 891 -> Bent (1997), 
550 -> Die Hard: With a Vengeance (1995), 1205 -> Secret Agent, The (1996), 146 -> Unhook the Stars (1996), 
864 -> My Fellow Americans (1996), 559 -> Interview with the Vampire (1994), 218 -> Cape Fear (1991), 568 -> Speed (1994), 
227 -> Star Trek VI: The Undiscovered Country (1991), 765 -> Boomerang (1992), 1115 -> Twelfth Night (1996), 
774 -> Prophecy, The (1995), 433 -> Heathers (1989), 92 -> True Romance (1993), 1528 -> Nowhere (1997), 
846 -> To Gillian on Her 37th Birthday (1996), 1187 -> Switchblade Sisters (1975), 
1501 -> Prisoner of the Mountains (Kavkazsky Plennik) (1996), 442 -> Amityville Curse, The (1990), 
1160 -> Love! Valour! Compassion! (1997), 101 -> Heavy Metal (1981), 1196 -> Sa...

// 檢視id為123的電影名
scala> titles(123)

res6: String = Frighteners, The (1996)

這樣後面就可以根據電影的id找到電影名了。

3.2 獲取某使用者的所有觀影記錄並列印:

// 建立使用者名稱-其他RDD,並僅獲取使用者789的記錄
scala> val moviesForUser = ratings.keyBy(_.user).lookup(789)

moviesForUser: Seq[org.apache.spark.mllib.recommendation.Rating] = WrappedArray(Rating(789,1012,4.0), Rating(789,127,5.0), 
Rating(789,475,5.0), Rating(789,93,4.0), Rating(789,1161,3.0), Rating(789,286,1.0), Rating(789,293,4.0), Rating(789,9,5.0), 
Rating(789,50,5.0), Rating(789,294,3.0), Rating(789,181,4.0), Rating(789,1,3.0), Rating(789,1008,4.0), Rating(789,508,4.0), 
Rating(789,284,3.0), Rating(789,1017,3.0), Rating(789,137,2.0), Rating(789,111,3.0), Rating(789,742,3.0), Rating(789,248,3.0), 
Rating(789,249,3.0), Rating(789,1007,4.0), Rating(789,591,3.0), Rating(789,150,5.0), Rating(789,276,5.0), Rating(789,151,2.0), 
Rating(789,129,5.0), Rating(789,100,5.0), Rating(789,741,5.0), Rating(789,288,3.0), Rating(789,762,3.0), Rating(789,628,3.0), 
Rating(789,124,4.0))

// 獲取使用者評分最高的10部電影,並列印電影名和評分值
scala> moviesForUser.sortBy(-_.rating).take(10).map(rating => (titles(rating.product), rating.rating)).foreach(println)

(Godfather, The (1972),5.0)
(Trainspotting (1996),5.0)
(Dead Man Walking (1995),5.0)
(Star Wars (1977),5.0)
(Swingers (1996),5.0)
(Leaving Las Vegas (1995),5.0)
(Bound (1996),5.0)
(Fargo (1996),5.0)
(Last Supper, The (1995),5.0)
(Private Parts (1997),4.0)

3.3 獲取某使用者推薦列表並列印:

scala> val topKRecs = model.recommendProducts(789, 10)
scala> topKRecs.map(rating => (titles(rating.product),rating.rating)).foreach(println)

(Manhattan (1979),5.832914304933656)
(Duck Soup (1933),5.570379610662419)
(Pulp Fiction (1994),5.504039044943547)
(Big Sleep, The (1946),5.483157516029839)
(Citizen Kane (1941),5.343982611556479)
(Maltese Falcon, The (1941),5.334708918288964)
(L.A. Confidential (1997),5.286353962843096)
(Harold and Maude (1971),5.244546353446239)
(Paths of Glory (1957),5.235629127626481)
(Touch of Evil (1958),5.180871609876424)

讀者可以自行對比這兩組列表是否有相似性。

第四步:物品推薦

很多時候還有另一種需求:就是給定一個物品,找到它的所有相似物品。

遺憾的是MLlib裡面竟然沒有包含內建的函式,需要自己用jblas庫來實現 = =#。

1. 匯入jblas庫矩陣類,並建立一個餘弦相似度計量函式:

由於jblas庫沒有,先要下載該庫,放到spark/jars目錄下,下載地址:

http://pan.baidu.com/s/1o8w6Wem

參考:https://blog.csdn.net/chun19920827/article/details/74332178

// 匯入jblas庫中的矩陣類
import org.jblas.DoubleMatrix
// 定義相似度函式
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
    vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
}

2. 接下來獲取物品(本例以物品567為例)的因子特徵向量,並將它轉換為jblas的矩陣格式:

// 選定id為567的電影
scala> val itemId = 567
// 獲取該物品的隱因子向量
scala> val itemFactor = model.productFeatures.lookup(itemId).head
// 將該向量轉換為jblas矩陣型別
scala> val itemVector = new DoubleMatrix(itemFactor)

3. 計算物品567和所有其他物品的相似度:

// 計算電影567與其他電影的相似度
scala> val sims = model.productFeatures.map{ case (id, factor) => 
    val factorVector = new DoubleMatrix(factor)
    val sim = cosineSimilarity(factorVector, itemVector)
    (id, sim)
}
// 獲取與電影567最相似的10部電影
scala> val sortedSims = sims.top(10)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
// 列印結果
scala> println(sortedSims.mkString("n"))

(567,1.0)
(219,0.7708029243146022)
(1376,0.7245712144554307)
(195,0.7213879610509834)
(413,0.7161206343271771)
(184,0.7119537102065846)
(181,0.7074297269599691)
(250,0.7070691619306613)
(825,0.7050879435249158)
(670,0.7036005054982796)

其中1.0當然就是自己跟自己的相似度了。

4. 檢視推薦結果:

// 列印電影567的影片名
scala> println(titles(567))
Wes Craven's New Nightmare (1994)

// 獲取和電影567最相似的11部電影(含567自己)
scala> val K=10
K: Int = 10

scala> val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
sortedSims2: Array[(Int, Double)] = Array((567,1.0), (219,0.7708029243146022), (1376,0.7245712144554307), (195,0.7213879610509834), 
(413,0.7161206343271771), (184,0.7119537102065846), (181,0.7074297269599691), (250,0.7070691619306613), (825,0.7050879435249158), 
(670,0.7036005054982796), (76,0.7033096049883402))

// 再列印和電影567最相似的10部電影
scala> sortedSims2.slice(1, 11).map{ case (id, sim) => (titles(id), sim) }.mkString("n")
res2: String =
(Nightmare on Elm Street, A (1984),0.7708029243146022)
(Meet Wally Sparks (1997),0.7245712144554307)
(Terminator, The (1984),0.7213879610509834)
(Tales from the Crypt Presents: Bordello of Blood (1996),0.7161206343271771)
(Army of Darkness (1993),0.7119537102065846)
(Return of the Jedi (1983),0.7074297269599691)
(Fifth Element, The (1997),0.7070691619306613)
(Arrival, The (1996),0.7050879435249158)
(Body Snatchers (1993),0.7036005054982796)
(Carlito's Way (1993),0.7033096049883402)

看看,這些電影是不是和567相似?

第五步:推薦效果評估

在Spark的ALS推薦系統中,最常用到的兩個推薦指標分別為MSE和MAPK。其中MSE就是均方誤差,是基於評分矩陣的推薦系統的必用指標。那麼MAPK又是什麼呢?

它稱為K值平均準確率,最多用於TopN推薦中,它表示資料集範圍內K個推薦物品與實際使用者購買物品的吻合度。具體公式請讀者自行參考有關文件。

本文推薦系統就是一個[基於使用者-物品評分矩陣的TopN推薦系統],下面步驟分別用來獲取本文推薦系統中的這兩個指標。

PS:記得先要匯入jblas庫。

1. 首先計算MSE和RMSE:

// 建立使用者id-影片id RDD
val usersProducts = ratings.map{ case Rating(user, product, rating)  => (user, product)}
// 建立(使用者id,影片id) - 預測評分RDD
val predictions = model.predict(usersProducts).map{
    case Rating(user, product, rating) => ((user, product), rating)
}
// 建立使用者-影片實際評分RDD,並將其與上面建立的預測評分RDD join起來
val ratingsAndPredictions = ratings.map{
    case Rating(user, product, rating) => ((user, product), rating)
}.join(predictions)
 
// 匯入RegressionMetrics類
import org.apache.spark.mllib.evaluation.RegressionMetrics
// 建立預測評分-實際評分RDD
val predictedAndTrue = ratingsAndPredictions.map { case ((user, product), (actual, predicted)) => (actual, predicted) }
// 建立RegressionMetrics物件
val regressionMetrics = new RegressionMetrics(predictedAndTrue)
 
// 列印MSE和RMSE
scala> println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
Mean Squared Error = 0.08528673495619758

scala> println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)
Root Mean Squared Error = 0.29203892712478857

基本原理是將實際評分-預測評分扔到RegressionMetrics類裡,該類提供了mse和rmse成員,可直接輸出獲取。

2. 計算MAPK:

// 建立電影隱因子RDD,並將它廣播出去
val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
scala> val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
itemFactors: Array[Array[Double]] = Array(Array(0.23932668566703796, -0.2555418312549591, 0.5615943074226379, 0.608315646648407, 
-0.16954585909843445, -0.43999677896499634, 0.03769494965672493, -1.8908671140670776...
scala> val itemMatrix = new DoubleMatrix(itemFactors)
itemMatrix: org.jblas.DoubleMatrix = [0.239327, -0.255542, 0.561594, 0.608316, -0.169546, -0.439997, 0.037695, -1.890867...

 
// 建立使用者id - 推薦列表RDD
val allRecs = model.userFeatures.map{ case (userId, array) => 
  val userVector = new DoubleMatrix(array)
  val scores = imBroadcast.value.mmul(userVector)
  val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
  val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
  (userId, recommendedIds)
}
 
// 建立使用者 - 電影評分ID集RDD
val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1)
 
// 匯入RankingMetrics類
import org.apache.spark.mllib.evaluation.RankingMetrics
// 建立實際評分ID集-預測評分ID集 RDD
val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) => 
  val actual = actualWithIds.map(_._2)
  (predicted.toArray, actual.toArray)
}
// 建立RankingMetrics物件
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
// 列印MAPK
scala> println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
Mean Average Precision = 0.19938697526328025

比較坑的是不能設定K,也就是說,計算的實際是MAP...... 正如屬性名:meanAveragePrecision。

小結

感覺MLlib的推薦系統真的很一般,一方面支援的型別少 - 只支援ALS;另一方面支援的推薦系統運算元也少,連輸出個RMSE指標都要寫好幾行程式碼,太不方便了。

唯一的好處是因為接近底層,所以可以讓使用者看到些實現的細節,對原理更加清晰。

原文:第三篇:一個Spark推薦系統引擎的實現 - 穆晨 - 部落格園

3.使用Spark分析Amazon DataSet(實現 Spark LR、Spark TFIDF)

到http://jmcauley.ucsd.edu/data/amazon/ 下載資料集:

ed35d363aab6e92d87fc646e6a763b9f.png

第一個是json檔案,資料形式是這樣的:

{"reviewerID": "APYOBQE6M18AA", "asin": "0615391206", "reviewerName": "Martin Schwartz", "helpful": [0, 0], 
"reviewText": "My daughter wanted this book and the price on Amazon was the best.  She has already tried one 
recipe a day after receiving the book.  She seems happy with it.", "overall": 5.0, "summary": "Best Price", 
"unixReviewTime": 1382140800, "reviewTime": "10 19, 2013"}

在tfidf計算的時候會用到這個檔案,使用reviewText欄位的內容。

第二個是csv檔案,資料形式:

A210NOCSTBT4OD,0076144011,4.0,1349308800
A28ILV4TOG8BH2,0130350591,5.0,1300752000
A31B4D7URW4DNZ,0307394530,2.0,1214784000
A2HU0RPDRZZOP1,0307394530,5.0,1277337600
A7J0XOW7DYBBD,0307394530,5.0,1393113600

幾個欄位分別是user,item,rating,timestamp。我們可以把低於4分的作為負例,高於或等於4分的作為正例,訓練LR模型。

但是這裡的user和item都不是數值型的,沒什麼特徵,所以要訓練LR模型還有點困難,所以這部分暫時留著不做。

Spark LR模型的寫法可以參考下面幾篇文章:

https://blog.csdn.net/flysky1991/article/details/80182501

【原】Spark之機器學習(Python版)(二)--分類 - Charlotte77 - 部落格園

下面來計算TFIDF。

首先讀取資料

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
jsons=sqlContext.read.json("file:///usr/local/lily/task7/Home_and_Kitchen_5.json")

看一下資料結構:

jsons.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)

//首行資料
jsons.first()

Row(asin='0615391206', helpful=[0, 0], overall=5.0, reviewText='My daughter wanted this book and the price on Amazon was the best.  
She has already tried one recipe a day after receiving the book.  She seems happy with it.', reviewTime='10 19, 2013', 
reviewerID='APYOBQE6M18AA', reviewerName='Martin Schwartz', summary='Best Price', unixReviewTime=1382140800)

我們只需要用text資料

df = jsons.select('reviewText')

加上一列id,以便統計評論個數:

from pyspark.sql import functions as F
df = df.withColumn("doc_id", F.monotonically_increasing_id())

分詞,我這裡沒有用任何分詞包,因為英文的分詞主要就是空格分詞。

df = df.withColumn('keys',F.split('reviewText', " ")).drop('reviewText')

然後把分好的詞explode一下,這樣每個評論及其每個單詞都會形成一行

NUM_doc = df.count()
# One hot words
df = df.select('*', F.explode('keys').alias('token'))

計算TF,TF是針對一篇文章而言的,是一篇文章中的單詞頻數/單詞總數,這裡的一篇文章就是一條評論。

# Calculate TF
TF = df.groupBy("doc_id").agg(F.count("token").alias("doc_len")) 
    .join(df.groupBy("doc_id", "token")
          .agg(F.count("keys").alias("word_count")), ['doc_id']) 
    .withColumn("tf", F.col("word_count") / F.col("doc_len")) 
    .drop("doc_len", "word_count")
TF.cache()

這裡以評論id分組,並計算每個組內單詞的個數,也就是每個評論有多少單詞(doc_len),然後和另一個df2以欄位“doc_id”內連線,df2以評論id和單詞分組,計算組內分詞集合的個數,也就是每個詞出現在多少集合中(word_count)。最後再新增一列tf值,即單詞在文件中出現的次數/文件總詞數。

計算IDF,IDF是逆文件頻率,表示一個單詞在語料庫中出現的頻率,也就是一個單詞在多少篇文章中出現了。

# Calculate IDF
IDF = df.groupBy("token").agg(F.countDistinct("doc_id").alias("df"))
IDF = IDF.select('*', (F.log(NUM_doc / (IDF['df'] + 1))).alias('idf'))
IDF.cache()

這裡以每個單詞分組,計算單詞在不同評論中出現的次數,然後再用log(訓練語料的總文件數/(出現詞語x的文件數+1))計算出idf值。

計算TF-IDF,兩個df以單詞為欄位join,得到tf-idf值。

# Calculate TF-IDF
TFIDF = TF.join(IDF, ['token']).withColumn('tf-idf', F.col('tf') * F.col('idf'))

參考:

https://blog.csdn.net/macanv/article/details/87731785

利用Spark計算TF-IDF