1. 程式人生 > >基於Spark的Als演算法+自迭代+Spark2.0新寫法

基於Spark的Als演算法+自迭代+Spark2.0新寫法

主要介紹了一下幾點:
1矩陣分解的幾種演算法
2spark使用矩陣分解的幾種方式,1ml 包中使用,2mllib包中的使用,其實有不呼叫包自己寫的案列(可以去看看哈,就在example目錄)
3使用ALS做推薦的一個比較詳細的流程:1自迭代確定比較優的引數是,2使用引數訓練模型,3使用模型推薦topn的物品給使用者
4講了怎麼自迭代ALS演算法引數,感覺這個還重要點
5提交spark的報了一個錯誤,已經錯誤解決方式
6好多細節都沒寫,感覺要寫的有好多,也不是很完善,時間不夠,只是提供了核心程式碼和思路

一:Spark2.0新概率解釋(僅限本文使用)

1 SparkSession

SparkSession是spark2.0的全新切入點,以前都是sparkcontext建立RDD的,StreamingContext,sqlContext,HiveContext。
DataDrame提供的API慢慢的成為新的標準API,我們需要1個新的切入點來構建他,這個就是SparkSession哈,以前我也沒見過.官網API介紹
這裡寫圖片描述
官網上說,這是用來構建Dataset和DataFrame的API的切入點。在環境中,SparkSession已經預先建立了,我們需要使用bulder方法得到已經存在在SparkSession。使用方法如下:

