1. 程式人生 > >Spark 實戰(四)

Spark 實戰(四)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.mllib.recommendation

import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.ml.recommendation.{ALS => NewALS}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

/**
 * A more compact class to represent a rating than Tuple3[Int, Int, Double].
 */
@Since("0.8.0")
case class Rating @Since("0.8.0") (
    @Since("0.8.0") user: Int,
    @Since("0.8.0") product: Int,
    @Since("0.8.0") rating: Double)

/**
 * Alternating Least Squares matrix factorization.
 *
 * ALS attempts to estimate the ratings matrix `R` as the product of two lower-rank matrices,
 * `X` and `Y`, i.e. `X * Yt = R`. Typically these approximations are called 'factor' matrices.
 * The general approach is iterative. During each iteration, one of the factor matrices is held
 * constant, while the other is solved for using least squares. The newly-solved factor matrix is
 * then held constant while solving for the other factor matrix.
 *
 * This is a blocked implementation of the ALS factorization algorithm that groups the two sets
 * of factors (referred to as "users" and "products") into blocks and reduces communication by only
 * sending one copy of each user vector to each product block on each iteration, and only for the
 * product blocks that need that user's feature vector. This is achieved by precomputing some
 * information about the ratings matrix to determine the "out-links" of each user (which blocks of
 * products it will contribute to) and "in-link" information for each product (which of the feature
 * vectors it receives from each user block it will depend on). This allows us to send only an
 * array of feature vectors between each user block and product block, and have the product block
 * find the users' ratings and update the products based on these messages.
 *
 * For implicit preference data, the algorithm used is based on
 * "Collaborative Filtering for Implicit Feedback Datasets", available at
 * <a href="http://dx.doi.org/10.1109/ICDM.2008.22">here</a>, adapted for the blocked approach
 * used here.
 *
 * Essentially instead of finding the low-rank approximations to the rating matrix `R`,
 * this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if
 * r > 0 and 0 if r <= 0. The ratings then act as 'confidence' values related to strength of
 * indicated user
 * preferences rather than explicit ratings given to items.
 */
