Spark ML: Recommendation Algorithm Project (Part 2)
阿新 • Published: 2020-10-25
I. Overall Approach
II. Code Analysis
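1. Merge the data. Goods the user has already seen are labeled from the user's behavior: 1 for liked, 0 for not liked. Goods the user has never seen are labeled 2.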
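import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Decide whether a user likes a good: placing an order or adding it to the cart counts as "like" (1);
// only browsing or collecting (favoriting) it counts as "not like" (0)
val isLove: UserDefinedFunction = udf { (act: String) =>
  if (act.equalsIgnoreCase("BROWSE") || act.equalsIgnoreCase("COLLECT")) 0 else 1
}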
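import spark.implicits._

// Global best-seller recall: (cust_id, good_id, rank)
val hot = HDFSConnection.readDataToHDFS(spark, "/myshops/dwd_hotsell")
  .select($"cust_id", $"good_id")
// Group-based (KMeans) recall
val group = HDFSConnection.readDataToHDFS(spark, "/myshops/dwd_kMeans")
  .select($"cust_id", $"good_id")
// ALS recall
val als = HDFSConnection.readDataToHDFS(spark, "/myshops/dwd_ALS_Iter20")
  .select($"cust_id", $"good_id")

// User behavior logs: order or cart => like (1), anything else => not like (0)
// NOTE: the field separator was lost from the original listing, which read split("") and would
// split each line into single characters; "\t" is an ASSUMPTION, use the logs' real delimiter
val order = spark.sparkContext
  .textFile("file:///D:/logs/virtualLogs/*.log")
  .map { line =>
    val arr = line.split("\t")
    (arr(0), arr(2), arr(3)) // (act, cust_id, good_id)
  }
  .toDF("act", "cust_id", "good_id")
  .withColumn("flag", isLove($"act"))
  .drop("act")
  .distinct()
  .cache()

// Merge the three recall sources (cold users included) and attach the behavior flag;
// goods the user has never interacted with find no match in the left join and get flag 2
val all = hot.union(group).union(als)
  .join(order, Seq("cust_id", "good_id"), "left")
  .na.fill(2, Seq("flag"))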
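To check the labeling and merge rules without a cluster, here is a minimal plain-Scala sketch that mimics them on in-memory collections. It is an illustration under assumptions, not the Spark job itself: the `Event`/`Labeled` case classes and sample IDs are hypothetical, action names other than BROWSE and COLLECT (such as ORDER) are assumed, and a `Map` lookup stands in for the left join plus `na.fill`.

```scala
// Plain-Scala sketch (no Spark) of step 1: label observed behavior, then
// left-join the recalled (cust_id, good_id) pairs against it and default to 2.
object MergeSketch {
  // Hypothetical record types standing in for DataFrame rows
  final case class Event(act: String, custId: String, goodId: String)
  final case class Labeled(custId: String, goodId: String, flag: Int)

  // Same rule as the isLove UDF: BROWSE/COLLECT -> 0 (not liked), anything else -> 1 (liked)
  def isLove(act: String): Int =
    if (act.equalsIgnoreCase("BROWSE") || act.equalsIgnoreCase("COLLECT")) 0 else 1

  def merge(recalled: Seq[(String, String)], events: Seq[Event]): Seq[Labeled] = {
    // Equivalent of .withColumn("flag", isLove($"act")).drop("act").distinct()
    val order: Seq[Labeled] =
      events.map(e => Labeled(e.custId, e.goodId, isLove(e.act))).distinct
    // Index the observed flags by (cust_id, good_id) for the "left join"
    val byKey: Map[(String, String), Seq[Int]] =
      order.groupBy(l => (l.custId, l.goodId)).map { case (k, ls) => k -> ls.map(_.flag) }
    // Left join: every recalled pair is kept; pairs with no behavior get flag 2
    recalled.flatMap { case key @ (c, g) =>
      byKey.getOrElse(key, Seq(2)).map(f => Labeled(c, g, f))
    }
  }

  def main(args: Array[String]): Unit = {
    val recalled = Seq(("u1", "g1"), ("u1", "g2"), ("u2", "g1")) // union of hot, group, ALS
    val events = Seq(Event("ORDER", "u1", "g1"), Event("BROWSE", "u2", "g1"))
    merge(recalled, events).foreach(println)
  }
}
```

As in the Spark job, a pair that was both browsed and ordered keeps two rows (flag 0 and flag 1), and a pair recalled by more than one source appears once per source, because neither the unions nor the join deduplicate.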
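2. Prepare the data the LR (logistic regression) model needs: label is whether the user likes the good, and features are the user and goods attributes, normalized.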
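import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Simple normalization: maps a non-negative value p to p / (10000 + p), which lies in [0, 1).
// This is a saturating transform, not min-max scaling (MinMaxScaler is applied later to the whole feature vector)
val priceNormalize: UserDefinedFunction = udf { (price: String) =>
  val p: Double = price.toDouble
  p / (10000 + p)
}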
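The transform f(p) = p / (10000 + p) needs no column statistics: it maps 0 to 0, the constant 10000 to 0.5, and approaches 1 as p grows, while preserving order. The plain-Scala sketch below evaluates the same formula on a few sample values; the names `SquashSketch`, `K` and `squash` and the sample prices are illustrative only.

```scala
// Plain-Scala sketch of the priceNormalize formula: f(p) = p / (10000 + p)
object SquashSketch {
  val K = 10000.0 // constant used in the UDF; f(K) = 0.5

  def squash(p: Double): Double = p / (K + p)

  def main(args: Array[String]): Unit = {
    // For non-negative inputs the result stays in [0, 1) and keeps the inputs' order
    for (p <- Seq(0.0, 100.0, 10000.0, 30000.0, 90000.0))
      println(f"$p%9.1f -> ${squash(p)}%.4f")
  }
}
```

One consequence of the fixed constant is that values far below 10000 are compressed near 0: f(100) ≈ 0.0099, while f(30000) = 0.75.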
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.{DataFrame, SparkSession}

def goodNumberFormat(spark: SparkSession): DataFrame = {
  // Goods currently on sale, without free-text and other columns not used as features
  val good_infos = MYSQLConnection.readMySql(spark, "goods")
    .filter("is_sale=1")
    .drop("spu_pro_name", "tags", "content", "good_name", "created_at", "update_at", "good_img_pos", "sku_good_code")
  // Encode brand names as numeric indices
  val brand_index = new StringIndexer().setInputCol("brand_name").setOutputCol("brand")
  val bi = brand_index.fit(good_infos).transform(good_infos)
  // Encode goods categories as numeric indices
  val type_index = new StringIndexer().setInputCol("cate_name").setOutputCol("cate")
  val ct = type_index.fit(bi).transform(bi)
  // Normalize the current price, the original price and sku_num
  import spark.implicits._
  val pc = ct.withColumn("nprice", priceNormalize($"price"))
    .withColumn("noriginal", priceNormalize($"original"))
    .withColumn("nsku_num", priceNormalize($"sku_num"))
    .drop("price", "original", "sku_num")
  // Encode the attribute value (spu_pro_value) as a numeric index
  val feat_index = new StringIndexer().setInputCol("spu_pro_value").setOutputCol("pro_value")
  feat_index.fit(pc).transform(pc).drop("spu_pro_value")
}
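`StringIndexer` replaces each distinct string with a numeric index; with the default `stringOrderType` of `frequencyDesc`, the most frequent value gets 0.0, the next 1.0, and so on. The plain-Scala sketch below approximates that mapping so the resulting brand, cate and pro_value numbers are easy to predict. It is not Spark's implementation: ties are broken alphabetically here (the behavior documented for Spark 3.x), unseen labels simply throw (mirroring the default `handleInvalid = "error"`), and the brand names are made up.

```scala
// Plain-Scala approximation of StringIndexer with stringOrderType = "frequencyDesc"
object IndexerSketch {
  // "fit": label -> index, most frequent first, ties broken alphabetically
  def fit(values: Seq[String]): Map[String, Double] =
    values.groupBy(identity).toSeq
      .map { case (label, occurrences) => (label, occurrences.size) }
      .sortBy { case (label, count) => (-count, label) }
      .zipWithIndex
      .map { case ((label, _), idx) => label -> idx.toDouble }
      .toMap

  // "transform": look up each value; an unseen label raises an error
  def transform(model: Map[String, Double], values: Seq[String]): Seq[Double] =
    values.map(v => model.getOrElse(v, throw new NoSuchElementException(s"Unseen label: $v")))

  def main(args: Array[String]): Unit = {
    val brands = Seq("Acme", "Zeta", "Acme", "Beta", "Zeta", "Acme")
    val model = fit(brands)
    println(model)
    println(transform(model, brands))
  }
}
```

With the sample brands, Acme (3 occurrences) gets 0.0, Zeta (2) gets 1.0 and Beta (1) gets 2.0.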
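import org.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType

// Attach to every row the user's demographic attributes, user behavior attributes
// and the goods attributes that the LR (logistic regression) model needs
val user_info_df = KMeansHandler.user_act_info(spark)
// Goods attributes from the database that influence sales
val good_infos = goodNumberFormat(spark)
// Join the three-way recall data with the user information and the goods information
val ddf = all.join(user_info_df, Seq("cust_id"), "inner")
  .join(good_infos, Seq("good_id"), "inner")
// Cast every column to Double; non-numeric strings become null, and VectorAssembler
// rejects nulls by default, so every assembled column must already be numeric
val columns = ddf.columns.map(f => col(f).cast(DoubleType))
val num_fmt = ddf.select(columns: _*)
// Assemble the feature columns into one dense vector
val va = new VectorAssembler().setInputCols(Array(
    "province_id", "city_id", "district_id", "sex", "marital_status", "education_id", "vocation", "post",
    "compId", "mslevel", "reg_date", "lasttime", "age", "user_score", "logincount", "buycount", "pay",
    "is_sale", "spu_pro_status", "brand", "cate", "nprice", "noriginal", "nsku_num", "pro_value"))
  .setOutputCol("orign_feature")
val ofdf = va.transform(num_fmt).select($"cust_id", $"good_id", $"flag".alias("label"), $"orign_feature")
// Rescale every feature to [0, 1]
val mmScaler = new MinMaxScaler().setInputCol("orign_feature").setOutputCol("features")
val res = mmScaler.fit(ofdf).transform(ofdf)
  .select($"cust_id", $"good_id", $"label", $"features")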
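`MinMaxScaler` rescales each position of the assembled vector independently, using that feature's minimum and maximum over the data it was fitted on: x' = (x - E_min) / (E_max - E_min), and the Spark documentation specifies 0.5 for a constant feature with the default output range [0, 1]. The plain-Scala sketch below reproduces that rule on dense rows stored as arrays; the sample rows and their column meanings are hypothetical, and sparse vectors and custom min/max settings are ignored.

```scala
// Plain-Scala sketch of MinMaxScaler's per-feature rule with the default output range [0, 1]
object MinMaxSketch {
  // "fit": per-feature minimum and maximum (rows must be non-empty and of equal length)
  def fit(rows: Seq[Array[Double]]): (Array[Double], Array[Double]) = {
    val dims = rows.head.length
    val mins = Array.tabulate(dims)(j => rows.map(_(j)).min)
    val maxs = Array.tabulate(dims)(j => rows.map(_(j)).max)
    (mins, maxs)
  }

  // "transform": (x - min) / (max - min); a constant feature maps to 0.5
  def transform(mins: Array[Double], maxs: Array[Double])(row: Array[Double]): Array[Double] =
    row.indices.map { j =>
      val range = maxs(j) - mins(j)
      if (range == 0.0) 0.5 else (row(j) - mins(j)) / range
    }.toArray

  def main(args: Array[String]): Unit = {
    // Hypothetical rows: (age, buycount, nprice)
    val rows = Seq(Array(20.0, 0.0, 0.5), Array(40.0, 5.0, 0.5), Array(30.0, 10.0, 0.5))
    val (mins, maxs) = fit(rows)
    rows.map(r => transform(mins, maxs)(r)).foreach(r => println(r.mkString("[", ", ", "]")))
  }
}
```

For these rows, age 30 maps to 0.5, buycount 10 maps to 1.0, and the constant nprice column maps to 0.5 everywhere.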
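3. Split the data into two sets: rows with label 0/1 (observed behavior) are used to build the prediction (LR) model, and rows with label 2 (goods the user has never interacted with) are used for recommendation.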
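// (rows with label 0/1 for building the model, rows with label 2 as recommendation candidates)
(res.filter("label != 2"), res.filter("label = 2"))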
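The tuple above separates the label 0/1 rows used for training from the label 2 candidates. As an assumption about the next step, which this part does not show, a trained logistic regression model would score each candidate with σ(w·x + b), the predicted probability of label 1 ("liked"), and each user's highest-scoring goods would be recommended. The plain-Scala sketch below shows that split and ranking; the `Rec` type, the weights, the bias and `topN` are hypothetical stand-ins for a fitted model.

```scala
// Plain-Scala sketch: split by label, score label-2 candidates with a logistic function, keep top-N per user
object SplitAndRankSketch {
  // Hypothetical stand-in for a row of `res`: (cust_id, good_id, label, features)
  final case class Rec(custId: String, goodId: String, label: Double, features: Array[Double])

  // Same filters as res.filter("label != 2") and res.filter("label = 2")
  def split(rows: Seq[Rec]): (Seq[Rec], Seq[Rec]) = rows.partition(_.label != 2.0)

  // Logistic regression score: sigmoid(w . x + b) = P(label = 1)
  def score(weights: Array[Double], bias: Double)(x: Array[Double]): Double = {
    val margin = weights.zip(x).map { case (w, v) => w * v }.sum + bias
    1.0 / (1.0 + math.exp(-margin))
  }

  // For each user, the n candidate goods with the highest predicted probability
  def topN(candidates: Seq[Rec], scoreFn: Array[Double] => Double, n: Int): Map[String, Seq[String]] =
    candidates.groupBy(_.custId).map { case (cust, rs) =>
      cust -> rs.sortBy(r => -scoreFn(r.features)).take(n).map(_.goodId)
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Rec("u1", "g1", 1.0, Array(0.9, 0.1)),
      Rec("u1", "g2", 2.0, Array(0.8, 0.2)),
      Rec("u1", "g3", 2.0, Array(0.1, 0.9)),
      Rec("u2", "g1", 2.0, Array(0.5, 0.5)))
    val (train, candidates) = split(rows)
    println(s"training rows: ${train.size}, candidate rows: ${candidates.size}")
    // Made-up weights standing in for a model fitted on `train`
    val scoreFn: Array[Double] => Double = x => score(Array(2.0, -1.0), 0.0)(x)
    println(topN(candidates, scoreFn, 1))
  }
}
```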