電商推薦系統七:基於物品的協同過濾相似推薦
阿新 • • 發佈:2021-02-08
7.2 基於物品的協同過濾相似推薦
基於物品的協同過濾(Item-CF),只需收集使用者的常規行為資料(比如點選、收藏、購買)就可以得到商品間的相似度
,在實際專案中應用很廣。
我們的整體思想是,如果兩個商品有同樣的受眾(感興趣的人群),那麼它們就是有內在相關性的。所以可以利用已有的行為資料,分析商品受眾的相似程度,進而得出商品間的相似度。我們把這種方法定義為物品的“同現相似度”,可以概括為喜歡同一商品的使用者能喜歡同一組使用者喜歡的別的商品,即人以群分
的理念。
公式如下:
其中,Ni 是購買商品 i (或對商品 i 評分)的使用者列表,Nj 是購買商品 j 的使用者列表。
核心程式碼實現如下:
package com.recom.itemcf
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
case class ProductRating(userId: Int,productId:Int,score:Double,timestamp:Int)
case class MongoConfig(uri:String,db:String)
//定義標準推薦物件
case class Recommendation(productId: Int,score:Double )
//定義使用者推薦列表
case class UserRecs(userId:Int,recs:Seq[Recommendation])
//定義商品相似度列表
case class ProductRecs(productId:Int,recs:Seq[Recommendation])
object ItemCFRecommender {
//定義表名和常量
val MONGODB_RATING_COLLECTION = "Rating"
val USER_MAX_RECOMMENDATION=10
val ITEM_CF_PRODUCT_RECS = "ItemCFProductRecs"
def main(args: Array[String]): Unit = {
//定義基礎配置的集合(可以放入配置檔案,通過方法獲取屬性的值)
val config = Map(
"spark.cores"->"local[*]",
"mongo.uri"->"mongodb://hadoop102:27017/recommender",
"mongo.db"->"recommender"
)
//建立一個spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
//建立一個spark session
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
//匯入隱式轉換類,在DF和DS轉換的過程中會使用到
import spark.implicits._
//通過隱式類的方法建立mongodb連線物件
implicit val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
//載入資料
val ratingDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.map(rating=>(rating.userId,rating.productId,rating.score))
.toDF("userId","productId","score")
.cache()
//TODO:核心演算法,計算同現相似度,得到商品的相似列表
//統計每個商品的評分個數,按照productId來做group by
val productRatingCountDF = ratingDF.groupBy("productId").count()
//在原有評分表上新增count
val ratingWithCountDF = ratingDF.join(productRatingCountDF,"productId")
//將評分表按照使用者id兩兩配對,統計兩個商品被同一個使用者評分過的次數
val joinDF = ratingWithCountDF.join(ratingWithCountDF,"userId")
.toDF("userId","product1","score1","count1","product2","score2","count2")
.select("userId","product1","count1","product2","count2")
//joinDF.show()
//建立一張臨時表,用於寫sql查詢
joinDF.createOrReplaceTempView("joined")
//按照product1,product2做group by,統計userId的數量,即同時對兩個商品評分的人數
val cooccurrenceDF = spark.sql(
"""
|select product1,product2, count(userId) as cocount,
|first(count1) as count1, first(count2) as count2
|from joined
|group by product1,product2
|""".stripMargin
).cache()
//提取需要的資料,包裝成(product1,(product2,score))
val simDF = cooccurrenceDF.map{
raw =>
val coocSim = cooccurrenceSim(raw.getAs[Long]("cocount"),raw.getAs[Long]("count1")
,raw.getAs[Long]("count2"))
(raw.getInt(0),(raw.getInt(1),coocSim))
}
.rdd
.groupByKey()
.map{
case (productId,recs)=>
ProductRecs(productId,recs.toList.filter(x=>x._1!=productId)
.sortWith(_._2>_._2)
.take(USER_MAX_RECOMMENDATION)
.map(x=>Recommendation(x._1,x._2))
)
}
.toDF()
//儲存到mongodb
simDF.write
.option("uri",mongoConfig.uri)
.option("collection",ITEM_CF_PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save( )
spark.stop()
}
//按照公式計算現同相似度
def cooccurrenceSim(coCount: Long, count1: Long, count2: Long)={
coCount/math.sqrt(count1*count2)
}
}