ALS推薦演算法—訓練並儲存—載入並測試
阿新 • • 發佈:2018-12-11
資料集下載地址:https://download.csdn.net/download/wsp_1138886114/10681947
一、讀取資料—清洗資料訓練並儲存
import os
from pyspark import SparkContext,SparkConf
from pyspark.mllib.recommendation import ALS,Rating
def create_spark_context():
os.environ['JAVA_HOME'] = 'C:/Java/jdk1.8.0_91'
os.environ['HADOOP_HOME'] = 'C:/Java/hadoop-2.6.0-cdh5.7.6'
os.environ['SPARK_HOME'] = 'C:/Java/spark-2.2.0-bin-2.6.0-cdh5.7.6'
spark_conf = SparkConf()\
.setAppName('Python_Spark_WordCount')\
.setMaster('local[4]') \
.set("spark.driver.extraJavaOptions", "-Xss4096k")
spark_context = SparkContext(conf=spark_conf) # 獲取SparkContext例項物件,
spark_context.setLogLevel('WARN') # 設定日誌級別
return spark_context
def prepare_data(spark_context):
# ------------1.讀取評分資料並解析 -------------
raw_user_data = spark_context.textFile("../ml-100k/u.data")
raw_ratings = raw_user_data.map(lambda line: line.split("\t")[:3])
ratings_rdd = raw_ratings.map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))
# ------------2.資料初步統計 ----------------
num_ratings = ratings_rdd.count()
num_users = ratings_rdd.map(lambda x: x[0]).distinct().count()
num_movies = ratings_rdd.map(lambda x: x[1]).distinct().count()
print("總共: ratings: " + str(num_ratings) + ", User: " + str(num_users) + ", Moive: " + str(num_movies))
return ratings_rdd
def save_mode(spark_context,model):
try:
model.save(spark_context, "../datas/als-model")
except Exception:
print ("儲存模型出錯")
if __name__ =="__main__":
sc = create_spark_context()
print("==================資料準備階段===================")
rating_rdd = prepare_data(sc)
print("==================模型訓練階段===================")
#開始使用ALS演算法:rank=5",iterations = 5, lambda = 0.1
als_model = ALS.train(rating_rdd,5,iterations=5,lambda_=0.1)
print( als_model)
print("==================模型儲存階段===================")
save_mode(sc,als_model)
sc.stop()
二、載入模型—預測
from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import MatrixFactorizationModel
import os
import sys
def create_spark_context():
os.environ['JAVA_HOME'] = 'C:/Java/jdk1.8.0_91'
os.environ['HADOOP_HOME'] = 'C:/Java/hadoop-2.6.0-cdh5.7.6'
os.environ['SPARK_HOME'] = 'C:/Java/spark-2.2.0-bin-2.6.0-cdh5.7.6'
spark_conf = SparkConf()\
.setAppName('Python_Spark_WordCount')\
.setMaster('local[4]') \
.set("spark.driver.extraJavaOptions", "-Xss4096k")
spark_context = SparkContext(conf=spark_conf) # 獲取SparkContext例項物件,
spark_context.setLogLevel('WARN') # 設定日誌級別
return spark_context
def prepare_data(spark_context):
item_rdd = sc.textFile("../ml-100k/u.item") # 讀取 u.item 電影資訊資料
movie_title = item_rdd \ # 建立 電影名稱 與 電影ID 對映的字典
.map(lambda line: line.split("|")) \
.map(lambda a: (float(a[0]), a[1]))
movie_title_dict = movie_title.collectAsMap() # 將RDD轉換字典
return movie_title_dict
def load_model(spark_context): # 載入模型
try:
model = MatrixFactorizationModel.load(spark_context, '../datas/als-model')
print (model)
return model
except Exception:
print ("載入模型出錯")
if __name__ =="__main__":
# 執行前點選 pycharm 選單欄 run->Edit configuration->Script parameters 輸入: --U 198(隨意user_id)
if len(sys.argv)!=3:
print("請輸入兩個引數:--U user_id,--M movie_id")
def recommend_movies(als, movies, user_id):
rmd_movies = als.recommendProducts(user_id, 10)
print('推薦的電影為:{}'.format(rmd_movies))
for rmd in rmd_movies:
print("為使用者{}推薦的電影為:{}".format(rmd[0], movies[rmd[1]]))
return rmd_movies
def recommend_users(als, movies, movie_id): # 為每個電影推薦10個使用者
rmd_users = als.recommendUsers(movie_id, 10)
print('針對電影ID:{0},電影名:{1},推薦是個使用者為:'.format(movie_id, movies[movie_id]))
for rmd in rmd_users:
print("推薦使用者ID:{},推薦評分:{}".format(rmd[0], rmd[2]))
def recommend(als_model, movie_dic):
if sys.argv[1] == '--U': # 推薦電影給使用者
recommend_movies(als_model, movie_dic, int(sys.argv[2]))
if sys.argv[1] == '--M': # 推薦使用者給電影
recommend_users(als_model, movie_dic, int(sys.argv[2]))
if __name__ == "__main__":
"""
1.資料準備
2.載入模型
3.預測推薦
"""
# 由於推薦的方式有兩種,一個是依據使用者的推薦,一個是基於商品的推薦
if len(sys.argv) != 3:
print("請輸入2個引數, 要麼是: --U user_id, 要麼是: --M movie_id")
exit(-1)
sc = create_spark_context()
# 資料準備,就是載入電影資料資訊,轉換字典
print('============= 資料準備 =============')
movie_title_dic = prepare_data(sc)
print('============= 載入模型 =============')
als_load_model = load_model(sc)
print('============= 預測推薦 =============')
recommend(als_load_model, movie_title_dic)