1. 程式人生 > >使用Spark ALS實現協同過濾

使用Spark ALS實現協同過濾

更新:

  1. 【2016.06.12】Spark1.4.0中MatrixFactorizationModel提供了recommendForAll方法實現離線批量推薦,見SPARK-3066

測試環境

為了測試簡單,在本地以local方式執行Spark,你需要做的是下載編譯好的壓縮包解壓即可,可以參考Spark本地模式執行

測試資料使用MovieLensMovieLens 10M資料集,下載之後解壓到data目錄。資料的格式請參考README中的說明,需要注意的是ratings.dat中的資料被處理過,每個使用者至少訪問了20個商品

下面的程式碼均在spark-shell中執行,啟動時候可以根據你的機器記憶體設定JVM引數,例如:

bin/spark-shell --executor-memory 3g --driver-memory 3g --driver-java-options '-Xms2g -Xmx2g -XX:+UseCompressedOops'

預測評分

這個例子主要演示如何訓練資料、評分並計算根均方差。

準備工作

首先,啟動spark-shell,然後引入mllib包,我們需要用到ALS演算法類和Rating評分類:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

Spark的日誌級別預設為INFO,你可以手動設定為WARN級別,同樣先引入log4j依賴:

import org.apache.log4j.{Logger,Level}

然後,執行下面程式碼:

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

載入資料

spark-shell啟動成功之後,sc為內建變數,你可以通過它來載入測試資料:

val data = sc.textFile("data/ml-1m/ratings.dat")

接下來解析檔案內容,獲得使用者對商品的評分記錄:

val ratings = data.map(_.split("::") match { case Array(user, item, rate, ts) =>
  Rating(user.toInt, item.toInt, rate.toDouble)
}).cache()

檢視第一條記錄:

scala> ratings.first
res81: org.apache.spark.mllib.recommendation.Rating = Rating(1,1193,5.0)

我們可以統計檔案中使用者和商品數量:

val users = ratings.map(_.user).distinct()
val products = ratings.map(_.product).distinct()
println("Got "+ratings.count()+" ratings from "+users.count+" users on "+products.count+" products.")

可以看到如下輸出:

//Got 1000209 ratings from 6040 users on 3706 products.

你可以對評分資料生成訓練集和測試集,例如:訓練集和測試集比例為8比2:

val splits = ratings.randomSplit(Array(0.8, 0.2), seed = 111l)
val training = splits(0).repartition(numPartitions)
val test = splits(1).repartition(numPartitions)

這裡,我們是將評分資料全部當做訓練集,並且也為測試集。

訓練模型

接下來呼叫ALS.train()方法,進行模型訓練:

val rank = 12
val lambda = 0.01
val numIterations = 20
val model = ALS.train(ratings, rank, numIterations, lambda)

訓練完後,我們看看model中的使用者和商品特徵向量:

model.userFeatures
//res82: org.apache.spark.rdd.RDD[(Int, Array[Double])] = users MapPartitionsRDD[400] at mapValues at ALS.scala:218

model.userFeatures.count
//res84: Long = 6040

model.productFeatures
//res85: org.apache.spark.rdd.RDD[(Int, Array[Double])] = products MapPartitionsRDD[401] at mapValues at ALS.scala:222

model.productFeatures.count
//res86: Long = 3706

評測

我們要對比一下預測的結果,注意:我們將訓練集當作測試集來進行對比測試。從訓練集中獲取使用者和商品的對映:

val usersProducts= ratings.map { case Rating(user, product, rate) =>
  (user, product)
}

顯然,測試集的記錄數等於評分總記錄數,驗證一下:

usersProducts.count  //Long = 1000209

使用推薦模型對使用者商品進行預測評分,得到預測評分的資料集:

var predictions = model.predict(usersProducts).map { case Rating(user, product, rate) =>
    ((user, product), rate)
}

檢視其記錄數:

predictions.count //Long = 1000209

將真實評分資料集與預測評分資料集進行合併,這樣得到使用者對每一個商品的實際評分和預測評分:

val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
  ((user, product), rate)
}.join(predictions)

ratesAndPreds.count  //Long = 1000209

然後計算根均方差:

val rmse= math.sqrt(ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean())

println(s"RMSE = $rmse")

上面這段程式碼其實就是對測試集進行評分預測並計算相似度,這段程式碼可以抽象為一個方法,如下:

/** Compute RMSE (Root Mean Squared Error). */
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating]) = {
  val usersProducts = data.map { case Rating(user, product, rate) =>
    (user, product)
  }

  val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) =>
    ((user, product), rate)
  }

  val ratesAndPreds = data.map { case Rating(user, product, rate) =>
    ((user, product), rate)
  }.join(predictions)

  math.sqrt(ratesAndPreds.map { case ((user, product), (r1, r2)) =>
    val err = (r1 - r2)
    err * err
  }.mean())
}

除了RMSE指標,我們還可以及時AUC以及Mean average precision at K (MAPK),關於AUC的計算方法,參考RunRecommender.scala,關於MAPK的計算方法可以參考《Packt.Machine Learning with Spark.2015.pdf》一書第四章節內容,或者你可以看本文後面內容。

儲存真實評分和預測評分

