協同過濾(ALS)
Spark ALS演算法進行矩陣分解,U * V = Q
如果資料不是執行在叢集上,而是執行在本地,為了保證記憶體充足,在啟動spark-shell時需要指定引數--driver-memory 6g。
-
- 資料集
藝術家點播資料集:
使用者和藝術家的關係是通過其他行動隱含提現出來的,例如播放歌曲或專輯,而不是通過顯式的評分或者點贊得到的。這被稱為隱式反饋資料。現在的家用電視點播也是這樣,使用者一般不會主動評分。
資料集下載地址是http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
-
- 資料處理
Spark MLib的ALS演算法實現有一個小缺點:它要求使用者和產品的ID必須是數值型,並且是32位非負整數,這意味著大於Integer.MAX_VALUE(2147483647)的ID是非法的。我們首先看看資料集是否滿足要求:
scala> val rawUserArtistData = sc.textFile("D:/Workspace/AnalysisWithSpark/src/main/java/advanced/chapter3/profiledata_06-May-2005/user_artist_data.txt")
rawUserArtistData: org.apache.spark.rdd.RDD[String] = D:/Workspace/AnalysisWithSpark/src/main/java/advanced/chapter3/profiledata_06-May-
scala> rawUserArtistData.map(_.split(' ')(0).toDouble).stats()
res0: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1947573.265353, stdev: 496000.544975, max: 2443548.000000, min: 90.000000)
scala> rawUserArtistData.map(_.
res1: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1718704.093757, stdev: 2539389.040171, max: 10794401.000000, min: 1.000000)
過濾資料
val rawArtistData = sc.textFile("hdfs:///user/ds/artist_data.txt")
val artistByID = rawArtistData.flatMap { line =>
val (id, name) = line.span(_ != '\t')
if (name.isEmpty) {
None
} else {
try {
Some((id.toInt, name.trim))
} catch {
case e: NumberFormatException => None
}
}
}
-
- 訓練演算法
首先將資料轉換成Rating
import org.apache.spark.mllib.recommendation._
val bArtistAlias = sc.broadcast(artistAlias)
val trainData = rawUserArtistData.map { line =>
val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
val finalArtistID =
bArtistAlias.value.getOrElse(artistID, artistID)
Rating(userID, finalArtistID, count)
}.cache()
val
model =
ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
//
檢視特徵變數:
model.userFeatures.mapValues(_.mkString(", ")).first()
//我們可以對此使用者做出5個推薦:
val recommendations = model.recommendProducts(2093760, 5)
recommendations.foreach(println)
-
- 選擇超引數
計算AUC這部分程式碼沒有試。AUC(Area Under ROC Curve)是ROC(Receiver Operating Characteristic,受試者工作特徵)線,它源於二戰中用於敵機檢測的雷達訊號分析技術。在非均等代價下,ROC曲線不能直接反映出學習器的期望總體代價,而“代價曲線”則可達到該目的。
機器學習常涉及兩類引數:一類是演算法的引數,亦稱“超引數”,數目常在10以內;另一類是模型的引數,數目可能很多。前者通常是由人工設定多個引數候選值後產生模型,後者則是通過學習來產生多個候選模型。
ALS.trainImplicit()的引數包括以下幾個:
rank
模型的潛在因素的個數,即“使用者-特徵”和“產品-特徵”矩陣的列數;一般來說,它也是矩陣的階。
iterations
矩陣分解迭代的次數;迭代的次數越多,花費的時間越長,但分解的結果可能會更好。
lambda
標準的過擬合引數;值越大越不容易產生過擬合,但值太大會降低分解的準確度。lambda取較大的值看起來結果要稍微好一些。
alpha
控制矩陣分解時,被觀察到的“使用者-產品”互動相對沒被觀察到的互動的權重。40是最初ALS論文的預設值,這說明了模型在強呼叫戶聽過什麼時的表現要比強呼叫戶沒聽過什麼時要好。
-
- 應用(產生推薦)
val someUsers = allData.map(_.user).distinct().take(100)
val someRecommendations =
someUsers.map(userID => model.recommendProducts(userID, 5))
someRecommendations.map(
recs => recs.head.user + " -> " + recs.map(_.product).mkString(", ")
).foreach(println)
-
- Rating資料程式碼
def run(filename: String) {
println("mlALS is running!")
val data = getData(filename)
println(s"dataLen:${data.count}")
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("newUserId")
.setItemCol("newMovieId")
.setRatingCol("newRating");
val split = data.randomSplit(Array(0.8, 0.2))
val model = als.fit(split(0));
println(s"userCol name:${model.getUserCol}")
val predictions = model.transform(split(1))
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("newRating")
.setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
val userCommd = model.recommendForAllUsers(10)
userCommd.show()
}
def getData(filename: String) :Dataset[Row] = {
val sqlContext = new SQLContext(sc.sparkContext)
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> filename, "header" -> "true"))
df.columns.map(col => println(s"col:${col}"))
val newdf = df.select(df.col("userId").cast(IntegerType).as("newUserId"),
df.col("movieId").cast(IntegerType).as("newMovieId"),
df.col("rating").cast(DoubleType).as("newRating"),
df.col("timestamp").cast(LongType).as("newTimeStamp"))
newdf
}
-
- 資料正則化
val dataFrame = sqlContext.read.format("libsvm").load("data/libsvm.txt")
// L1正則化
val normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures")
// 設定 L1正則化
.setP(1.0)
// 正則化轉換
val l1NormData = normalizer.transform(dataFrame)
// L2正則化
val l2InfNormData = normalizer.transform(dataFrame, normalizer.p -> 2)
l2InfNormData.foreach(println)