@Since("0.8.0")
class ALS private (
    private var numUserBlocks: Int,
    private var numProductBlocks: Int,
    private var rank: Int,
    private var iterations: Int,
    private var lambda: Double,
    private var implicitPrefs: Boolean,
    private var alpha: Double,
    private var seed: Long = System.nanoTime()
  ) extends Serializable with Logging {

  /**
   * Constructs an ALS instance with default parameters: {numBlocks: -1, rank: 10, iterations: 10,
   * lambda: 0.01, implicitPrefs: false, alpha: 1.0}.
   */
  @Since("0.8.0")
  def this() = this(-1, -1, 10, 10, 0.01, false, 1.0)

  /** If true, do alternating nonnegative least squares. */
  private var nonnegative = false

  /** storage level for user/product in/out links */
  private var intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK
  private var finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK

  /** checkpoint interval */
  private var checkpointInterval: Int = 10

  /**
   * Set the number of blocks for both user blocks and product blocks to parallelize the computation
   * into; pass -1 for an auto-configured number of blocks. Default: -1.
   */
  @Since("0.8.0")
  def setBlocks(numBlocks: Int): this.type = {
    require(numBlocks == -1 || numBlocks > 0,
      s"Number of blocks must be -1 or positive but got ${numBlocks}")
    this.numUserBlocks = numBlocks
    this.numProductBlocks = numBlocks
    this
  }

  /**
   * Set the number of user blocks to parallelize the computation.
   */
  @Since("1.1.0")
  def setUserBlocks(numUserBlocks: Int): this.type = {
    require(numUserBlocks == -1 || numUserBlocks > 0,
      s"Number of blocks must be -1 or positive but got ${numUserBlocks}")
    this.numUserBlocks = numUserBlocks
    this
  }

  /**
   * Set the number of product blocks to parallelize the computation.
   */
  @Since("1.1.0")
  def setProductBlocks(numProductBlocks: Int): this.type = {
    require(numProductBlocks == -1 || numProductBlocks > 0,
      s"Number of product blocks must be -1 or positive but got ${numProductBlocks}")
    this.numProductBlocks = numProductBlocks
    this
  }

  /** Set the rank of the feature matrices computed (number of features). Default: 10. */
  @Since("0.8.0")
  def setRank(rank: Int): this.type = {
    require(rank > 0,
      s"Rank of the feature matrices must be positive but got ${rank}")
    this.rank = rank
    this
  }

  /** Set the number of iterations to run. Default: 10. */
  @Since("0.8.0")
  def setIterations(iterations: Int): this.type = {
    require(iterations >= 0,
      s"Number of iterations must be nonnegative but got ${iterations}")
    this.iterations = iterations
    this
  }

  /** Set the regularization parameter, lambda. Default: 0.01. */
  @Since("0.8.0")
  def setLambda(lambda: Double): this.type = {
    require(lambda >= 0.0,
      s"Regularization parameter must be nonnegative but got ${lambda}")
    this.lambda = lambda
    this
  }

  /** Sets whether to use implicit preference. Default: false. */
  @Since("0.8.1")
  def setImplicitPrefs(implicitPrefs: Boolean): this.type = {
    this.implicitPrefs = implicitPrefs
    this
  }

  /**
   * Sets the constant used in computing confidence in implicit ALS. Default: 1.0.
   */
  @Since("0.8.1")
  def setAlpha(alpha: Double): this.type = {
    this.alpha = alpha
    this
  }

  /** Sets a random seed to have deterministic results. */
  @Since("1.0.0")
  def setSeed(seed: Long): this.type = {
    this.seed = seed
    this
  }

  /**
   * Set whether the least-squares problems solved at each iteration should have
   * nonnegativity constraints.
   */
  @Since("1.1.0")
  def setNonnegative(b: Boolean): this.type = {
    this.nonnegative = b
    this
  }

  /**
   * :: DeveloperApi ::
   * Sets storage level for intermediate RDDs (user/product in/out links). The default value is
   * `MEMORY_AND_DISK`. Users can change it to a serialized storage, e.g., `MEMORY_AND_DISK_SER` and
   * set `spark.rdd.compress` to `true` to reduce the space requirement, at the cost of speed.
   */
  @DeveloperApi
  @Since("1.1.0")
  def setIntermediateRDDStorageLevel(storageLevel: StorageLevel): this.type = {
    require(storageLevel != StorageLevel.NONE,
      "ALS is not designed to run without persisting intermediate RDDs.")
    this.intermediateRDDStorageLevel = storageLevel
    this
  }

  /**
   * :: DeveloperApi ::
   * Sets storage level for final RDDs (user/product used in MatrixFactorizationModel). The default
   * value is `MEMORY_AND_DISK`. Users can change it to a serialized storage, e.g.
   * `MEMORY_AND_DISK_SER` and set `spark.rdd.compress` to `true` to reduce the space requirement,
   * at the cost of speed.
   */
  @DeveloperApi
  @Since("1.3.0")
  def setFinalRDDStorageLevel(storageLevel: StorageLevel): this.type = {
    this.finalRDDStorageLevel = storageLevel
    this
  }

  /**
   * :: DeveloperApi ::
   * Set period (in iterations) between checkpoints (default = 10). Checkpointing helps with
   * recovery (when nodes fail) and StackOverflow exceptions caused by long lineage. It also helps
   * with eliminating temporary shuffle files on disk, which can be important when there are many
   * ALS iterations. If the checkpoint directory is not set in [[org.apache.spark.SparkContext]],
   * this setting is ignored.
   */
  @DeveloperApi
  @Since("1.4.0")
  def setCheckpointInterval(checkpointInterval: Int): this.type = {
    this.checkpointInterval = checkpointInterval
    this
  }

  /**
   * Run ALS with the configured parameters on an input RDD of [[Rating]] objects.
   * Returns a MatrixFactorizationModel with feature vectors for each user and product.
   */
  @Since("0.8.0")
  def run(ratings: RDD[Rating]): MatrixFactorizationModel = {
    val sc = ratings.context

    val numUserBlocks = if (this.numUserBlocks == -1) {
      math.max(sc.defaultParallelism, ratings.partitions.length / 2)
    } else {
      this.numUserBlocks
    }
    val numProductBlocks = if (this.numProductBlocks == -1) {
      math.max(sc.defaultParallelism, ratings.partitions.length / 2)
    } else {
      this.numProductBlocks
    }

    /** 採用交替最小二乘求解使用者和物品的特徵矩陣 */
    val (floatUserFactors, floatProdFactors) = NewALS.train[Int](
      ratings = ratings.map(r => NewALS.Rating(r.user, r.product, r.rating.toFloat)),
      rank = rank,
      numUserBlocks = numUserBlocks,
      numItemBlocks = numProductBlocks,
      maxIter = iterations,
      regParam = lambda,
      implicitPrefs = implicitPrefs,
      alpha = alpha,
      nonnegative = nonnegative,
      intermediateRDDStorageLevel = intermediateRDDStorageLevel,
      finalRDDStorageLevel = StorageLevel.NONE,
      checkpointInterval = checkpointInterval,
      seed = seed)

    val userFactors = floatUserFactors
      .mapValues(_.map(_.toDouble))
      .setName("users")
      .persist(finalRDDStorageLevel)
    val prodFactors = floatProdFactors
      .mapValues(_.map(_.toDouble))
      .setName("products")
      .persist(finalRDDStorageLevel)
    if (finalRDDStorageLevel != StorageLevel.NONE) {
      userFactors.count()
      prodFactors.count()
    }
    new MatrixFactorizationModel(rank, userFactors, prodFactors)
  }

  /**
   * Java-friendly version of `ALS.run`.
   */
  @Since("1.3.0")
  def run(ratings: JavaRDD[Rating]): MatrixFactorizationModel = run(ratings.rdd)
}