我們還可以儲存使用者對商品的真實評分和預測評分記錄到本地檔案:

ratesAndPreds.sortByKey().repartition(1).sortBy(_._1).map({
  case ((user, product), (rate, pred)) => (user + "," + product + "," + rate + "," + pred)
}).saveAsTextFile("/tmp/result")

上面這段程式碼先按使用者排序,然後重新分割槽確保目標目錄中只生成一個檔案。如果你重複執行這段程式碼,則需要先刪除目標路徑:

import scala.sys.process._
"rm -r /tmp/result".!

我們還可以對預測的評分結果按使用者進行分組並按評分倒排序:

predictions.map { case ((user, product), rate) =>
  (user, (product, rate))
}.groupByKey(numPartitions).map{case (user_id,list)=>
  (user_id,list.toList.sortBy {case (goods_id,rate)=> - rate})
}

給一個使用者推薦商品

這個例子主要是記錄如何給一個或大量使用者進行推薦商品,例如,對使用者編號為384的使用者進行推薦,查出該使用者在測試集中評分過的商品。

找出5個使用者:

users.take(5) 
//Array[Int] = Array(384, 1084, 4904, 3702, 5618)

檢視使用者編號為384的使用者的預測結果中預測評分排前10的商品:

val userId = users.take(1)(0) //384
val K = 10
val topKRecs = model.recommendProducts(userId, K)
println(topKRecs.mkString("\n"))
//    Rating(384,2545,8.354966018818265)
//    Rating(384,129,8.113083736094676)
//    Rating(384,184,8.038113395650853)
//    Rating(384,811,7.983433591425284)
//    Rating(384,1421,7.912044967873945)
//    Rating(384,1313,7.719639594879865)
//    Rating(384,2892,7.53667094600392)
//    Rating(384,2483,7.295378004543803)
//    Rating(384,397,7.141158013610967)
//    Rating(384,97,7.071089782695754)

檢視該使用者的評分記錄:

val goodsForUser=ratings.keyBy(_.user).lookup(384)
// Seq[org.apache.spark.mllib.recommendation.Rating] = WrappedArray(Rating(384,2055,2.0), Rating(384,1197,4.0), Rating(384,593,5.0), Rating(384,599,3.0), Rating(384,673,2.0), Rating(384,3037,4.0), Rating(384,1381,2.0), Rating(384,1610,4.0), Rating(384,3074,4.0), Rating(384,204,4.0), Rating(384,3508,3.0), Rating(384,1007,3.0), Rating(384,260,4.0), Rating(384,3487,3.0), Rating(384,3494,3.0), Rating(384,1201,5.0), Rating(384,3671,5.0), Rating(384,1207,4.0), Rating(384,2947,4.0), Rating(384,2951,4.0), Rating(384,2896,2.0), Rating(384,1304,5.0))

productsForUser.size //Int = 22
productsForUser.sortBy(-_.rating).take(10).map(rating => (rating.product, rating.rating)).foreach(println)
//    (593,5.0)
//    (1201,5.0)
//    (3671,5.0)
//    (1304,5.0)
//    (1197,4.0)
//    (3037,4.0)
//    (1610,4.0)
//    (3074,4.0)
//    (204,4.0)
//    (260,4.0)

可以看到該使用者對22個商品評過分以及瀏覽的商品是哪些。

我們可以該使用者對某一個商品的實際評分和預測評分方差為多少:

val actualRating = productsForUser.take(1)(0)
//actualRating: org.apache.spark.mllib.recommendation.Rating = Rating(384,2055,2.0)    val predictedRating = model.predict(789, actualRating.product)
val predictedRating = model.predict(384, actualRating.product)
//predictedRating: Double = 1.9426030777174637
val squaredError = math.pow(predictedRating - actualRating.rating, 2.0)
//squaredError: Double = 0.0032944066875075172

如何找出和一個已知商品最相似的商品呢?這裡,我們可以使用餘弦相似度來計算:

import org.jblas.DoubleMatrix

/* Compute the cosine similarity between two vectors */
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
  vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
}

以2055商品為例,計算實際評分和預測評分相似度

val itemId = 2055
val itemFactor = model.productFeatures.lookup(itemId).head
//itemFactor: Array[Double] = Array(0.3660752773284912, 0.43573060631752014, -0.3421429991722107, 0.44382765889167786, -1.4875195026397705, 0.6274569630622864, -0.3264533579349518, -0.9939845204353333, -0.8710321187973022, -0.7578890323638916, -0.14621856808662415, -0.7254264950752258)
val itemVector = new DoubleMatrix(itemFactor)
//itemVector: org.jblas.DoubleMatrix = [0.366075; 0.435731; -0.342143; 0.443828; -1.487520; 0.627457; -0.326453; -0.993985; -0.871032; -0.757889; -0.146219; -0.725426]

cosineSimilarity(itemVector, itemVector)
// res99: Double = 0.9999999999999999

找到和該商品最相似的10個商品:

val sims = model.productFeatures.map{ case (id, factor) =>
  val factorVector = new DoubleMatrix(factor)
  val sim = cosineSimilarity(factorVector, itemVector)
  (id, sim)
}
val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Doubl