SparkSession.builder().getOrCreate()
SparkSession.builder()
  .master("local"
) .appName("Word Count") .config(key, value"). .getOrCreate()

二:ALS演算法

1含義

在現實中使用者-物品-評分矩陣是及其大的,使用者消費有限,對單個使用者來說,消費的物品的非常有限的,產生的評分也是比較少的,這樣就造成了使用者-物品矩陣有大量的空值。
假定使用者的興趣只受少數因素的影響,所以使用者-物品矩陣可以分解為使用者的特徵向量矩陣和物品的特徵向量矩陣(降維了)。使用者的特徵向量距離表示使用者的興趣(U),物品的特徵向量矩陣代表使用者的特點(V),合起來(內積)表示使用者對物品的特點的興趣,也就是喜好程度。
M=U*V

2協同過濾矩陣分解演算法

2.1奇異值分解(SVD)

矩陣的奇異值分解是最簡單的一種矩陣分解演算法,主要是在U*V中間加了個一個奇異值矩陣,公式如下:
M=U*(奇異值矩陣)*(V的共軛)
奇異值矩陣是對角矩陣,奇異值分解的缺點(沒試過不知道,書上說的),1不允許分解矩陣有null值,需要進行填分,2如果填分,又有兩個問題:1增加資料量,增加演算法複雜度,2簡單粗暴的填分方式會導致資料失真,如果將null值設定為0,那麼會導致過度學習問題。
奇異值分解方式,感覺用的不多,我自己接觸的話。

2.2正則化矩陣分解

加入正則化是為了解決稀疏矩陣可能過學習問題,評價矩陣分解是RMSE,通過最小化RMSE來學習使用者特徵矩陣U和物品特徵矩陣V,在RMSE函式中加入了正則化項減少過擬合,公式如下,公式都是書上寫的哈,這裡截圖:
這裡寫圖片描述
K表示評分記錄(u使用者對I物品的評分),Ru,i表示使用者u對物品i的真實評分,誒夢達表示正則化係數,誒夢達後面的表示防止過擬合的正則化項。
加入正則化的含義可以理解為,修改rmse,不要其太大或者太小。
假設使用者特徵矩陣為Umt,物品評分矩陣為Vtn,其中t特徵<

2.3帶偏置的矩陣分解(說的很有道理,但是比較難評估)

理論就不說了,舉個例子,u1對v1的評分為4表示u1對v1這個物品非常喜歡,u2對v1的評分為4表示u1對v1一般喜歡,對用使用者來說,即使他們對同一物品的評分相同,但是表示他們的喜好程度並不是一樣的。同理對於物品來說也是一樣。把這種獨立於使用者和獨立於物品的影響因素成為偏置,偏置一共有3個部分組成。
1訓練集中所有評分記錄的全域性平均,表示訓練集中總體評分情況,一般是一個常數。
2使用者偏置bu,獨立於物品特徵因素,表示使用者特定的打分習慣。
3物品偏置bi,表示獨立於使用者特徵因素,舉個列子,好片子一般總體評分偏高,爛片一般評分偏低,偏置就是表示這種特徵。
以上的所有偏置對使用者對物品喜好無關,得到的預測評分公式如下:
這裡寫圖片描述
按照這種思路,其實還要很多其他優化,比如加入時間因素,社會流行因素等。

Spark使用的是帶正則化矩陣分解,優化函式的方式選用的是交叉最小二乘法ALS

三Spark程式碼

spark程式碼一半是官方列子修改過來的哈

1呼叫ml包

使用org.apache.spark.ml.recommendation.ALS來計算,並且使用了spark2.0的新特性SparkSession來實現推薦,具體程式碼與註釋如下:

package org.wq.scala.ml 

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession


/**
  * Created by Administrator on 2016/10/24.
  */
//這是spark新的Als演算法的列子
object ALSRecommendNewTest {
  //定義個類,來儲存一次評分哈
  case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
  //把一行轉換成一個評分類
  def parseRating(str: String): Rating = {
    val fields = str.split("::")
    assert(fields.size == 4)
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
  }
  def main(args:Array[String])={
    //SparkSession是spark2.0的全新切入點,以前都是sparkcontext建立RDD的,StreamingContext,sqlContext,HiveContext。
    //DataDrame提供的API慢慢的成為新的標準API,我們需要1個新的切入點來構建他,這個就是SparkSession哈
    //以前我也沒見過
    val spark = SparkSession.builder().config("spark.sql.warehouse.dir","E:/ideaWorkspace/ScalaSparkMl/spark-warehouse").master("local").appName("ALSExample").getOrCreate()
    import spark.implicits._

    //read方法返回的是一個DataFrameReader類,可以轉換為DataFrame
    //DataFrameReader類的textFile方法:載入文字資料,返回為Dataset
    //使用一個函式parseRating處理一行資料
    val ratings = spark.read.textFile("data/mllib/sample_movielens_ratings.txt").map(parseRating).toDF()

    val Array(training,test)=ratings.randomSplit(Array(0.8, 0.2))

    // Build the recommendation model using ALS on the training data
    //使用訓練資料訓練模型
    //這裡的ALS是import org.apache.spark.ml.recommendation.ALS,不是mllib中的哈
    //setMaxiter設定最大迭代次數
    //setRegParam設定正則化引數,日lambda這個不是更明顯麼
    //setUserCol設定使用者id列名
    //setItemCol設定物品列名
    //setRatingCol設定打分列名
    val als = new ALS()

      als.setRank(10)
      .setMaxIter(5)
      .setRegParam(0.01)
      .setUserCol("userId")
      .setItemCol("movieId")
      .setRatingCol("rating")

    //fit給輸出的資料,訓練模型,fit返回的是ALSModel類
    val model = als.fit(training)

    //使用測試資料計算模型的誤差平方和
    //transform方法把資料dataset換成dataframe型別,預測資料
    val predictions = model.transform(test)

    //RegressionEvaluator這個類是使用者評估預測效果的,預測值與原始值
    //這個setLabelCol要和als設定的setRatingCol一致,不然會報錯哈
    //RegressionEvaluator的setPredictionCol必須是prediction因為,ALSModel的預設predictionCol也是prediction
    //如果要修改的話必須把ALSModel和RegressionEvaluator一起修改
    //model.setPredictionCol("prediction")和evaluator.setPredictionCol("prediction")
    //setMetricName這個方法,評估方法的名字,一共有哪些呢?
    //rmse-平均誤差平方和開根號
    //mse-平均誤差平方和
    //mae-平均距離(絕對)
    //r2-沒用過不知道
    //這裡建議就是用rmse就好了,其他的基本都沒用,當然還是要看應用場景,這裡是預測分值就是用rmse。如果是預測距離什麼的mae就不從,看場景哈
    val evaluator = new RegressionEvaluator()
      .setMetricName("rmse")
      .setLabelCol("rating")
      .setPredictionCol("prediction")

    val rmse = evaluator.evaluate(predictions)
    println("Root-mean-square error = "+rmse)

    //stop是停止底層的SparkContext
    spark.stop()
  }
}

2呼叫mllib,實現

使用mllib中的ALS演算法如下,如果是生產,建議使用mllib中的

package org.wq.scala.ml

import org.apache.log4j.{Level, Logger}
import org.apache.spark.examples.mllib.AbstractParams

import scala.collection.mutable
//處理輸入引數的庫
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scopt.OptionParser

/**
  * Created by Administrator on 2016/10/24.
  */
object ALSRecommendMllibTest {

  //引數含義
  //input表示資料路徑
  //kryo表示是否使用kryo序列化
  //numIterations迭代次數
  //lambda正則化引數
  //numUserBlocks使用者的分塊數
  //numProductBlocks物品的分塊數
  //implicitPrefs這個引數沒用過,但是通過後面的可以推斷出來了,是否開啟隱藏的分值引數閾值,預測在那個級別才建議推薦,這裡是5分制度的,詳細看後面程式碼
  case class Params(
                     input: String = null,
                     output:String=null,
                     kryo: Boolean = false,
                     numIterations: Int = 20,
                     lambda: Double = 1.0,
                     rank: Int = 10,
                     numUserBlocks: Int = -1,
                     numProductBlocks: Int = -1,
                     implicitPrefs: Boolean = false) extends AbstractParams[Params]

  def main(args: Array[String]) {
    val defaultParams = Params()

    //規定引數的輸入方式 --rank 10 這種
    //我個人習慣為直接用空格分割(如果引數不對,給予提示),當然下面這種更規範化和人性化,還有預設引數的
    //以後再研究OptionParser用法,不過他這種引數用法挺好用的哈
    val parser = new OptionParser[Params]("Mllib 的ALS") {
      head("MovieLensALS: an example app for ALS on MovieLens data.")
      opt[Int]("rank")
        .text(s"rank, default: ${defaultParams.rank}")
        .action((x, c) => c.copy(rank = x))
      opt[Int]("numIterations")
        .text(s"number of iterations, default: ${defaultParams.numIterations}")
        .action((x, c) => c.copy(numIterations = x))
      opt[Double]("lambda")
        .text(s"lambda (smoothing constant), default: ${defaultParams.lambda}")
        .action((x, c) => c.copy(lambda = x))
      opt[Unit]("kryo")
        .text("use Kryo serialization")
        .action((_, c) => c.copy(kryo = true))
      opt[Int]("numUserBlocks")
        .text(s"number of user blocks, default: ${defaultParams.numUserBlocks} (auto)")
        .action((x, c) => c.copy(numUserBlocks = x))
      opt[Int]("numProductBlocks")
        .text(s"number of product blocks, default: ${defaultParams.numProductBlocks} (auto)")
        .action((x, c) => c.copy(numProductBlocks = x))
      opt[Unit]("implicitPrefs")
        .text("use implicit preference")
        .action((_, c) => c.copy(implicitPrefs = true))
      arg[String]("<input>")
        .required()
        .text("input paths to a MovieLens dataset of ratings")
        .action((x, c) => c.copy(input = x))

      arg[String]("<output>")
        .required()
        .text("output Model Path")
        .action((x, c) => c.copy(output = x))
      note(
        """
          |For example, the following command runs this app on a synthetic dataset:
          |
          | bin/spark-submit --class org.apache.spark.examples.mllib.MovieLensALS \
          |  examples/target/scala-*/spark-examples-*.jar \
          |  --rank 5 --numIterations 20 --lambda 1.0 --kryo \
          |  data/mllib/sample_movielens_data.txt
        """.stripMargin)
    }

    //雖然是map但是隻執行1次哈,主要看run方法做了什麼
    parser.parse(args, defaultParams).map { params =>
      run(params)
    } getOrElse {
      System.exit(1)
    }
  }

  def run(params: Params) {
    val conf = new SparkConf().setAppName(s"MovieLensALS with $params").setMaster("local").set("spark.sql.warehouse.dir","E:/ideaWorkspace/ScalaSparkMl/spark-warehouse")
    //如果引數設定了kryo序列化沒那麼需要註冊序列化的類和配置序列化的快取,模板照著寫就是了
    //使用序列化是為傳輸的時候速度更快,我沒有使用這個,因為反序列話也需要一定的時間,我是區域網搭建spark叢集的(機子之間很快)。
    // 如果是在雲搭建叢集可以考慮使用
    if (params.kryo) {
      conf.registerKryoClasses(Array(classOf[mutable.BitSet], classOf[Rating]))
        .set("spark.kryoserializer.buffer", "8m")
    }
    val sc = new SparkContext(conf)

    //設定log基本,生產也建議使用WARN
    Logger.getRootLogger.setLevel(Level.WARN)

    //得到因此的級別
    val implicitPrefs = params.implicitPrefs

    //讀取資料,並通過是否設定了分值閾值來修正評分
    //官方推薦是,只有哦大於3級別的時候才值得推薦
    //且下面的程式碼,implicitPrefs,直接就是預設5 Must see,按道理會根據自己對分數閾值的預估,rating減去相應的值,比如fields(2).toDouble - 2.5
    //5 -> 2.5, 4 -> 1.5, 3 -> 0.5, 2 -> -0.5, 1 -> -1.5
    //現在是5分值的對映關係,如果是其他分值的對映關係有該怎麼做?還不確定,個人建議別使用這個了。
    //經過下面程式碼推斷出,如果implicitPrefs=true或者flase,true的意思是,預測的分數要大於2.5(自己設定),才能推薦給使用者,小了,沒有意義
    //它引入implicitPrefs的整體含義為,只有使用者對物品的滿意達到一定的值,才推薦,不然推薦不喜歡的沒有意思,所以在構建樣本的時候,會減去相應的值fields(2).toDouble - 2.5(自己設定)
    //這種理論是可以的,但是還有一個理論,不給使用者推薦比給使用者推薦錯了還要嚴重(有人提出過),不推薦產生的效果還要嚴重,還有反向推薦,
    //我把implicitPrefs叫做分值閾值
    val ratings = sc.textFile(params.input).map { line =>
      val fields = line.split("::")
      if (implicitPrefs) {
        /*
         * MovieLens ratings are on a scale of 1-5:
         * 5: Must see
         * 4: Will enjoy
         * 3: It's okay
         * 2: Fairly bad
         * 1: Awful
         * So we should not recommend a movie if the predicted rating is less than 3.
         * To map ratings to confidence scores, we use
         * 5 -> 2.5, 4 -> 1.5, 3 -> 0.5, 2 -> -0.5, 1 -> -1.5. This mappings means unobserved
         * entries are generally between It's okay and Fairly bad.
         * The semantics of 0 in this expanded world of non-positive weights
         * are "the same as never having interacted at all".
         */
        Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
      } else {
        Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
      }
    }.cache()

    //計算一共有多少樣本數
    val numRatings = ratings.count()
    //計算一共有多少使用者
    val numUsers = ratings.map(_.user).distinct().count()
    //計算應該有多少物品
    val numMovies = ratings.map(_.product).distinct().count()

    println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.")

    //按80%訓練,20%驗證分割樣本
    val splits = ratings.randomSplit(Array(0.8, 0.2))

    //把訓練樣本快取起來,加快運算速度
    val training = splits(0).cache()

    //構建測試樣,我先翻譯下他說的英文哈。
    //分值為0表示,我對物品的評分不知道,一個積極有意義的評分表示:有信心預測值為1
    //一個消極的評分表示:有信心預測值為0
    //在這個案列中,我們使用的加權的RMSE,這個權重為自信的絕對值(命中就為1,否則為0)
    //關於誤差,在預測和1,0之間是不一樣的,取決於r 是正,還是負
    //這裡splits已經減了分值閾值了,所以>0 =1 else 0的含義是,1表示分值是大於分值閾值的,這裡是大於2.5,0表示小於2.5
    val test = if (params.implicitPrefs) {
      /*
       * 0 means "don't know" and positive values mean "confident that the prediction should be 1".
       * Negative values means "confident that the prediction should be 0".
       * We have in this case used some kind of weighted RMSE. The weight is the absolute value of
       * the confidence. The error is the difference between prediction and either 1 or 0,
       * depending on whether r is positive or negative.
       */
      splits(1).map(x => Rating(x.user, x.product, if (x.rating > 0) 1.0 else 0.0))
    } else {
      splits(1)
    }.cache()

    //訓練樣本量和測試樣本量
    val numTraining = training.count()
    val numTest = test.count()
    println(s"Training: $numTraining, test: $numTest.")

    //這裡應為不適用ratings了,釋放掉它佔的記憶體
    ratings.unpersist(blocking = false)

    //setRank設定隨機因子,就是隱藏的屬性
    //setIterations設定最大迭代次數
    //setLambda設定正則化引數
    //setImplicitPrefs 是否開啟分值閾值
    //setUserBlocks設定使用者的塊數量,並行化計算,當特別大的時候需要設定
    //setProductBlocks設定物品的塊數量
    val model = new ALS()
      .setRank(params.rank)
      .setIterations(params.numIterations)
      .setLambda(params.lambda)
      .setImplicitPrefs(params.implicitPrefs)
      .setUserBlocks(params.numUserBlocks)
      .setProductBlocks(params.numProductBlocks)
      .run(training)

    //訓練的樣本和測試的樣本的分值全部是減了2.5分的
    //測試樣本的分值如果大於0為1,else 0,表示分值大於2.5才預測為Ok

    //計算rmse
    val rmse = computeRmse(model, test, params.implicitPrefs)

    println(s"Test RMSE = $rmse.")

    //儲存模型,模型儲存路勁為
    model.save(sc,params.output)
    println("模型儲存成功,儲存路勁為:"+params.output)

    sc.stop()
  }

  /** Compute RMSE (Root Mean Squared Error). */
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean)
  : Double = {

    //內部方法含義如下
    // 如果已經開啟了implicitPref那麼,預測的分值大於0的為1,小於0的為0,沒有開啟的話,就是用原始分值
    //min(r,1.0)求預測分值和1.0那個小,求小值,然後max(x,0.0)求大值, 意思就是把預測分值大於0的為1,小於0 的為0
    //這樣構建之後預測的預測值和測試樣本的樣本分值才一直,才能進行加權rmse計算
    def mapPredictedRating(r: Double): Double = {
      if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
    }

    //根據模型預測,使用者對物品的分值,predict的引數為RDD[(Int, Int)]
    val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))

    //mapPredictedRating把預測的分值對映為1或者0
    //join連線原始的分數,連線的key為x.user, x.product
    //values方法表示只保留預測值,真實值
    val predictionsAndRatings = predictions.map{ x =>
      ((x.user, x.product), mapPredictedRating(x.rating))
    }.join(data.map(x => ((x.user, x.product), x.rating))).values

    //最後計算預測與真實值的平均誤差平方和
    //這是先每個的平方求出來,然後再求平均值,最後開方
    math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
  }
}

