推薦演算法!基於隱語義模型的協同過濾推薦之商品相似度矩陣
專案採用ALS作為協同過濾演算法,根據MongoDB中的使用者評分表計算離線的使用者商品推薦列表以及商品相似度矩陣。
通過ALS計算商品相似度矩陣,該矩陣用於查詢當前商品的相似商品併為實時推薦系統服務。
離線計算的ALS 演算法,演算法最終會為使用者、商品分別生成最終的特徵矩陣,分別是表示使用者特徵矩陣的U(m x k)矩陣,每個使用者有 k個特徵描述;表示物品特徵矩陣的V(n x k)矩陣,每個物品也由 k 個特徵描述。
V(n x k)表示物品特徵矩陣,每一行是一個 k 維向量,雖然我們並不知道每一個維度的特徵意義是什麼,但是k 個維度的數學向量表示了該行對應商品的特徵。
所以,每個商品用V(n x k)每一行的向量表示其特徵,於是任意兩個商品 p:特徵向量為,商品q:特徵向量為之間的相似度sim(p,q)可以使用和的餘弦值來表示:
資料集中任意兩個商品間相似度都可以由公式計算得到,商品與商品之間的相似度在一段時間內基本是固定值。最後生成的資料儲存到MongoDB的ProductRecs表中。
核心程式碼如下:
//計算商品相似度矩陣 //獲取商品的特徵矩陣,資料格式 RDD[(scala.Int, scala.Array[scala.Double])] val productFeatures = model.productFeatures.map{case (productId,features) => (productId, new DoubleMatrix(features)) } // 計算笛卡爾積並過濾合併 val productRecs = productFeatures.cartesian(productFeatures) .filter{case (a,b) => a._1 != b._1} .map{case (a,b) => val simScore = this.consinSim(a._2,b._2) // 求餘弦相似度 (a._1,(b._1,simScore)) }.filter(_._2._2 > 0.6) .groupByKey() .map{case (productId,items) => ProductRecs(productId,items.toList.map(x => Recommendation(x._1,x._2))) }.toDF() productRecs .write .option("uri", mongoConfig.uri) .option("collection",PRODUCT_RECS) .mode("overwrite") .format("com.mongodb.spark.sql") .save()
其中,consinSim是求兩個向量餘弦相似度的函式,程式碼實現如下:
//計算兩個商品之間的餘弦相似度
def consinSim(product1: DoubleMatrix, product2:DoubleMatrix): Double ={
product1.dot(product2) / ( product1.norm2() * product2.norm2() )
}
在上述模型訓練的過程中,我們直接給定了隱語義模型的rank,iterations,lambda三個引數。對於我們的模型,這並不一定是最優的引數選取,所以我們需要對模型進行評估。通常的做法是計算均方根誤差(RMSE),考察預測評分與實際評分之間的誤差。
有了RMSE,我們可以就可以通過多次調整引數值,來選取RMSE最小的一組作為我們模型的優化選擇。
在scala/com.atguigu.offline/下新建單例物件ALSTrainer,程式碼主體架構如下:
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
//建立SparkConf
val sparkConf = new SparkConf().setAppName("ALSTrainer").setMaster(config("spark.cores"))
//建立SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
import spark.implicits._
//載入評分資料
val ratingRDD = spark
.read
.option("uri",mongoConfig.uri)
.option("collection",OfflineRecommender.MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(rating => Rating(rating.userId,rating.productId,rating.score)).cache()
// 將一個RDD隨機切分成兩個RDD,用以劃分訓練集和測試集
val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
val trainingRDD = splits(0)
val testingRDD = splits(1)
//輸出最優引數
adjustALSParams(trainingRDD, testingRDD)
//關閉Spark
spark.close()
}
其中adjustALSParams方法是模型評估的核心,輸入一組訓練資料和測試資料,輸出計算得到最小RMSE的那組引數。程式碼實現如下:
//輸出最終的最優引數
def adjustALSParams(trainData:RDD[Rating], testData:RDD[Rating]): Unit ={
//這裡指定迭代次數為5,rank和lambda在幾個值中選取調整
val result = for(rank <- Array(100,200,250); lambda <- Array(1, 0.1, 0.01, 0.001))
yield {
val model = ALS.train(trainData,rank,5,lambda)
val rmse = getRMSE(model, testData)
(rank,lambda,rmse)
}
// 按照rmse排序
println(result.sortBy(_._3).head)
}
計算RMSE的函式getRMSE程式碼實現如下:
def getRMSE(model:MatrixFactorizationModel, data:RDD[Rating]):Double={
val userProducts = data.map(item => (item.user,item.product))
val predictRating = model.predict(userProducts)
val real = data.map(item => ((item.user,item.product),item.rating))
val predict = predictRating.map(item => ((item.user,item.product),item.rating))
// 計算RMSE
sqrt(
real.join(predict).map{case ((userId,productId),(real,pre))=>
// 真實值和預測值之間的差
val err = real - pre
err * err
}.mean()
)
}
執行程式碼,我們就可以得到目前資料的最優模型引數。