1. 程式人生 > >ALS推薦演算法—訓練並儲存—載入並測試

ALS推薦演算法—訓練並儲存—載入並測試

資料集下載地址:https://download.csdn.net/download/wsp_1138886114/10681947
一、讀取資料—清洗資料訓練並儲存
import os
from pyspark import SparkContext,SparkConf
from pyspark.mllib.recommendation import ALS,Rating

def create_spark_context():
    os.environ['JAVA_HOME'] = 'C:/Java/jdk1.8.0_91'
    os.environ['HADOOP_HOME'] = 'C:/Java/hadoop-2.6.0-cdh5.7.6'
os.environ['SPARK_HOME'] = 'C:/Java/spark-2.2.0-bin-2.6.0-cdh5.7.6' spark_conf = SparkConf()\ .setAppName('Python_Spark_WordCount')\ .setMaster('local[4]') \ .set("spark.driver.extraJavaOptions", "-Xss4096k") spark_context = SparkContext(conf=spark_conf) # 獲取SparkContext例項物件,
spark_context.setLogLevel('WARN') # 設定日誌級別 return spark_context def prepare_data(spark_context): # ------------1.讀取評分資料並解析 ------------- raw_user_data = spark_context.textFile("../ml-100k/u.data") raw_ratings = raw_user_data.map(lambda line: line.split("\t")[:3]) ratings_rdd =
raw_ratings.map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2]))) # ------------2.資料初步統計 ---------------- num_ratings = ratings_rdd.count() num_users = ratings_rdd.map(lambda x: x[0]).distinct().count() num_movies = ratings_rdd.map(lambda x: x[1]).distinct().count() print("總共: ratings: " + str(num_ratings) + ", User: " + str(num_users) + ", Moive: " + str(num_movies)) return ratings_rdd def save_mode(spark_context,model): try: model.save(spark_context, "../datas/als-model") except Exception: print ("儲存模型出錯") if __name__ =="__main__": sc = create_spark_context() print("==================資料準備階段===================") rating_rdd = prepare_data(sc) print("==================模型訓練階段===================") #開始使用ALS演算法:rank=5",iterations = 5, lambda = 0.1 als_model = ALS.train(rating_rdd,5,iterations=5,lambda_=0.1) print( als_model) print("==================模型儲存階段===================") save_mode(sc,als_model) sc.stop()
二、載入模型—預測

from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import MatrixFactorizationModel
import os
import sys

def create_spark_context():
    os.environ['JAVA_HOME'] = 'C:/Java/jdk1.8.0_91'
    os.environ['HADOOP_HOME'] = 'C:/Java/hadoop-2.6.0-cdh5.7.6'
    os.environ['SPARK_HOME'] = 'C:/Java/spark-2.2.0-bin-2.6.0-cdh5.7.6'
    spark_conf = SparkConf()\
        .setAppName('Python_Spark_WordCount')\
        .setMaster('local[4]') \
        .set("spark.driver.extraJavaOptions", "-Xss4096k")

    spark_context = SparkContext(conf=spark_conf) # 獲取SparkContext例項物件,
    spark_context.setLogLevel('WARN')             # 設定日誌級別
    return spark_context

def prepare_data(spark_context):
    item_rdd = sc.textFile("../ml-100k/u.item")   # 讀取 u.item 電影資訊資料
    movie_title = item_rdd \                      # 建立 電影名稱 與 電影ID  對映的字典
        .map(lambda line: line.split("|")) \
        .map(lambda a: (float(a[0]), a[1]))

    movie_title_dict = movie_title.collectAsMap() # 將RDD轉換字典
    return movie_title_dict

def load_model(spark_context):                    # 載入模型
    try:
        model = MatrixFactorizationModel.load(spark_context, '../datas/als-model')
        print (model)
        return model
    except Exception:
            print ("載入模型出錯")

if __name__ =="__main__": 
    # 執行前點選 pycharm 選單欄 run->Edit configuration->Script parameters 輸入: --U 198(隨意user_id)
    if len(sys.argv)!=3:
        print("請輸入兩個引數:--U user_id,--M movie_id")

def recommend_movies(als, movies, user_id):
    rmd_movies = als.recommendProducts(user_id, 10)
    print('推薦的電影為:{}'.format(rmd_movies))
    for rmd in rmd_movies:
        print("為使用者{}推薦的電影為:{}".format(rmd[0], movies[rmd[1]]))
    return rmd_movies

def recommend_users(als, movies, movie_id):      # 為每個電影推薦10個使用者
    rmd_users = als.recommendUsers(movie_id, 10)
    print('針對電影ID:{0},電影名:{1},推薦是個使用者為:'.format(movie_id, movies[movie_id]))
    for rmd in rmd_users:
        print("推薦使用者ID:{},推薦評分:{}".format(rmd[0], rmd[2]))


def recommend(als_model, movie_dic):
    if sys.argv[1] == '--U':                     # 推薦電影給使用者
        recommend_movies(als_model, movie_dic, int(sys.argv[2]))
    if sys.argv[1] == '--M':                     # 推薦使用者給電影
        recommend_users(als_model, movie_dic, int(sys.argv[2]))

if __name__ == "__main__":
    """
    1.資料準備
    2.載入模型
    3.預測推薦
    """
    # 由於推薦的方式有兩種,一個是依據使用者的推薦,一個是基於商品的推薦
    if len(sys.argv) != 3:
        print("請輸入2個引數, 要麼是: --U user_id,  要麼是: --M movie_id")
        exit(-1)
    sc = create_spark_context()

    # 資料準備,就是載入電影資料資訊,轉換字典
    print('============= 資料準備 =============')
    movie_title_dic = prepare_data(sc)
    print('============= 載入模型 =============')
    als_load_model = load_model(sc)
    print('============= 預測推薦 =============')
    recommend(als_load_model, movie_title_dic)