3找到最優(可能最優哈)引數

package org.wq.scala.ml

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2016/10/24.
  */
object ALSRecommendMllibBestParamTest {


  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ALS_mllib_best_param").setMaster("local").set("spark.sql.warehouse.dir","E:/ideaWorkspace/ScalaSparkMl/spark-warehouse")
    val sc = new SparkContext(conf)
    //設定log基本,生產也建議使用WARN
    Logger.getRootLogger.setLevel(Level.WARN)

    //第一步構建time,Rating
    val movie = sc.textFile("data/mllib/sample_movielens_ratings.txt")
    val ratings = movie.map(line=>{
      val fields = line.split("::")
      val rating  = Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble)
      val timestamp =fields(3).toLong%5
      (timestamp,rating)

    })

    //輸出資料的基本資訊
    val numRatings  = ratings.count()
    val numUser  = ratings.map(_._2.user).distinct().count()
    val numItems = ratings.map(_._2.product).distinct().count()
    println("樣本基本資訊為:")
    println("樣本數:"+numRatings)
    println("使用者數:"+numUser)
    println("物品數:"+numItems)


    val sp = ratings.randomSplit(Array(0.6,0.2,0.2))
    //第二步驟
    //使用日期把資料分為訓練集(timestamp<6),驗證集(6<timestamp<8)和測試集(timestamp>8/* val training = ratings.filter(x=>x._1<6).values.repartition(2).cache()
    val validation = ratings.filter(x=>x._1>6 && x._1<8).values.repartition(2).cache()
    val test=ratings.filter(x=>x._1>=8).values.cache()*/
    //樣本時間引數都一樣,測試就使用隨機算了
    val training=sp(0).map(x=>Rating(x._2.user,x._2.product,x._2.rating)).repartition(2).cache()
    val validation=sp(1).map(x=>Rating(x._2.user,x._2.product,x._2.rating)).repartition(2).cache()
    val test=sp(1).map(x=>Rating(x._2.user,x._2.product,x._2.rating))

    val numTraining = training.count()
    val numValidation=validation.count()
    val numTest=test.count()

    println("驗證樣本基本資訊為:")
    println("訓練樣本數:"+numTraining)
    println("驗證樣本數:"+numValidation)
    println("測試樣本數:"+numTest)


  //第三步
   //定義RMSE方法
    def computeRmse(model:MatrixFactorizationModel,data:RDD[Rating]):Double={
      val predictions:RDD[Rating]=model.predict(data.map(x=>(x.user,x.product)))
      val predictionAndRatings = predictions.map(x=>{((x.user,x.product),x.rating)}).join(data.map(x=>((x.user,x.product),x.rating))).values
      math.sqrt(predictionAndRatings.map(x=>(x._1-x._2)*(x._1-x._2)).mean())
    }

    //第四步驟,使用不同的引數訓練模型,並且選擇RMSE最小的模型,規定引數的範圍
    //隱藏因子數:8或者12
    //正則化係數,0.01或者0.1選擇,迭代次數為10或者20,訓練8個模型
    val ranks = List(8,12)
    val lambdas = List(0.01,0.1)
    val numiters = List(10,20)
    var bestModel:Option[MatrixFactorizationModel]=None
    var bestValidationRmse=Double.MaxValue
    var bestRank=0
    var bestLamdba = -1.0
    var bestNumIter=1
    for(rank<-ranks;lambda<-lambdas;numiter<-numiters){
      println(rank+"-->"+lambda+"-->"+numiter)
      val model = ALS.train(training,rank,numiter,lambda)
      val valadationRmse=computeRmse(model,validation)
      if(valadationRmse<bestValidationRmse){
        bestModel=Some(model)
        bestValidationRmse=valadationRmse
        bestRank=rank
        bestLamdba=lambda
        bestNumIter=numiter
      }
    }

    val testRmse = computeRmse(bestModel.get,test)
    println("測試資料的rmse為:"+testRmse)
    println("範圍內的最後模型引數為:")
    println("隱藏因子數:"+bestRank)
    println("正則化引數:"+bestLamdba)
    println("迭代次數:"+bestNumIter)
