1. 程式人生 > >基於Python Spark的推薦系統

基於Python Spark的推薦系統

ALS推薦演算法

Spark MLlib中實現了ALS(Alternating Least Squares)基於協同過濾的推薦演算法。

MovieLens資料集

下載ml-100k資料至工作目錄中,終端輸入命令:

mkdir -p ~/pythonwork/PythonProject/data
cd ~/pythonwork/PythonProject/data
wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
unzip -j ml-100k

啟動 Hadoop Muti Node Cluster,複製ml-100k至HDFS中

start-all.sh
hadoop fs -mkdir /user/yyf/data
hadoop fs -copyFromLocal -f ~/pythonwork/PythonProject/data /user/yyf/
hadoop fs -ls /user/yyf/data

啟動Ipython Notebook(Hadoop YARN-client模式)

終端輸入命令:

cd ~/pythonwork/ipynotebook
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" HADOOP_CONF_IR=/usr/local/hadoop/etc/hadoop MASTER=yarn-client pyspark

準備資料

ALS演算法訓練資料的格式是Rating RDD資料型別

1、配置檔案讀取路徑(IPython/Jupyter Notebook中鍵入以下命令)

global Path
if sc.master[:5]=="local":
    Path="file:/home/yyf/pythonwork/PythonProject"
else:
    Path="hdfs://master:9000/user/yyf/"

2、匯入ml-100k資料u.data

rawUserData=sc.textFile(Path+"data/u.data")
rawUserData.count()

這裡寫圖片描述

上圖,4個欄位分別是:使用者id、專案(電影)id、評分、評分日期

匯入Rating模組,讀取rawUserData前3個欄位,按照使用者id、電影id、使用者對電影的評分來編寫rawRatings

rawRatings=rawUserData.map(lambda line:line.split("\t")[:3])
## 取前5項
rawRatings.take(5)

這裡寫圖片描述

3、準備ALS訓練資料

ALS訓練資料格式 Rating RDD資料型別定義如下:

Rating(user,product,rating)

其中,user欄位是使用者編號,product是產品編號,rating是使用者對產品的評價

這裡寫圖片描述

第一行表示:編號為196的使用者對編號為242的電影的評分為3

統計使用者數和電影數:
這裡寫圖片描述

訓練模型

1、匯入ALS模組

from pyspark.mllib.recommendation import ALS

使用ALS.train進行訓練,兩種訓練模式:顯式評分訓練與隱士評分訓練:

  • 顯式評分(Explicit Rating)訓練

使用實際的電影評分,ALS.train(ratings,rank,iterations=5,lambda_=0.01)

  • 隱式評分(Implicit Rating)訓練

將實際的電影評分看作1(實際意義變成了使用者對電影感不感興趣的二分類標籤問題),ALS.trainImplicit(ratings,rank,iterations=5,lambda_=0.01)

兩種訓練模式都會返回MatrixFactorizationModel模型,其中引數:

  • ratings是格式為Rating RDD的訓練資料
  • rank指矩陣分解時,原本矩陣A(m*n)分解成X(m*rank)矩陣與Y(rank*n)矩陣
  • iterations為迭代次數預設5
  • lambda預設值為0.01

2、使用ASL.train命令進行訓練

這裡寫圖片描述

使用模型進行推薦

1、針對使用者推薦電影

使用MatrixFactorizationModel.recommendProducts(user: Int, num: Int)方法,其中引數user為使用者id,num為推薦的項數 。

例如對編號為666的使用者推薦5部電影:
這裡寫圖片描述

上圖推薦結果中,第一項表示系統對使用者666首先推薦的電影id,推薦的評分約為5.70。返回的結果按照評分從高到低排列,評分越高說明使用者可能越感興趣。

2、預測使用者對某部電影的評分

使用.predict(user: Int, product: Int)方法:
這裡寫圖片描述
上圖的執行結果表示:編號為666的使用者對編號為957的電影的評分可能為2.72

3、針對電影推薦使用者

給定某部電影,將其推薦給可能對它感興趣的使用者,使用.recommendUsers(product: Int, num: Int)方法,其中引數product表示電影id,num表示推薦的使用者數。

例如將編號為957的電影推薦給5個使用者:
這裡寫圖片描述

執行結果返回5個使用者(對957最感興趣的)以及相應的評分。

顯示推薦電影的名稱

讀取u.item檔案:
這裡寫圖片描述

建立Recommend專案

建立Recommend專案,包含兩個.py程式:

1、RecommendTrain.py

  • (1) 資料準備階段:讀取u.data,經過處理後產生評分資料ratingsRDD
  • (2) 訓練階段:評分資料ratingsRDD經過ALS.train訓練後產生模型Model
  • (3) 儲存模型:儲存模型Model在本地或HDFS中,作為後續推薦使用

2、Recommend.py

  • (1) 資料準備階段:讀取u.item經過資料處理後,產生movieTitle(電影ID:電影名稱)對照表
  • (2) 載入模型:載入之前儲存的模型Model
  • (3) 推薦階段:使用Model進行推薦,並使用movieTitle轉換顯示推薦電影名稱

RecommendTrain.py

# -*-coding: utf-8 -*-
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.mllib.recommendation import Rating
from pyspark.mllib.recommendation import ALS

def SetPath(sc):
    """定義全域性變數Path,配置檔案讀取"""
    global Path
    if sc.master[0:5] == "local":   # 當前為本地路徑,讀取本地檔案
        Path = "file:/home/yyf/pythonwork/PythonProject/"
    else:       # 不是本地路徑,有可能是YARN Client或Spark Stand Alone必須讀取HDFS檔案
        Path = "hdfs://master:9000/user/yyf/"


