Spark ML自定義選擇最優模型演算法深入剖析-Spark商業ML實戰
本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何商業交流,可隨時聯絡。
1 自定義選擇最優模型
什麼叫做自定義模型?其實就是不借助Spark官方支援的交叉驗證和訓練驗證拆分,而是根據實際場景進行自定義的RMSE等指標進行綜合分析。奉上試驗美圖:
2 協同過濾(Collaborative Filtering)
-
顯式的使用者反饋:這類是使用者在網站上自然瀏覽或者使用網站以外,顯式地提供反饋資訊,例如使用者對物品的評分或者對物品的評論。
-
隱式的使用者反饋:這類是使用者在使用網站是產生的資料,隱式地反映了使用者對物品的喜好,例如使用者購買了某物品,使用者查看了某物品的資訊,等等
-
顯式的使用者反饋能準確地反映使用者對物品的真實喜好,但需要使用者付出額外的代價;
-
而隱式的使用者行為,通過一些分析和處理,也能反映使用者的喜好,只是資料不是很精確,有些行為的分析存在較大的噪音。但只要選擇正確的行為特徵,隱式的使用者反饋也能得到很好的效果,只是行為特徵的選擇可能在不同的應用中有很大的不同,
-
Spark ML目前支援基於協同過濾的模型,在這個模型裡,使用者和產品被一組可以用來預測缺失專案的潛在因子來描述。ML 實現了交替最小二乘(ALS)演算法來學習這些潛在的因子,在 ML 中的實現有如下引數:
numBlocks 是用於並行化計算的使用者和商品的分塊個數 (預設為10)。 rank 是模型中隱語義因子的個數(預設為10)。 maxIter 是迭代的次數(預設為10)。 regParam 是ALS的正則化引數(預設為1.0)。 implicitPrefs 決定了是用顯性反饋ALS的版本還是用適用隱性反饋資料集的版本(預設是false,即用顯性反饋)。 alpha 是一個針對於隱性反饋 ALS 版本的引數,這個引數決定了偏好行為強度的基準(預設為1.0)。 nonnegative 決定是否對最小二乘法使用非負的限制(預設為false)。
3 經典協同過濾資料集
-
在MovieLens提供的電影評分資料分為三個表:評分、使用者資訊和電影資訊,在該系列提供的附屬資料提供大概6000位讀者和100萬個評分資料。
-
評分資料說明(ratings.data)
該評分資料總共四個欄位,格式為UserID::MovieID::Rating::Timestamp, 分為為使用者編號::電影編號::評分::評分時間戳, 其中各個欄位說明如下: 使用者編號範圍1~6040 電影編號1~3952 電影評分為五星評分,範圍0~5 評分時間戳單位秒 每個使用者至少有20個電影評分 1::1193::5::978300760 1::661::3::978302109 1::914::3::978301968
-
2.使用者資訊(users.dat)
使用者資訊五個欄位,格式為UserID::Gender::Age::Occupation::Zip-code, 分為為使用者編號::性別::年齡::職業::郵編。 其中各個欄位說明如下: 使用者編號範圍1~6040 性別,其中M為男性,F為女性 不同的數字代表不同的年齡範圍,如:25代表25~34歲範圍 職業資訊,在測試資料中提供了21中職業分類 地區郵編 1::F::1::10::48067 2::M::56::16::70072 3::M::25::15::55117
-
3 電影資訊(movies.dat)
電影資料分為三個欄位,格式為MovieID::Title::Genres,分為 電影編號::電影名::電影類別 其中各個欄位說明如下: 電影編號1~3952 由IMDB提供電影名稱,其中包括電影上映年份 電影分類,這裡使用實際分類名非編號,如:Action、Crime等 1::Toy Story (1995)::Animation|Children's|Comedy 2::Jumanji (1995)::Adventure|Children's|Fantasy 3::Grumpier Old Men (1995)::Comedy|Romance
4 自定義模型
import org.apache.spark.sql.Row
import org.apache.spark.ml.recommendation.{ALS,ALSModel}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.evaluation.RegressionEvaluator
import spark.implicits._
case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
1 根據資料結構定義資料規範
def parseRating(str: String): Rating = {
val fields = str.split("::")
assert(fields.size == 4)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong) }
val ratings = spark.sparkContext.textFile("/data/mllib/als/sample_movielens_ratings.txt").map(parseRating).toDF()
ratings.show()
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
| 0| 2| 3.0|1424380312|
| 0| 3| 1.0|1424380312|
| 0| 5| 2.0|1424380312|
| 0| 9| 4.0|1424380312|
2 分割資料集
val splits = ratings.randomSplit(Array(0.6, 0.2, 0.2),12)
val training =splits(0).cache()
val validation=splits(1).toDF.cache()
val test =splits(2).toDF.cache()
3 訓練不同引數下的模型,並集中驗證,獲取最佳引數下的模型
val numValidation =validation.count
val numTraining =training.count
val numTest =test.count
val ranks = List(8, 12,13)
val lambdas = List(0.1, 10.0, 12.0)
val numIters = List(10, 20, 20)
var bestModel: Option[ALSModel] = None
var bestValidationRmse = Double.MaxValue
var bestRank = 0
var bestLambda = -1.0
var bestNumIter = -1
4 校驗集預測資料和實際資料之間的均方根誤差運算元(參看上一篇指標定義)
模型預測結果
+------+-------+------+----------+-----------+
|userId|movieId|rating| timestamp| prediction|
+------+-------+------+----------+-----------+
| 13| 31| 1.0|1424380312| 2.35308|
| 0| 31| 1.0|1424380312| 2.5408225|
| 18| 31| 1.0|1424380312| 1.3848196|
| 4| 85| 1.0|1424380312| 2.4104187|
| 8| 85| 5.0|1424380312| 3.9386258|
| 23| 85| 1.0|1424380312| -0.7795656|
| 29| 85| 1.0|1424380312|0.118287265|
| 28| 65| 1.0|1424380312| 4.6700068|
def computeRmse(model: ALSModel, data: DataFrame, n: Long): Double = {
val predictions = model.transform(data)
//計算過程:((userId,movieId),(rating,prediction)) ====> (rating,prediction)
val predictionsAndRatings = predictions.rdd.map{x => ((x(0), x(1)),x(2))}
.join(predictions.rdd.map(x => ((x(0), x(1)), x(4))))
.values
math.sqrt(predictionsAndRatings.map(x => (x._1.toString.toDouble - x._2.toString.toDouble) * (x._1.toString.toDouble - x._2.toString.toDouble)).reduce(_ + _) / n)
}
5 計算最優模型的遍歷測試
for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
val als = new ALS().setMaxIter(numIter).setRegParam(lambda).setRank(rank).setUserCol("userId"). setItemCol("movieId").setRatingCol("rating")
val model = als.fit(training)
模型作用於驗證集進行驗證
val validationRmse = computeRmse(model, validation, numValidation)
println("RMSE (模型評估) = " + validationRmse + " 引數 rank = "
+ rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
if (validationRmse < bestValidationRmse) {
bestModel = Some(model)
bestValidationRmse = validationRmse
bestRank = rank
bestLambda = lambda
bestNumIter = numIter
}
}
RMSE (validation) = 1.0747445332055616 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.
RMSE (validation) = 1.045271064998892 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.
RMSE (validation) = 2.041241452319315 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.
RMSE (validation) = 2.041241452319315 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.
RMSE (validation) = 1.0213510038051121 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.
RMSE (validation) = 1.005770421453116 for the model trained with rank = 12, lambda = 0.1, and numIter = 20.
RMSE (validation) = 2.041241452319315 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.
RMSE (validation) = 2.041241452319315 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.
6 最優模型作用於測試集
val testRmse = computeRmse(bestModel.get, test, numTest)
println("最優模型引數: rank = " + bestRank + " and lambda = " + bestLambda + ", and numIter = " + bestNumIter + ", 最優模型均方根誤差為 " + testRmse + ".")
最優模型引數: rank = 12 and lambda = 0.1, and numIter = 20, 最優模型均方根誤差為 0.9519301678208573.
5 飯後甜點(基於RegressionEvaluator評估器)
使用ALS來建立推薦模型,這裡我們構建了兩個模型,一個是顯性反饋,一個是隱性反饋
1 構建模型
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
顯性反饋
val alsExplicit = new ALS().setMaxIter(5).setRegParam(0.01).setUserCol("userId"). setItemCol("movieId").setRatingCol("rating")
隱性反饋
val alsImplicit = new ALS().setMaxIter(5).setRegParam(0.01).setImplicitPrefs(true). setUserCol("userId").setItemCol("movieId").setRatingCol("rating")
val modelExplicit = alsExplicit.fit(training)
val modelImplicit = alsImplicit.fit(training)
2 模型預測
val predictionsExplicit = modelExplicit.transform(test)
predictionsExplicit.show()
------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
| 29| 31| 1.0|1424380312| 1.6074163|
| 0| 31| 1.0|1424380312| 1.7223389|
| 28| 85| 1.0|1424380312| 4.469816|
| 26| 85| 1.0|1424380312| 1.0373509|
| 15| 85| 1.0|1424380312| 5.004366|
| 23| 85| 1.0|1424380312|-1.0499454|
| 5| 65| 2.0|1424380312| 0.6295809|
val predictionsImplicit = modelImplicit.transform(test)
predictionsImplicit.show()
+------+-------+------+----------+------------+
|userId|movieId|rating| timestamp| prediction|
+------+-------+------+----------+------------+
| 29| 31| 1.0|1424380312| 0.24691844|
| 0| 31| 1.0|1424380312| 0.18451405|
| 28| 85| 1.0|1424380312| 0.6521389|
| 26| 85| 1.0|1424380312| 0.9124242|
| 15| 85| 1.0|1424380312| 0.6542819|
| 23| 85| 1.0|1424380312| 0.34113467|
3 模型評估
scala>val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating"). setPredictionCol("prediction")
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_bfdf233c6bad
scala>val rmseExplicit = evaluator.evaluate(predictionsExplicit)
rmseExplicit: Double = 1.6652229504120535
scala>val rmseImplicit = evaluator.evaluate(predictionsImplicit)
rmseImplicit: Double = 1.7925024428799021
結語
花了大量時間實現了自定義模型評估演算法,不管對你有沒有用,我覺得通過例項和現場試驗,我終於強化了我的知識體系。辛苦成文,各自珍惜,謝謝!奉上美圖。
秦凱新 於深圳 201811181817