//步驟5可以對比使用協同過濾和不適用協同過濾(使用平均分來做預測結果)能提升多大的預測效果。

//計算訓練樣本和驗證樣本的平均分數
val meanR = training.union(validation).map(x=>x.rating).mean()

//這就是使用平均分做預測,test樣本的rmse
val baseRmse=math.sqrt(test.map(x=>(meanR-x.rating)*(meanR-x.rating)).mean())

val improvement =(baseRmse-testRmse)/baseRmse*100

println("使用了ALS協同過濾演算法比使用評價分作為預測的提升度為:"+improvement)

  }
}

4使用ALS模型進行預測

package org.wq.scala.ml

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2016/10/25.
  */
object ALSModelTopn {

  def main(args: Array[String]): Unit = {

    //給使用者推薦
    val conf = new SparkConf().setAppName("ALS_mllib_best_param").setMaster("local").set("spark.sql.warehouse.dir","E:/ideaWorkspace/ScalaSparkMl/spark-warehouse")
    val sc = new SparkContext(conf)
    Logger.getRootLogger.setLevel(Level.WARN)
    val movie = sc.textFile("data/mllib/sample_movielens_ratings.txt")
    val ratings = movie.map(line=>{
      val fields = line.split("::")
      val rating  = Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble)
      val timestamp =fields(3).toLong%5
      (rating)

    })