/**
 * Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
 */
@Since("0.8.0")
object ALS {
  /**
   * Train a matrix factorization model given an RDD of ratings by users for a subset of products.
   * The ratings matrix is approximated as the product of two lower-rank matrices of a given rank
   * (number of features). To solve for these features, ALS is run iteratively with a configurable
   * level of parallelism.
   *
   * @param ratings    RDD of [[Rating]] objects with userID, productID, and rating
   * @param rank       number of features to use
   * @param iterations number of iterations of ALS
   * @param lambda     regularization parameter
   * @param blocks     level of parallelism to split computation into
   * @param seed       random seed for initial matrix factorization model
   */
  @Since("0.9.1")
  def train(
      ratings: RDD[Rating],
      rank: Int,
      iterations: Int,
      lambda: Double,
      blocks: Int,
      seed: Long
    ): MatrixFactorizationModel = {
    new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings)
  }

  /**
   * Train a matrix factorization model given an RDD of ratings by users for a subset of products.
   * The ratings matrix is approximated as the product of two lower-rank matrices of a given rank
   * (number of features). To solve for these features, ALS is run iteratively with a configurable
   * level of parallelism.
   *
   * @param ratings    RDD of [[Rating]] objects with userID, productID, and rating
   * @param rank       number of features to use
   * @param iterations number of iterations of ALS
   * @param lambda     regularization parameter
   * @param blocks     level of parallelism to split computation into
   */
  @Since("0.8.0")
  def train(
      ratings: RDD[Rating],
      rank: Int,
      iterations: Int,
      lambda: Double,
      blocks: Int
    ): MatrixFactorizationModel = {
    new ALS(blocks, blocks, rank, iterations, lambda, false, 1.0).run(ratings)
  }

  /**
   * Train a matrix factorization model given an RDD of ratings by users for a subset of products.
   * The ratings matrix is approximated as the product of two lower-rank matrices of a given rank
   * (number of features). To solve for these features, ALS is run iteratively with a level of
   * parallelism automatically based on the number of partitions in `ratings`.
   *
   * @param ratings    RDD of [[Rating]] objects with userID, productID, and rating
   * @param rank       number of features to use
   * @param iterations number of iterations of ALS
   * @param lambda     regularization parameter
   */
  @Since("0.8.0")
  def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double)
    : MatrixFactorizationModel = {
    train(ratings, rank, iterations, lambda, -1)
  }

  /**
   * Train a matrix factorization model given an RDD of ratings by users for a subset of products.
   * The ratings matrix is approximated as the product of two lower-rank matrices of a given rank
   * (number of features). To solve for these features, ALS is run iteratively with a level of
   * parallelism automatically based on the number of partitions in `ratings`.
   *
   * @param ratings    RDD of [[Rating]] objects with userID, productID, and rating
   * @param rank       number of features to use
   * @param iterations number of iterations of ALS
   */
  @Since("0.8.0")
  def train(ratings: RDD[Rating], rank: Int, iterations: Int)
    : MatrixFactorizationModel = {
    train(ratings, rank, iterations, 0.01, -1)
  }

  /**
   * Train a matrix factorization model given an RDD of 'implicit preferences' given by users
   * to some products, in the form of (userID, productID, preference) pairs. We approximate the
   * ratings matrix as the product of two lower-rank matrices of a given rank (number of features).
   * To solve for these features, we run a given number of iterations of ALS. This is done using
   * a level of parallelism given by `blocks`.
   *
   * @param ratings    RDD of (userID, productID, rating) pairs
   * @param rank       number of features to use
   * @param iterations number of iterations of ALS
   * @param lambda     regularization parameter
   * @param blocks     level of parallelism to split computation into
   * @param alpha      confidence parameter
   * @param seed       random seed for initial matrix factorization model
   */
  @Since("0.8.1")
  def trainImplicit(
      ratings: RDD[Rating],
      rank: Int,
      iterations: Int,
      lambda: Double,
      blocks: Int,
      alpha: Double,
      seed: Long
    ): MatrixFactorizationModel = {
    new ALS(blocks, blocks, rank, iterations, lambda, true, alpha, seed).run(ratings)
  }

  /**
   * Train a matrix factorization model given an RDD of 'implicit preferences' of users for a
   * subset of products. The ratings matrix is approximated as the product of two lower-rank
   * matrices of a given rank (number of features). To solve for these features, ALS is run
   * iteratively with a configurable level of parallelism.
   *
   * @param ratings    RDD of [[Rating]] objects with userID, productID, and rating
   * @param rank       number of features to use
   * @param iterations number of iterations of ALS
   * @param lambda     regularization parameter
   * @param blocks     level of parallelism to split computation into
   * @param alpha      confidence parameter
   */
  @Since("0.8.1")
  def trainImplicit(
      ratings: RDD[Rating],
      rank: Int,
      iterations: Int,
      lambda: Double,
      blocks: Int,
      alpha: Double
    ): MatrixFactorizationModel = {
    new ALS(blocks, blocks, rank, iterations, lambda, true, alpha).run(ratings)
  }

  /**
   * Train a matrix factorization model given an RDD of 'implicit preferences' of users for a
   * subset of products. The ratings matrix is approximated as the product of two lower-rank
   * matrices of a given rank (number of features). To solve for these features, ALS is run
   * iteratively with a level of parallelism determined automatically based on the number of
   * partitions in `ratings`.
   *
   * @param ratings    RDD of [[Rating]] objects with userID, productID, and rating
   * @param rank       number of features to use
   * @param iterations number of iterations of ALS
   * @param lambda     regularization parameter
   * @param alpha      confidence parameter
   */
  @Since("0.8.1")
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double)
    : MatrixFactorizationModel = {
    trainImplicit(ratings, rank, iterations, lambda, -1, alpha)
  }

  /**
   * Train a matrix factorization model given an RDD of 'implicit preferences' of users for a
   * subset of products. The ratings matrix is approximated as the product of two lower-rank
   * matrices of a given rank (number of features). To solve for these features, ALS is run
   * iteratively with a level of parallelism determined automatically based on the number of
   * partitions in `ratings`.
   *
   * @param ratings    RDD of [[Rating]] objects with userID, productID, and rating
   * @param rank       number of features to use
   * @param iterations number of iterations of ALS
   */
  @Since("0.8.1")
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int)
    : MatrixFactorizationModel = {
    trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0)
  }
}