def CreateSparkContext():
    """定義CreateSparkContext函式便於建立SparkContext例項"""
    sparkConf = SparkConf() \
             .setAppName("Recommend") \
             .set("spark.ui.showConsoleProgress","false")
    sc = SparkContext(conf=sparkConf)
    SetPath(sc)
    print("master="+sc.master)
    return sc

def PrepareData(sc):
    """資料預處理:讀取u.data檔案,轉化為ratingsRDD資料型別"""
    rawUserData = sc.textFile(Path + "data/u.data")
    rawRatings = rawUserData.map(lambda line: line.split("\t")[:3])
    ratingsRDD = rawRatings.map(lambda x: (x[0], x[1], x[2]))
    return ratingsRDD


def SaveModel(sc):
    """儲存模型"""
    try:
        model.save(sc, Path+"ALSmodel")
        print("模型已儲存")
    except Exception:
        print("模型已存在,先刪除後建立")


if __name__ == "__main__":
    sc = CreateSparkContext()
    print("==========資料準備階段==========")
    ratingsRDD = PrepareData(sc)
    print("========== 訓練階段 ============")
    print(" 開始ALS訓練,引數rank=5,iterations=10,lambda=0.1")
    model = ALS.train(ratingsRDD, 5, 10, 0.1)
    print("========== 儲存model ==========")
    SaveModel(sc)

Recommend.py

# -*-coding: utf-8 -*-
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.mllib.recommendation import MatrixFactorizationModel
import sys


def SetPath(sc):
    """定義全域性變數Path,配置檔案讀取"""
    global Path
    if sc.master[0:5] == "local":   # 當前為本地路徑,讀取本地檔案
        Path = "file:/home/yyf/pythonwork/PythonProject/"
    else:       # 不是本地路徑,有可能是YARN Client或Spark Stand Alone必須讀取HDFS檔案
        Path = "hdfs://master:9000/user/yyf/"


def CreateSparkContext():
    """定義CreateSparkContext函式便於建立SparkContext例項"""
    sparkConf = SparkConf() \
             .setAppName("Recommend") \
             .set("spark.ui.showConsoleProgress","false")
    sc = SparkContext(conf=sparkConf)
    SetPath(sc)
    print("master="+sc.master)
    return sc


def loadModel(sc):
    """載入訓練好的推薦模型"""
    try:
        model = MatrixFactorizationModel.load(sc, Path+"ALSmodel")
        print("載入模型成功")
    except Exception:
        print("模型不存在, 請先訓練模型")
    return model

def PrepareData(sc):
    """資料準備:準備u.item檔案返回電影id-電影名字典(對照表)"""
    itemRDD = sc.textFile(Path+"data/u.item")
    movieTitle = itemRDD.map(lambda line: line.split("|")) \
        .map(lambda a: (int(a[0]), a[1])) \
        .collectAsMap()
    return movieTitle


def RecommendMovies(model,movieTitle,inputUserId):
    RecommendMovie = model.recommendProducts(inputUserId, int(input[1]))
    print("對使用者ID為"+str(inputUserId)+"的使用者推薦下列"+input[1]+"部電影:")
    for p in RecommendMovie:
        print("對編號為" + str(p[0]) + "的使用者" + "推薦電影" + str(movieTitle[p[1]]) + "推薦評分為" + str(p[2]))


def RecommendUsers(model,movieTitle,inputMovieId):
    RecommendUser = model.recommendUsers(inputMovieId, int(input[1]))
    print("對電影ID為"+str(inputMovieId)+"的電影推薦給下列"+input[1]+"個使用者:")
    for p in RecommendUser:
        print("對編號為" + str(p[0]) + "的使用者" + "推薦電影" + str(movieTitle[p[1]]) + "推薦評分為" + str(p[2]))


def Recommend(model):
    """根據引數進行推薦"""
    if input[0][0] == "U":
        RecommendMovies(model, movieTitle, int(input[0][1:]))
    if input[0][0] == "M":
        RecommendUsers(model, movieTitle, int(input[0][1:]))


if __name__ == "__main__":
    print("請輸入2個引數, 第一個引數指定推薦模式(使用者/電影), 第二個引數為推薦的數量如U666 10表示向用戶666推薦10部電影")
    input = [i for i in sys.stdin.readline().strip().split(" ")]


    sc=CreateSparkContext()
    print("==========資料準備==========")
    movieTitle = PrepareData(sc)
    print("==========載入模型==========")
    model = loadModel(sc)
    print("==========進行推薦==========")
    Recommend(model)

執行結果:

請輸入2個引數, 第一個引數指定推薦模式(使用者/電影), 第二個引數為推薦的數量如U666 10表示向用戶666推薦10部電影
U666 10
==========資料準備==========
==========載入模型==========
載入模型成功
==========進行推薦==========
對使用者ID為666的使用者推薦下列10部電影:
對編號為666的使用者推薦電影Boys, Les (1997)推薦評分為4.88358730439
對編號為666的使用者推薦電影Pather Panchali (1955)推薦評分為4.75295263874
對編號為666的使用者推薦電影Some Mother's Son (1996)推薦評分為4.53178390243
對編號為666的使用者推薦電影Anna (1996)推薦評分為4.50898184243
對編號為666的使用者推薦電影Butcher Boy, The (1998)推薦評分為4.46272366026
對編號為666的使用者推薦電影Spanish Prisoner, The (1997)推薦評分為4.46272366026
對編號為666的使用者推薦電影Brothers in Trouble (1995)推薦評分為4.46272366026
對編號為666的使用者推薦電影Butcher Boy, The (1998)推薦評分為4.46272366026
對編號為666的使用者推薦電影They Made Me a Criminal (1939)推薦評分為4.39719073006
對編號為666的使用者推薦電影Saint of Fort Washington, The (1993)推薦評分為4.37662670282