   val model=  MatrixFactorizationModel.load(sc,"data/mllib/t")

    //選擇一個使用者
    val user=5
    val myRating = ratings.filter(x=>x.user==5)
    //該使用者已經消費了的物品
    val myRateItem = myRating.map(x=>x.product).collect().toSet

    //給使用者5推薦前評分前10的物品
    val recommendations = model.recommendProducts(user,10)
    recommendations.map(x=>{
      println(x.user+"-->"+x.product+"-->"+x.rating)
    })

  }
}

提交部署

1提交尋找最優引數的jar

提交部署求最優引數的那個jar,這就把最優引數簡單的打印出來,如果要週期的自迭代更新引數的話,就寫在資料庫或者配置檔案中,當訓練的時候,就從資料庫或者配置檔案讀。
首先需要把上面的第三個程式修改一下,修改如下,因為要提交給叢集嘛,所以不能指定master為local了,引數從命令列傳入。把jar上傳到master節點的目錄下,data需要上傳到所有的slaves.
if(args.length!=1){
println(“請輸入1個引數 購物籃資料路徑”)
System.exit(0)
}
val conf = new SparkConf().setAppName(“ALS_mllib_best_param”)
以後所有的提交都需要修改conf的,以後就不說了
jar與資料目錄如下:
這裡寫圖片描述
這裡寫圖片描述
資料長下面這個樣子,使用者id,物品id,評分,時間戳,使用者id和物品id必須是整型,如果你的不是,那麼必須進行一次對映:
這裡寫圖片描述