相關推薦

Spark 實戰

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this w

WEB安全實戰關於 Cookie

round url 主動 gin 加密 文章 日期 就會 dex 前言 這幾天中,一直再跟漏洞打交道,而在這些漏洞中,出現的最多的就是 Cookie 和 Session 了。這篇文章就簡單的介紹一些 Cookie 中最經常使用的四個屬性。也算是為興許的文章做一個

Spring Boot 揭秘與實戰 配置文件篇 - 有哪些很棒的特性

real randint 開發人員 hat mod 配置管理 bsp footer tar 文章目錄 1. 使用屬性文件2. YAML文件 1.1. 自定義屬性 1.2. 參數引用 1.3. 隨機數屬性 1.4. application-{profile}.proper

python機器學習實戰

畫畫 import 測試數據 trac 1+n read dex 缺失值 類型 python機器學習實戰(四) 版權聲明:本文為博主原創文章,轉載請指明轉載地址

[讀書筆記] R語言實戰 基本數據管理

mean 圖片 數值 函數 nbsp 一個 img order 分享 1. 創建新的變量 mydata<-data.frame(x1=c(2,2,6,4),x2=c(3,4,2,8)) #方法一 mydata$sumx<-mydata$x1+mydat

Spring實戰Spring高級裝配中的bean profile

