1. 程式人生 > >基於Spark構建推薦引擎之一:基於物品的協同過濾推薦

基於Spark構建推薦引擎之一:基於物品的協同過濾推薦

1、Spark構建推薦引擎之一:基於物品的協同過濾推薦

1.0 前言

目前SparkMLlib支援的推薦演算法只有alternating least squares (ALS)這一種,相比較Mahout中的推薦演算法,SparkMLlib目前不能支援目前的業務需求;因此,參照Mahout的推薦引擎,在Spark上構建同樣一套推薦演算法,以支援各種業務需求。

目前SparkMLlib官方網址:


Mahout的推薦引擎的詳細介紹參照:

1.1 資料輸入模型

輸入資料格式:

使用者ID,物品ID,評分

使用者輸入資料模型:

 defUserData (

     sc:SparkContext,input:String,

     split:String

     ):(RDD[(String,String,Double)]) = {

     valuser_rdd1= sc.textFile(input,10)

     valuser_rdd2=user_rdd1.map(line=> {

     valfileds= line.split("split")

          (fileds(0),fileds(1),fileds(2).toDouble)

     })

     user_rdd2

  }

通過UserData可以獲取使用者評分RDD;

輸入引數:sc是SparkContext,input就輸入資料路徑,split是資料分割符號。

1.2 相似度矩陣模型

 基於物品(ItemCF)的相似度演算法:

1)同現相似度

2)歐氏距離相似度

3)餘弦相似度

4)秩相關係數相似度

5)曼哈頓距離相似度

6)對數似然相似度

1.2.1 同現相似度矩陣

參照:《推薦系統實踐》一書中的介紹:

公式定義:


例項:


同現相似度模型:根據使用者評分資料表,生成物品的相似矩陣;

輸入引數:user_rdd:使用者評分表;

輸出引數:餘弦相似矩陣:物品1,物品2,相似度值;

同現相似度矩陣模型:

def Cooccurrence (

     user_rdd:RDD[(String,String,Double)]

     ) : (RDD[(String,String,Double)]) = {

   //  0 資料做準備

   valuser_rdd2=user_rdd.map(f => (f._1,f._2)).sortByKey()

   user_rdd2.cache  

   //  1 (使用者:物品)笛卡爾積 (使用者:物品) =>物品:物品組合

   valuser_rdd3=user_rdd2joinuser_rdd2

   valuser_rdd4=user_rdd3.map(data=> (data._2,1))

   //  2 物品:物品:頻次

   valuser_rdd5=user_rdd4.reduceByKey((x,y) => x + y)

   //  3 對角矩陣

   valuser_rdd6=user_rdd5.filter(f=> f._1._1 == f._1._2)

   //  4 非對角矩陣

   valuser_rdd7=user_rdd5.filter(f=> f._1._1 != f._1._2)

   //  5 計算同現相似度(物品1,物品2,同現頻次)

   valuser_rdd8=user_rdd7.map(f=> (f._1._1, (f._1._1, f._1._2, f._2))).

   join(user_rdd6.map(f=> (f._1._1, f._2)))

   valuser_rdd9=user_rdd8.map(f=> (f._2._1._2, (f._2._1._1,

        f._2._1._2, f._2._1._3, f._2._2)))

   valuser_rdd10=user_rdd9.join(user_rdd6.map(f => (f._1._1, f._2)))

    val user_rdd11 = user_rdd10.map(f => (f._2._1._1,f._2._1._2,f._2._1._3,f._2._1._4,f._2._2))

   valuser_rdd12=user_rdd11.map(f=> (f._1, f._2, (f._3 / sqrt(f._4 * f._5)) ))

   //   6結果返回

   user_rdd12

  }

1.2.2 餘弦相似度


原理:多維空間兩點與所設定的點形成夾角的餘弦值。

範圍:[-1,1],值越大,說明夾角越大,兩點相距就越遠,相似度就越小。

餘弦相似度模型:根據使用者評分資料表,生成物品的相似矩陣;

輸入引數:user_rdd:使用者評分表;

輸出引數:餘弦相似矩陣:物品1,物品2,相似度值;

餘弦相似度矩陣模型:

 defCosineSimilarity (

     user_rdd:RDD[(String,String,Double)]

     ) : (RDD[(String,String,Double)]) = {

   //  0 資料做準備

   valuser_rdd2=user_rdd.map(f => (f._1,(f._2,f._3))).sortByKey()

   user_rdd2.cache

   //  1 (使用者,物品,評分)笛卡爾積 (使用者,物品,評分) =>(物品1,物品2,評分1,評分2)組合

   valuser_rdd3=user_rdd2joinuser_rdd2

   valuser_rdd4=user_rdd3.map(f=> ((f._2._1._1, f._2._2._1),(f._2._1._2, f._2._2._2)))

   //  2 (物品1,物品2,評分1,評分2)組合 => (物品1,物品2,評分1*評分2組合並累加

   valuser_rdd5=user_rdd4.map(f=> (f._1,f._2._1*f._2._2 )).reduceByKey(_+_)

   //  3 對角矩陣

   valuser_rdd6=user_rdd5.filter(f=> f._1._1 == f._1._2)

   //  4 非對角矩陣

   valuser_rdd7=user_rdd5.filter(f=> f._1._1 != f._1._2)

   //  5 計算相似度

   valuser_rdd8=user_rdd7.map(f=> (f._1._1, (f._1._1, f._1._2, f._2))).

   join(user_rdd6.map(f=> (f._1._1, f._2)))

   valuser_rdd9=user_rdd8.map(f=> (f._2._1._2, (f._2._1._1,

        f._2._1._2, f._2._1._3, f._2._2)))

   valuser_rdd10=user_rdd9.join(user_rdd6.map(f => (f._1._1, f._2)))

    val user_rdd11 = user_rdd10.map(f => (f._2._1._1,f._2._1._2,f._2._1._3,f._2._1._4,f._2._2))

   valuser_rdd12=user_rdd11.map(f=> (f._1, f._2, (f._3 / sqrt(f._4 * f._5)) ))

   //  7 結果返回

   user_rdd12

  } 