把資料傳到slave節點
scp sample_movielens_ratings.txt spark@slave1:/home/jar/data/
scp sample_movielens_ratings.txt spark@slave2:/home/jar/data/

提交job
spark-submit –class org.wq.scala.ml.ALSRecommendMllibBestParam –master spark://master:7077 –executor-memory 700m –num-executors 1 /home/jar/ALSRecommendMllibBestParam.jar /home/jar/data/sample_movielens_ratings.txt
執行結果如下:
這裡寫圖片描述

2把求得的最好引數帶入mllib寫的演算法中,訓練形成模型

提交Job
spark-submit –class org.wq.scala.ml.ALSRecommendMllib –master spark://master:7077 –executor-memory 700m –num-executors 1 /home/jar/ALSRecommendMllib.jar –rank 8 –numIterations 10 –lambda 0.1 /home/jar/data/sample_movielens_ratings.txt /home/jar/model/AlsModel
悲劇的報錯了
這裡寫圖片描述
這個錯誤很明顯是缺少包spark-examples_2.11-2.0.0.jar,這個包在example目錄下的。
兩個種解決方法:
1修改/etc/profile,把example/jars加入classpath.
2把jar複製到目錄sparkhome/jarsspark_home/jars這個目錄在環境變數中,這裡採用第二種.