優先 contex 如何 文件中 web.xml 多個 定義 blog 軟件   profile的原意為輪廓、剖面等,軟件開發中可以譯為“配置”。   在3.1版本中,Spring引入了bean profile的功能。要使用profile,首先要將所有不同的bean定義整理

Docker從入門到實戰

Docker 虛擬化 一步一步走,寫小白都能看懂的文章,將持續更新中,敬請期待! Docker從入門到實戰(四) Docker基礎 一:Docker基本操作 一般情況安裝Docker之後系統會自動創建一個Docker的用戶組,如果沒有創建可以手動創建groupadd docker把當前非root用戶加

MySQL數據庫從入門到實戰

字符集 數據類型 主鍵 元數據 ——————————————————————第一部分:字符集——————————————————————show charset; ---查看mysql支持字符集1、服務器端(1)實例級別vim my.cnfcharacter-set-server=utf8

Android項目實戰:ViewPager切換動畫3.0版本以上有效果

技術 code info utf-8 play draw pos support addview 原文:Android項目實戰(四):ViewPager切換動畫(3.0版本以上有效果)學習內容來自“慕課網” 一般APP進去之後都會有幾張圖片來導航,

Spark實戰SparkStreaming集成Kafka

round 形式 寫入 some base cal 接下來 會話 支持 Spark Streaming + Kafka集成指南 Kafka項目在版本0.8和0.10之間引入了一個新的消費者API,因此有兩個獨立的相應Spark Streaming包可用。請選擇正確的包,

nginx實戰反向代理配置緩存及負載均衡

cer zone ofo domain 針對 臨時文件 地址 ike ipv6 前言 反向代理,是指用戶通過同一服務器訪問服務器後端各被代理服務器的訪問方式。(詳見百度百科 https://baike.baidu.com/item/反向代理/7793488 )? nginx

springCloud分布式事務實戰分布式事務處理器的下載,編譯和運行

false cto instance alt 編譯 pat -o ins odin (1)下載分布式事務處理器工程源碼https://github.com/codingapi/tx-lcn/(2)導入eclipse (3)修改配置文件填寫分布式事務服務器地址,redis地址

微服務分散式事務實戰分散式事務處理器的下載,編譯和執行

分散式事務處理器的下載,編譯和執行 (1)下載分散式事務處理器工程原始碼 https://github.com/codingapi/tx-lcn/ (2)匯入eclipse (3)修改配置檔案 填寫分散式事務伺服器地址,redis地址 ,註冊中心地址 ################

機器學習實戰邏輯迴歸LRLogistic Regression

目錄 0. 前言 1. Sigmoid 函式 2. 梯度上升與梯度下降 3. 梯度下降法(Gradient descent) 4. 梯度上升法(Gradient ascent) 5. 梯度下降/上升法的數學推導

Docker入門實戰——基於jenkins部署微服務

一、部署前端Vue專案 第一步、編寫構建指令碼xxx-build.sh port=xxx #根據埠號查詢對應的pid pid=$(netstat -nlp | grep :$port | awk '{print $7}' | awk -F"/" '{ print $1 }'); #殺掉對應

Python實戰

一 實戰——讀寫檔案 1 ex16.py from sys import argv script, filename = argv print "We're going to erase %r." % filename print "If you don't want

Spark介紹SparkSQL

一、SparkSQL發展歷程 SparkSQL的前身是Shark, Shark是伯克利實驗室Spark生態環境的元件之一,它修改了下圖Hive所示的右下角的記憶體管理、物理計劃、執行三個模組,並使之能執行在Spark引擎上,從而使得SQL查詢的速度得到10-100倍的提升 2014年6

Docker最全教程——從理論到實戰

變量 參加 屬於 當我 web服務 隔離 方便 當前 context 往期內容鏈接 https://www.cnblogs.com/codelove/p/10030439.html https://www.cnblogs.com/codelove/p/10036608.

微服務Springcloud超詳細教程+實戰

如在文件中遇到什麼問題請聯絡作者 QQ:1172796094 本人正在找深圳Java實習工作,求大佬帶飛 —————————————————————————————————————— 初始SpringCloud 微服務是一種架構方式,最終肯定需要技術架構去實施。 微服務的實現方式

Unity ShaderLab開發實戰描邊

      之前可能在面剔除中提到過,面剔除可以用來實現描邊效果。(以下效果圖來自Unity3D ShaderLab開發實戰詳解)        原理:這是一個最簡單的描邊,使用面剔除:Cull指令,上圖中 ,