Spark MLib:梯度下降算法實現
阿新 • • 發佈:2018-03-31
測試結果 println tolerance eat print bre AC sim var
在監督學習模型中,需要對原始的模型構建損失函數J(θ), 接著就是最小化損失函數,用以求的最優參數θ
對損失函數θ進行求偏導,獲取每個θ的梯度
聲明:本文參考《 大數據:Spark mlib(三) GradientDescent梯度下降算法之Spark實現》
1. 什麽是梯度下降?
梯度下降法(英語:Gradient descent)是一個一階最優化算法,通常也稱為最速下降法。 要使用梯度下降法找到一個函數的局部極小值,必須向函數上當前點對應梯度(或者是近似梯度)的反方向的規定步長距離點進行叠代搜索。 先來看兩個函數: 1. 擬合函數:為參數向量,h(θ)就是通過參數向量計算的值,n為參數的總個數,j代表的是一條記錄裏的一個參數 2. 損失函數: m為訓練的集合數,i代表的是一條記錄,hθ(xi)代表的是第i條的h(θ)
,
2. 梯度下降的幾種方式
2.1 批量梯度下降(BGD)
在前面的方式,我們采樣部分數據,就稱為批量梯度下降 在公式:中我們會發現隨著計算θ的梯度下降,需要計算所有的采樣數據m,計算量會比較大。
2.2 隨機梯度下降 (SGD)
在上面2.1的批量梯度下降,采樣的是批量數據,那麽隨機采樣一個數據,進行θ梯度下降,就被稱為隨機梯度下降。
損失函數:
那麽單樣本的損失函數:m=1 的情況:
對單樣本的損失函數進行求偏導,計算梯度下降
為了控制梯度下降的速度,引入步長
3. Spark 實現的梯度下降
spark實現在mlib庫下org.apache.spark.mllib.optimization.GradientDescent類中
3.1 隨機梯度?
看函數名字叫做SGD,會以為是隨機梯度下降,實際上Spark裏實現的是隨機批量的梯度下降 我們去看梯度下降的批量算法公式: 這個公式可以拆分成兩部分- 計算數據的梯度
- 根據梯度計算新的權重
3.2 計算梯度
在前面的章節裏描述過隨機和批量的主要區別就是在計算梯度上,隨機采樣只是隨機采用單一樣本,而批量采樣如果采樣所有數據,涉及到采樣的樣本、計算量大的問題,Spark采用了擇中的策略,隨機采樣部分數據- 先隨機采樣部分數據
data.sample(false, miniBatchFraction, 42 + i)
- 對部分數據樣本進行聚合計算
treeAggregate((BDV.zeros[Double](n), 0.0, 0L))( seqOp = (c, v) => { // c: (grad, loss, count), v: (label, features) val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1)) (c._1, c._2 + l, c._3 + 1) }, combOp = (c1, c2) => { // c: (grad, loss, count) (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3) })使用treeAggregate,而沒有使用Aggregate,是因為treeAggregate比aggregate更高效,combOp會在executor上執行 在聚合計算的seqOp裏我們看到了gradient.compute來計算梯度
3.2.1 Spark 提供的計算梯度的方式
- LeastSquaresGradient 梯度,主要用於線型回歸
- HingeGradient 梯度,用於SVM分類
- LogisticGradient 梯度,用於邏輯回歸
3.3 跟新權重theta θ
在梯度下降計算中,計算新的theta(也叫權重的更新),更新的算法由你采用的模型來決定
val update = updater.compute( weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble), stepSize, i, regParam)目前Spark默認提供了3種算法跟新theta
- SimpleUpdater
- L1Updater
- SquaredL2Updater
3.3.1 SimpleUpdater
以SimpleUpdater來說:SimpleUpdater extends Updater { override def compute( weightsOld: Vector, gradient: Vector, stepSize: Double, iter: Int, regParam: Double): (Vector, Double) = { val thisIterStepSize = stepSize / math.sqrt(iter) val brzWeights: BV[Double] = weightsOld.asBreeze.toDenseVector brzAxpy(-thisIterStepSize, gradient.asBreeze, brzWeights) (Vectors.fromBreeze(brzWeights), 0) } }
也就是上面提到的公式:
相對來說simpleupdater算法比較簡單,在這裏沒有使用正則參數regParam,只是使用了每個叠代的步長作為相同的因子,計算每一個theta,也就是權重。 叠代的步長=總步長/math.sqrt(叠代的次數)3.3.2 其它的正則參數化算法
L1Updater: 正則化算法- 和SimpleUpdater一樣更新權重
- 將正則化參數乘以叠代步長的到比較參數:shrinkage
- 如果權重大於shrinkage,設置權重-shrinkage
- 如果權重小於-shrinkage,設置權重+shrinkage
- 其它的,設置權重為0
w‘ = w - thisIterStepSize * (gradient + regParam * w)
和SimpleUpdater比較,補償了regParam*w ,這也是邏輯回歸所采用的梯度下降算法的更新算法
4. 梯度下降收斂條件
如何判定梯度下降權重值收斂不在需要計算,通常會有兩個約束條件- 叠代次數,當達到一定的叠代次數後,權重的值會被收斂到極值點,並且不會受到次數的影響
- 筏值:當兩次叠代的權重之間的差小於指定的筏值的時候,就認為已經收斂
private def isConverged( previousWeights: Vector, currentWeights: Vector, convergenceTol: Double): Boolean = { // To compare with convergence tolerance. val previousBDV = previousWeights.asBreeze.toDenseVector val currentBDV = currentWeights.asBreeze.toDenseVector // This represents the difference of updated weights in the iteration. val solutionVecDiff: Double = norm(previousBDV - currentBDV) solutionVecDiff < convergenceTol * Math.max(norm(currentBDV), 1.0) }
當前後權重的差的L2,小於筏值*當前權重的L2和1的最大值,就認為下降結束。
5. Spark實現梯度下降的實現示例:
import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf} import org.apache.spark.mllib.linalg.{Vectors} import org.apache.spark.mllib.optimization._ object SGDExample { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.set("spark.sql.broadcastTimeout", "10000") conf.set("fs.defaultFS", "hdfs://abccluster") val spark = SparkSession.builder().appName("hz_mlib").config(conf).enableHiveSupport().getOrCreate() /** * 這裏以簡單的y=3*x+1為例來簡單使用一下 * 測試數據就隨意 * 1 0 1 * 7 2 1 * 10 3 1 * 4 1 1 * 19 6 1 **/ val list = List[scala.Tuple2[scala.Double, org.apache.spark.mllib.linalg.Vector]]( Tuple2(1d, Vectors.dense(0.0d, 1d)), Tuple2(7d, Vectors.dense(2.0d, 1d)), Tuple2(10d, Vectors.dense(3.0d, 1d)), Tuple2(4d, Vectors.dense(1.0d, 1d)), Tuple2(19d, Vectors.dense(6.0d, 1d)) ) val data: org.apache.spark.rdd.RDD[scala.Tuple2[scala.Double, org.apache.spark.mllib.linalg.Vector]] = spark.sparkContext.parallelize(list) /** * 而具體的實現梯度有 * LogisticGradient * LeastSquaresGradient * HingeGradient * 對於更新也是三種實現 * SimpleUpdater * L1Updater * SquaredL2Updater **/ var gradient = new LeastSquaresGradient() var updater = new L1Updater() /** * GradientDescent parameters default initialize values: * private var stepSize: Double = 1.0 * private var numIterations: Int = 100 * private var regParam: Double = 0.0 * private var miniBatchFraction: Double = 1.0 * private var convergenceTol: Double = 0.001 */ var stepSize = 1.0 var numIterations = 100 var regParam: Double = 0.0 var miniBatchFraction = 1.0 var initialWeights: org.apache.spark.mllib.linalg.Vector = Vectors.dense(0d, 0d) var convergenceTol = 0.001 val (weights, _) = GradientDescent.runMiniBatchSGD( data: org.apache.spark.rdd.RDD[scala.Tuple2[scala.Double, org.apache.spark.mllib.linalg.Vector]], gradient: org.apache.spark.mllib.optimization.Gradient, updater: org.apache.spark.mllib.optimization.Updater, stepSize: scala.Double, numIterations: scala.Int, regParam: scala.Double, miniBatchFraction: scala.Double, initialWeights: org.apache.spark.mllib.linalg.Vector, convergenceTol: scala.Double) println(weights) spark.stop() } }
輸出測試結果:
scala> import org.apache.spark.mllib.linalg.{Vectors} import org.apache.spark.mllib.linalg.Vectors scala> import org.apache.spark.mllib.optimization._ import org.apache.spark.mllib.optimization._ scala> /** | * 這裏以簡單的y=3*x+1為例來簡單使用一下 | * 測試數據就隨意 | * 1 0 1 | * 7 2 1 | * 10 3 1 | * 4 1 1 | * 19 6 1 | **/ | val list = List[scala.Tuple2[scala.Double, org.apache.spark.mllib.linalg.Vector]]( | Tuple2(1d, Vectors.dense(0.0d, 1d)), | Tuple2(7d, Vectors.dense(2.0d, 1d)), | Tuple2(10d, Vectors.dense(3.0d, 1d)), | Tuple2(4d, Vectors.dense(1.0d, 1d)), | Tuple2(19d, Vectors.dense(6.0d, 1d)) | ) list: List[(Double, org.apache.spark.mllib.linalg.Vector)] = List((1.0,[0.0,1.0]), (7.0,[2.0,1.0]), (10.0,[3.0,1.0]), (4.0,[1.0,1.0]), (19.0,[6.0,1.0])) scala> scala> val data: org.apache.spark.rdd.RDD[scala.Tuple2[scala.Double, org.apache.spark.mllib.linalg.Vector]] = spark.sparkContext.parallelize(list) data: org.apache.spark.rdd.RDD[(Double, org.apache.spark.mllib.linalg.Vector)] = ParallelCollectionRDD[11460] at parallelize at <console>:37 scala> scala> /** | * 而具體的實現梯度有 | * LogisticGradient | * LeastSquaresGradient | * HingeGradient | * 對於更新也是三種實現 | * SimpleUpdater | * L1Updater | * SquaredL2Updater | **/ | var gradient = new LeastSquaresGradient() gradient: org.apache.spark.mllib.optimization.LeastSquaresGradient = org.apache.spark.mllib.optimization.LeastSquaresGradient@7adb7d5b scala> var updater = new L1Updater() updater: org.apache.spark.mllib.optimization.L1Updater = org.apache.spark.mllib.optimization.L1Updater@33e6a825 scala> scala> /** | * GradientDescent parameters default initialize values: | * private var stepSize: Double = 1.0 | * private var numIterations: Int = 100 | * private var regParam: Double = 0.0 | * private var miniBatchFraction: Double = 1.0 | * private var convergenceTol: Double = 0.001 | */ | var stepSize = 1.0 stepSize: Double = 1.0 scala> var numIterations = 100 numIterations: Int = 100 scala> var regParam: Double = 0.0 regParam: Double = 0.0 scala> var miniBatchFraction = 1.0 miniBatchFraction: Double = 1.0 scala> var initialWeights: org.apache.spark.mllib.linalg.Vector = Vectors.dense(0d, 0d) initialWeights: org.apache.spark.mllib.linalg.Vector = [0.0,0.0] scala> var convergenceTol = 0.001 convergenceTol: Double = 0.001 scala> val (weights, _) = GradientDescent.runMiniBatchSGD( | data: org.apache.spark.rdd.RDD[scala.Tuple2[scala.Double, org.apache.spark.mllib.linalg.Vector]], | gradient: org.apache.spark.mllib.optimization.Gradient, | updater: org.apache.spark.mllib.optimization.Updater, | stepSize: scala.Double, | numIterations: scala.Int, | regParam: scala.Double, | miniBatchFraction: scala.Double, | initialWeights: org.apache.spark.mllib.linalg.Vector, | convergenceTol: scala.Double) weights: org.apache.spark.mllib.linalg.Vector = [3.000248212261404,0.9997330919125574] scala> scala> println(weights) [3.000248212261404,0.9997330919125574]
樣例實現:參考《夜明的孤行燈 -》Spark中的梯度下降 -》 https://www.huangyunkun.com/2015/05/27/spark-gradient-descent/#comment-9317》
Spark MLib:梯度下降算法實現