1.2.3 歐氏距離相似度



原理:利用歐式距離d定義的相似度s,s=1 /(1+d)。

改進:

num = intersect(which(M[x,]!=0),which(M[y,]!=0))

d = sum((M[x,] - M[y,])^2,na.rm = T)

s = length(num)/(1 + sqrt(d))

範圍:[0,1],值越大,說明d越小,也就是距離越近,則相似度越大。

歐式相似度模型:根據使用者評分資料表,生成物品的相似矩陣;

輸入引數:user_rdd:使用者評分表;

輸出引數:餘弦相似矩陣:物品1,物品2,相似度值;

歐式相似度矩陣模型:

 def EuclideanDistanceSimilarity (

     user_rdd:RDD[(String,String,Double)]

     ) : (RDD[(String,String,Double)]) = {

   //  0 資料做準備

   valuser_rdd2=user_rdd.map(f => (f._1,(f._2,f._3))).sortByKey()

   user_rdd2.cache

   //  1 (使用者,物品,評分)笛卡爾積 (使用者,物品,評分) =>(物品1,物品2,評分1,評分2)組合

   valuser_rdd3=user_rdd2joinuser_rdd2

   valuser_rdd4=user_rdd3.map(f=> ((f._2._1._1, f._2._2._1),(f._2._1._2, f._2._2._2)))

   //  2 (物品1,物品2,評分1,評分2)組合 => (物品1,物品2,評分1-評分2組合並累加

   valuser_rdd5=user_rdd4.map(f=> (f._1,(f._2._1-f._2._2 )*(f._2._1-f._2._2 ))).reduceByKey(_+_)

   //  3 (物品1,物品2,評分1,評分2)組合 => (物品1,物品2,1組合並累加計算重疊數

   valuser_rdd6=user_rdd4.map(f=> (f._1,1)).reduceByKey(_+_)

   //  4 非對角矩陣

   valuser_rdd7=user_rdd5.filter(f=> f._1._1 != f._1._2)

   //  5 計算相似度

   valuser_rdd8=user_rdd7.join(user_rdd6)

   valuser_rdd9=user_rdd8.map(f=> (f._1._1,f._1._2,f._2._2/(1+sqrt(f._2._1))))  

//   7 結果返回

user_rdd9

 } 

1.2.4 秩相關係數相似度

等待更新......

1.2.5 曼哈頓距離相似度

等待更新......

1.2.6 對數似然相似度

等待更新......

1.3 推薦演算法模型

推薦模型:根據物品相似矩陣以及使用者對物品的評分表,計算使用者的推薦列表。

輸入引數:items_similar物品相似矩陣,user_perf:使用者評分表,r_number:推薦個數。

輸出引數:使用者推薦列表:使用者ID,物品ID,得分。

 defRecommend (

     items_similar:RDD[(String,String,Double)],

     user_perf:RDD[(String,String,Double)],

     r_number:Int

     ) : (RDD[(String,String,Double)]) = {  

    //  1 矩陣計算——i行與jjoin

    valrdd_app1_R2=items_similar.map(f => (f._2, (f._1,f._3))).

    join(user_perf.map(f => (f._2,(f._1,f._3))))

    //  2 矩陣計算——i行與j列元素相乘

    valrdd_app1_R3=rdd_app1_R2.map(f=> ((f._2._2._1,f._2._1._1),f._2._2._2*f._2._1._2))

    //  3 矩陣計算——使用者:元素累加求和

    valrdd_app1_R4=rdd_app1_R3.reduceByKey((x,y)=> x+y).map(f => (f._1._1,(f._1._2,f._2)))

    //  4 矩陣計算——使用者:使用者對結果排序,過濾

    valrdd_app1_R5=rdd_app1_R4.groupByKey()

    valrdd_app1_R6=rdd_app1_R5.map(f=> {

         val i2 = f._2.toBuffer

         val i2_2 = i2.sortBy(_._2)

         if (i2_2.length > r_number)i2_2.remove(0,(i2_2.length-r_number))

           (f._1,i2_2.toIterable)

    })

    valrdd_app1_R7=rdd_app1_R6.flatMap(f=> {

         val id2 = f._2     

         for (w <-id2)yield(f._1,w._1,w._2)

    })

    rdd_app1_R7

  }

1.4 評估演算法模型

個性化推薦演算法的通用評判標準:召回率(recall)與查準率(precision);


被檢索到的越多越好,這是追求“查全率”,即A/(A+B),越大越好。被檢索到的,越相關的越多越好,不相關的越少越好,這是追求“查準率”,即A/(A+C),越大越好。

評估演算法模型

等待更新......

轉載請註明出處:

http://blog.csdn.net/sunbow0/article/details/42737541