基於Python Spark的推薦系統
ALS推薦演算法
Spark MLlib中實現了ALS(Alternating Least Squares)基於協同過濾的推薦演算法。
MovieLens資料集
下載ml-100k資料至工作目錄中,終端輸入命令:
mkdir -p ~/pythonwork/PythonProject/data
cd ~/pythonwork/PythonProject/data
wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
unzip -j ml-100k
啟動 Hadoop Muti Node Cluster,複製ml-100k至HDFS中
start-all.sh
hadoop fs -mkdir /user/yyf/data
hadoop fs -copyFromLocal -f ~/pythonwork/PythonProject/data /user/yyf/
hadoop fs -ls /user/yyf/data
啟動Ipython Notebook(Hadoop YARN-client模式)
終端輸入命令:
cd ~/pythonwork/ipynotebook
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" HADOOP_CONF_IR=/usr/local/hadoop/etc/hadoop MASTER=yarn-client pyspark
準備資料
ALS演算法訓練資料的格式是Rating RDD資料型別
1、配置檔案讀取路徑(IPython/Jupyter Notebook中鍵入以下命令)
global Path
if sc.master[:5]=="local":
Path="file:/home/yyf/pythonwork/PythonProject"
else:
Path="hdfs://master:9000/user/yyf/"
2、匯入ml-100k資料u.data
rawUserData=sc.textFile(Path+"data/u.data")
rawUserData.count()
上圖,4個欄位分別是:使用者id、專案(電影)id、評分、評分日期
匯入Rating模組,讀取rawUserData前3個欄位,按照使用者id、電影id、使用者對電影的評分來編寫rawRatings
rawRatings=rawUserData.map(lambda line:line.split("\t")[:3])
## 取前5項
rawRatings.take(5)
3、準備ALS訓練資料
ALS訓練資料格式 Rating RDD資料型別定義如下:
Rating(user,product,rating)
其中,user欄位是使用者編號,product是產品編號,rating是使用者對產品的評價
第一行表示:編號為196的使用者對編號為242的電影的評分為3
統計使用者數和電影數:
訓練模型
1、匯入ALS模組
from pyspark.mllib.recommendation import ALS
使用ALS.train進行訓練,兩種訓練模式:顯式評分訓練與隱士評分訓練:
- 顯式評分(Explicit Rating)訓練
使用實際的電影評分,ALS.train(ratings,rank,iterations=5,lambda_=0.01)
- 隱式評分(Implicit Rating)訓練
將實際的電影評分看作1(實際意義變成了使用者對電影感不感興趣的二分類標籤問題),ALS.trainImplicit(ratings,rank,iterations=5,lambda_=0.01)
兩種訓練模式都會返回MatrixFactorizationModel模型,其中引數:
- ratings是格式為Rating RDD的訓練資料
- rank指矩陣分解時,原本矩陣A(m*n)分解成X(m*rank)矩陣與Y(rank*n)矩陣
- iterations為迭代次數預設5
- lambda預設值為0.01
2、使用ASL.train命令進行訓練
使用模型進行推薦
1、針對使用者推薦電影
使用MatrixFactorizationModel.recommendProducts(user: Int, num: Int)方法,其中引數user為使用者id,num為推薦的項數 。
例如對編號為666的使用者推薦5部電影:
上圖推薦結果中,第一項表示系統對使用者666首先推薦的電影id,推薦的評分約為5.70。返回的結果按照評分從高到低排列,評分越高說明使用者可能越感興趣。
2、預測使用者對某部電影的評分
使用.predict(user: Int, product: Int)方法:
上圖的執行結果表示:編號為666的使用者對編號為957的電影的評分可能為2.72
3、針對電影推薦使用者
給定某部電影,將其推薦給可能對它感興趣的使用者,使用.recommendUsers(product: Int, num: Int)方法,其中引數product表示電影id,num表示推薦的使用者數。
例如將編號為957的電影推薦給5個使用者:
執行結果返回5個使用者(對957最感興趣的)以及相應的評分。
顯示推薦電影的名稱
讀取u.item檔案:
建立Recommend專案
建立Recommend專案,包含兩個.py程式:
1、RecommendTrain.py
- (1) 資料準備階段:讀取u.data,經過處理後產生評分資料ratingsRDD
- (2) 訓練階段:評分資料ratingsRDD經過ALS.train訓練後產生模型Model
- (3) 儲存模型:儲存模型Model在本地或HDFS中,作為後續推薦使用
2、Recommend.py
- (1) 資料準備階段:讀取u.item經過資料處理後,產生movieTitle(電影ID:電影名稱)對照表
- (2) 載入模型:載入之前儲存的模型Model
- (3) 推薦階段:使用Model進行推薦,並使用movieTitle轉換顯示推薦電影名稱
RecommendTrain.py
# -*-coding: utf-8 -*-
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.mllib.recommendation import Rating
from pyspark.mllib.recommendation import ALS
def SetPath(sc):
"""定義全域性變數Path,配置檔案讀取"""
global Path
if sc.master[0:5] == "local": # 當前為本地路徑,讀取本地檔案
Path = "file:/home/yyf/pythonwork/PythonProject/"
else: # 不是本地路徑,有可能是YARN Client或Spark Stand Alone必須讀取HDFS檔案
Path = "hdfs://master:9000/user/yyf/"
def CreateSparkContext():
"""定義CreateSparkContext函式便於建立SparkContext例項"""
sparkConf = SparkConf() \
.setAppName("Recommend") \
.set("spark.ui.showConsoleProgress","false")
sc = SparkContext(conf=sparkConf)
SetPath(sc)
print("master="+sc.master)
return sc
def PrepareData(sc):
"""資料預處理:讀取u.data檔案,轉化為ratingsRDD資料型別"""
rawUserData = sc.textFile(Path + "data/u.data")
rawRatings = rawUserData.map(lambda line: line.split("\t")[:3])
ratingsRDD = rawRatings.map(lambda x: (x[0], x[1], x[2]))
return ratingsRDD
def SaveModel(sc):
"""儲存模型"""
try:
model.save(sc, Path+"ALSmodel")
print("模型已儲存")
except Exception:
print("模型已存在,先刪除後建立")
if __name__ == "__main__":
sc = CreateSparkContext()
print("==========資料準備階段==========")
ratingsRDD = PrepareData(sc)
print("========== 訓練階段 ============")
print(" 開始ALS訓練,引數rank=5,iterations=10,lambda=0.1")
model = ALS.train(ratingsRDD, 5, 10, 0.1)
print("========== 儲存model ==========")
SaveModel(sc)
Recommend.py
# -*-coding: utf-8 -*-
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.mllib.recommendation import MatrixFactorizationModel
import sys
def SetPath(sc):
"""定義全域性變數Path,配置檔案讀取"""
global Path
if sc.master[0:5] == "local": # 當前為本地路徑,讀取本地檔案
Path = "file:/home/yyf/pythonwork/PythonProject/"
else: # 不是本地路徑,有可能是YARN Client或Spark Stand Alone必須讀取HDFS檔案
Path = "hdfs://master:9000/user/yyf/"
def CreateSparkContext():
"""定義CreateSparkContext函式便於建立SparkContext例項"""
sparkConf = SparkConf() \
.setAppName("Recommend") \
.set("spark.ui.showConsoleProgress","false")
sc = SparkContext(conf=sparkConf)
SetPath(sc)
print("master="+sc.master)
return sc
def loadModel(sc):
"""載入訓練好的推薦模型"""
try:
model = MatrixFactorizationModel.load(sc, Path+"ALSmodel")
print("載入模型成功")
except Exception:
print("模型不存在, 請先訓練模型")
return model
def PrepareData(sc):
"""資料準備:準備u.item檔案返回電影id-電影名字典(對照表)"""
itemRDD = sc.textFile(Path+"data/u.item")
movieTitle = itemRDD.map(lambda line: line.split("|")) \
.map(lambda a: (int(a[0]), a[1])) \
.collectAsMap()
return movieTitle
def RecommendMovies(model,movieTitle,inputUserId):
RecommendMovie = model.recommendProducts(inputUserId, int(input[1]))
print("對使用者ID為"+str(inputUserId)+"的使用者推薦下列"+input[1]+"部電影:")
for p in RecommendMovie:
print("對編號為" + str(p[0]) + "的使用者" + "推薦電影" + str(movieTitle[p[1]]) + "推薦評分為" + str(p[2]))
def RecommendUsers(model,movieTitle,inputMovieId):
RecommendUser = model.recommendUsers(inputMovieId, int(input[1]))
print("對電影ID為"+str(inputMovieId)+"的電影推薦給下列"+input[1]+"個使用者:")
for p in RecommendUser:
print("對編號為" + str(p[0]) + "的使用者" + "推薦電影" + str(movieTitle[p[1]]) + "推薦評分為" + str(p[2]))
def Recommend(model):
"""根據引數進行推薦"""
if input[0][0] == "U":
RecommendMovies(model, movieTitle, int(input[0][1:]))
if input[0][0] == "M":
RecommendUsers(model, movieTitle, int(input[0][1:]))
if __name__ == "__main__":
print("請輸入2個引數, 第一個引數指定推薦模式(使用者/電影), 第二個引數為推薦的數量如U666 10表示向用戶666推薦10部電影")
input = [i for i in sys.stdin.readline().strip().split(" ")]
sc=CreateSparkContext()
print("==========資料準備==========")
movieTitle = PrepareData(sc)
print("==========載入模型==========")
model = loadModel(sc)
print("==========進行推薦==========")
Recommend(model)
執行結果:
請輸入2個引數, 第一個引數指定推薦模式(使用者/電影), 第二個引數為推薦的數量如U666 10表示向用戶666推薦10部電影
U666 10
==========資料準備==========
==========載入模型==========
載入模型成功
==========進行推薦==========
對使用者ID為666的使用者推薦下列10部電影:
對編號為666的使用者推薦電影Boys, Les (1997)推薦評分為4.88358730439
對編號為666的使用者推薦電影Pather Panchali (1955)推薦評分為4.75295263874
對編號為666的使用者推薦電影Some Mother's Son (1996)推薦評分為4.53178390243
對編號為666的使用者推薦電影Anna (1996)推薦評分為4.50898184243
對編號為666的使用者推薦電影Butcher Boy, The (1998)推薦評分為4.46272366026
對編號為666的使用者推薦電影Spanish Prisoner, The (1997)推薦評分為4.46272366026
對編號為666的使用者推薦電影Brothers in Trouble (1995)推薦評分為4.46272366026
對編號為666的使用者推薦電影Butcher Boy, The (1998)推薦評分為4.46272366026
對編號為666的使用者推薦電影They Made Me a Criminal (1939)推薦評分為4.39719073006
對編號為666的使用者推薦電影Saint of Fort Washington, The (1993)推薦評分為4.37662670282