修改之後的執行結果為:
這裡寫圖片描述

3呼叫模型,得出推薦

到這裡模型就訓練好了,這個模型可以定時訓練,crontab就可以實現,訓練好的模型,使用使用者資料預測分數。
就不提交到叢集運行了,因為這是demo而已,真實應該為提供介面,別人來呼叫
總結:
1矩陣分解的幾種演算法
2spark使用矩陣分解的幾種方式,1ml 包中使用,2mllib包中的使用,其實有不呼叫包自己寫的案列(可以去看看哈,就在example目錄)
3使用ALS做推薦的一個比較詳細的流程:1自迭代確定比較優的引數是,2使用引數訓練模型,3使用模型推薦topn的物品給使用者
4講了怎麼自迭代ALS演算法引數,感覺這個還重要點
5提交spark的報了一個錯誤,已經錯誤解決方式
6好多細節都沒寫,感覺要寫的有好多,也不是很完善,時間不夠,只是提供了核心程式碼和思路

疑問:在做的過程中,我發現spark的job檢視,只有在job執行的時候才可以檢視,其他時候不行
http://192.168.247.132:4040/jobs/
這個應該是可以隨時檢視的,應該是spark的日誌和檢視jobs的服務要一直開啟才行,希望對spark叢集熟悉的人求解,跪謝

相關推薦

基於Spark的Als演算法++Spark2.0寫法

主要介紹了一下幾點: 1矩陣分解的幾種演算法 2spark使用矩陣分解的幾種方式,1ml 包中使用,2mllib包中的使用,其實有不呼叫包自己寫的案列(可以去看看哈,就在example目錄) 3使用ALS做推薦的一個比較詳細的流程:1自迭代確定比較優的引數

演算法和遞迴

在計算機程式設計實現中有常常兩種方法: 一為迭代(iterate);二為遞迴(recursion)。 一、概念區分 迭代:利用已知的變數值,根據遞推公式不斷演進得到變數新值得程式設計思想。 遞迴:是指程式呼叫自身的程式設計思想,即一個函式呼叫本身 如果遞迴是自己呼叫

C++沉思錄__演算法__器__資料結構

書中弟18章總結中有這麼一段話: 所謂的泛型演算法,就是這樣的演算法,對於所操作的資料結構的細節資訊,只加入最低限度的理解。當然,這是理想情況,實際上是做不到的,作為這樣一種折中。STL根據資料結構能夠支援的有效操作,將這些資料結構進行分類。然後,對於每一個演算法,指出這個演算法所需要的資料結構

圖及演算法----遍歷演算法實現)

1. 圖的遍歷 2. 3.   class Graph: def __init__(self): self.graph: Dict[str, List[str]] = defaultdict(list) def addEdge(self,

演算法和遞迴

在日常程式的編寫中,複雜的專案日益增多,在後期的程式碼優化上需要花更多的時間和精力。在前期的規劃上也越來越重要,前期良好的規劃可以避免後期遇到些奇怪的問題。 這次部落格我希望通過講解下迭代和遞迴的具體應用場景,來表達寫程式是前期規劃的作用和解決具體問題所需要的方法。

Dijkstra演算法-(迪傑斯特拉)演算法實現與優先佇列實現 圖解演算法過程

Dijkstra演算法-(迪傑斯特拉)演算法之迭代實現 Dijkstra演算法-(迪傑斯特拉)演算法之優先佇列實現 該演算法的核心原理,很簡單,如下圖所示: 先說說Dijkstra演算法-(迪傑斯特拉)演算法之迭代實現,如下圖為詳細步驟, 程式碼如下,兩種實現方法

八皇后問題:基於python生成器的實現

>>>list(queens()) [(0, 4, 7, 5, 2, 6, 1, 3), (0, 5, 7, 2, 6, 3, 1, 4), (0, 6, 3, 5, 7, 1, 4, 2), (0, 6, 4, 7, 1, 3, 5, 2), (1, 3, 5, 7, 2, 0, 6, 4

高階搜尋演算法加深

# 前言 最開始搞 $OI$ 的時候接觸了搜尋演算法,後面基本上沒有在練過了。若本文有誤,請在討論區指出。 [本文例題連結](https://www.luogu.com.cn/problem/UVA529) # 思想 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/2021

優羅鏈EULO:超級節點模式

優羅鏈EULO:迭代超級節點新模式 區塊鏈的火熱引發了人們對於區塊鏈技術的各種關注。從開始的共識機制到TPS到後來的零知識演算法。關於區塊鏈的各大技術突破。業內總在不斷的提出最新模式。 今年6月起,一個新的名詞“超級節點”引起了圈內人士的關注。最早是EOS依靠“社群超級節點競選”贏得

器刪除操作寫法及解釋

1 迭代器是什麼? 迭代器是一種典型的設計模式,與集合配套使用,其目的是隱藏集合中的內部成員,並且提供對集合成員的訪問能力。其結構如下圖所示; 具體協作關係及實現方式,就不在此贅述了。 2 在迭代器上執行刪除操作 下面以刪除list中所有給定值的元素為例,介

spark2.0 特性總結

新特性: 1,用sparksession實現hivecontext和sqlcontext統一 2,whole-stage code generation,大幅提高計算效能,因為把物理計劃變成硬編碼,每秒處理的sql中的資料量增加十倍,即對物理執行的多次呼叫轉化為程式碼for迴圈,蕾絲hard

基於Pytorch框架實現ENAS演算法優化的影象識別技術探索-α隨筆

設想和目標 1. 我們的軟體要解決什麼問題?是否定義得很清楚?是否對典型使用者和典型場景有清晰的描述? 我們希望通過將ENAS的網路架構優化演算法轉變為例項化專案,能夠在有一定實際意義下解決對於Pytorch影象識別的探索問題。 專案性質為科研專案,由於是依託演算法研究產生產品,故對於

Spark2.0機器學習系列之11: 聚類(冪聚類, power iteration clustering, PIC)

           在Spark2.0版本中(不是基於RDD API的MLlib),共有四種聚類方法:             (1)K-means             (2)Latent Dirichlet allocation (LDA)

基於模型融合的推薦系統實現(2):式SVD分解

SVD演算法的原理網路上也有很多,不再細說了,關鍵是我們得到的資料是不完整的資料,所以要算SVD就必須做一次矩陣補全。補全的方式有很多,這裡推薦使用均值補全的方法(用每一行均值和每一列均值的平均來代替空白處),然後可以計算SVD,作PCA分析,然後就可以得到預測結果。 但是我們這裡有

python 法和遞迴 實現斐波那契演算法

題目:古典問題:有一對兔子,從出生後第3個月起每個月都生一對兔子,小兔子長到第三個月後每個月又生一對兔子,假如兔子都不死,問每個月的兔子總數為多少? 1.程式分析: 兔子的規律為數列1,1,2,3,5,8,13,21…. 由規律可知: f(n) = f(n-1)+f(n-2) 符合斐波那契數

基於matlab的Guass-Seidel(高斯--賽德爾) 法求解線性方程組

演算法解釋見此:https://blog.csdn.net/zengxyuyu/article/details/53056453   原始碼在此: main.m clear clc A = [8 -3 2;4 11 -1;6 3 12]; b = [20;33;36]; [

基於matlab的jacobi(雅可比)法求解線性方程組

說明推導見此部落格:https://blog.csdn.net/zengxyuyu/article/details/53054880 原始碼見下面: main.m clear clc A = [8 -3 2;4 11 -1;6 3 12]; b = [20;33;36]; [x, n]

權重最小二乘演算法

迭代權重最小二乘(Iteratively reweighted least squares, IRLS) [1] 方法用於求解\(p\)範數(\(p\) norm)的最小化問題。問題如下: \[\arg \min_{x} \sum_{i} | y_i - f_i (x) |^p\] 通過迭代的方法,在每次迭代

使用器做定義篩選

1 /* 2 IEnumerator 介面 3 支援對非泛型集合的簡單迭代。 4 5 IEnumerator 是所有非泛型列舉數的基介面。 6 7 8 C# 語言的 foreach 語句(在 Visual Basic 中為 for each)隱藏了列舉數的複雜性

實戰c++中的vector系列--vector的遍歷(stl演算法、vector器(不要在迴圈中判斷不等於end())、operator[])【轉】

(轉自:https://blog.csdn.net/wangshubo1989/article/details/50374914?utm_source=blogxgwz29) 遍歷一個vector容器有很多種方法,使用起來也是仁者見仁。 通過索引遍歷: for (i = 0; i<