Spark 2.x Source Code Analysis: Logistic Regression
Note: this is my first blog post, so feedback and suggestions are welcome. If the formulas do not display completely, please see the original article on GitHub.
1. Binary Logistic Regression
1.1 Overview
Regression models a mapping between variables (x -> y); logistic regression additionally passes the mapped value through the sigmoid function, which confines it to (0, 1). The sigmoid function is

$$\sigma(z) = \frac{1}{1 + e^{-z}}$$

Given a feature vector x, a linear function can be written as

$$f(x) = \beta^T x$$

Logistic regression simply wraps this linear function in a sigmoid:

$$h_\beta(x) = P(y = 1 \mid x) = \frac{1}{1 + e^{-\beta^T x}}$$

Logistic regression is therefore a linear model, with a linear decision boundary (hyperplane):

$$\beta^T x = 0$$
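As a tiny, self-contained illustration of the model above (not Spark code; `beta`, the intercept, and `x` are made-up values):

```
object SigmoidSketch {
  // sigma(z) = 1 / (1 + e^{-z})
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // P(y = 1 | x) = sigmoid(beta^T x + intercept)
  def predictProb(beta: Array[Double], intercept: Double, x: Array[Double]): Double = {
    val linear = beta.zip(x).map { case (b, v) => b * v }.sum + intercept
    sigmoid(linear)
  }

  def main(args: Array[String]): Unit = {
    val beta = Array(0.5, -1.2)                       // made-up weights
    println(predictProb(beta, 0.1, Array(1.0, 2.0)))  // estimated P(y = 1 | x)
  }
}
```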
1.2 Constructing the Loss Function
For binary classification there are only two possible outcomes, y = 1 or y = 0, with probabilities

$$P(y = 1 \mid x) = h_\beta(x), \qquad P(y = 0 \mid x) = 1 - h_\beta(x)$$

so the maximum-likelihood objective is

$$L(\beta) = \prod_{i=1}^{n} h_\beta(x_i)^{y_i}\,\bigl(1 - h_\beta(x_i)\bigr)^{1 - y_i}$$

Taking the log makes this easier to work with, and taking the negative log turns the likelihood-maximization problem into a minimization problem, giving the loss function

$$\ell(\beta) = -\sum_{i=1}^{n} \Bigl[ y_i \log h_\beta(x_i) + (1 - y_i)\log\bigl(1 - h_\beta(x_i)\bigr) \Bigr]$$

It is convenient to look at a single sample, which is also what Spark does. For sample i, define margin = -βᵀxᵢ; the per-sample loss is then log(1 + e^margin) when yᵢ = 1 and log(1 + e^margin) - margin when yᵢ = 0, and the first-order gradient of the loss with respect to parameter j is

$$\frac{\partial \ell_i}{\partial \beta_j} = \Bigl(\frac{1}{1 + e^{\text{margin}}} - y_i\Bigr)\, x_{ij} = \text{multiplier} \cdot x_{ij}$$
Tip: the margin and multiplier above use the same names as the variables in the Spark source code.
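To make the margin/multiplier notation concrete, here is a small sketch computing them for one instance, using the same names as the Spark source (the coefficients, features, and label are made up):

```
object BinaryGradientSketch {
  def main(args: Array[String]): Unit = {
    val beta = Array(0.5, -1.2)   // current coefficients (made up)
    val x = Array(1.0, 2.0)       // features of one instance (made up)
    val y = 1.0                   // label
    val weight = 1.0              // sample weight w_i

    // margin = -beta^T x, exactly as in the Spark source
    val margin = -beta.zip(x).map { case (b, v) => b * v }.sum
    // multiplier = w_i * (1 / (1 + e^{margin}) - y)
    val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - y)
    // d(loss)/d(beta_j) = multiplier * x_j
    val gradient = x.map(_ * multiplier)
    // loss = log(1 + e^{margin}) when y = 1, log(1 + e^{margin}) - margin when y = 0
    val loss = if (y > 0) math.log1p(math.exp(margin))
               else math.log1p(math.exp(margin)) - margin

    println(s"margin=$margin multiplier=$multiplier loss=$loss gradient=${gradient.mkString(", ")}")
  }
}
```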
1.3 Regularization and Overfitting
To reduce overfitting, a regularization term is added to the loss function. Its purpose is to constrain the parameters, and it does not depend on the data. The common choices are L1 and L2 regularization. Because the L1 term is not differentiable everywhere, it requires a dedicated solver such as OWLQN. The combined elastic-net objective that spark.ml uses is sketched just below.
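For reference, the elastic-net objective spark.ml minimizes can be written roughly as follows, with λ = regParam and α = elasticNetParam (this matches the regParamL1/regParamL2 split that appears later in the source):

$$\min_{\beta}\; \frac{\sum_i w_i\,\ell(\beta; x_i, y_i)}{\sum_i w_i} \;+\; \lambda\Bigl(\alpha\,\lVert\beta\rVert_1 + \frac{1-\alpha}{2}\,\lVert\beta\rVert_2^2\Bigr)$$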
1.4 Optimization
Spark's LogisticRegression uses L-BFGS and OWLQN, targeting L2 and L1 regularization respectively; see the references for details. Which optimizer is chosen can be seen from the public API, as illustrated below.
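A small illustration of how the choice is driven by `elasticNetParam` and `regParam` (the parameter values are arbitrary):

```
import org.apache.spark.ml.classification.LogisticRegression

// elasticNetParam == 0.0 (or regParam == 0.0): pure L2 penalty, solved internally with L-BFGS
val l2Lr = new LogisticRegression().setRegParam(0.1).setElasticNetParam(0.0)

// elasticNetParam > 0.0 together with regParam > 0.0: an L1 component is present, solved with OWLQN
val l1Lr = new LogisticRegression().setRegParam(0.1).setElasticNetParam(1.0)
```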
2. Multinomial Logistic Regression
2.1 Softmax Probabilities
In Spark 2, multinomial logistic regression uses softmax (this differs slightly from Spark 1.6); see the UFLDL tutorial (http://ufldl.stanford.edu/tutorial/supervised/SoftmaxRegression/). The class probabilities are defined as

$$P(y = k \mid x; \beta) = \frac{e^{\beta_k^T x}}{\sum_{k'=1}^{K} e^{\beta_{k'}^T x}}, \qquad k = 1, \dots, K$$

In binary logistic regression the weights form a vector; in multinomial logistic regression the weights β form a matrix, so the model behaves like several binary logistic regressions, one per class (one per row of β):
The parameterization above is redundant: adding an arbitrary constant vector c to every β_k leaves all class probabilities unchanged:

$$\frac{e^{(\beta_k + c)^T x}}{\sum_{k'} e^{(\beta_{k'} + c)^T x}} = \frac{e^{c^T x}\, e^{\beta_k^T x}}{e^{c^T x} \sum_{k'} e^{\beta_{k'}^T x}} = P(y = k \mid x; \beta)$$
2.2 Loss Function and Its Gradient
For a single instance in the data, the loss function is

$$\ell_i(\beta) = w_i\Bigl[\log\sum_{k=1}^{K} e^{\text{margin}_k} - \text{margin}_y\Bigr], \qquad \text{margin}_k = \beta_k^T x$$

Whatever optimizer is used (SGD, L-BFGS, or OWLQN), the first-order derivative of the loss with respect to the parameters is required:

$$\frac{\partial \ell_i}{\partial \beta_{k,j}} = w_i\, x_j\,\bigl(P(y = k \mid x; \beta) - I_{y=k}\bigr) = w_i \cdot \text{multiplier}_k \cdot x_j$$

Here w_i is the sample weight (ignored for now), and I_{y=k} is an indicator: when differentiating with respect to the parameters β_k of class k, the last term of the loss only contributes when the current sample has y = k.
In the formulas above, the exponentials can overflow when max(margin) > 0, so an adjustment is needed. First, rewrite the loss in an equivalent form:

$$\ell_i = \log\sum_{k=1}^{K} e^{\text{margin}_k - \text{maxMargin}} - \text{margin}_y + \text{maxMargin}$$

and the multiplier becomes

$$\text{multiplier}_k = \frac{e^{\text{margin}_k - \text{maxMargin}}}{\sum_{k'} e^{\text{margin}_{k'} - \text{maxMargin}}} - I_{y=k}$$

A small sketch of this max-margin trick follows.
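The sketch below is illustrative only; it mirrors the margins/multipliers arrays that appear in the source later on:

```
object StableSoftmaxSketch {
  def main(args: Array[String]): Unit = {
    val margins = Array(1000.0, 998.0, 990.0)  // beta_k^T x for each class (made up, large on purpose)
    val label = 0                              // index of the true class
    val maxMargin = margins.max

    // subtract maxMargin before exponentiating so that math.exp never overflows
    val shifted = if (maxMargin > 0) margins.map(_ - maxMargin) else margins
    val expVals = shifted.map(math.exp)
    val sum = expVals.sum

    // multiplier_k = P(y = k | x) - I(y = k)
    val multipliers = expVals.zipWithIndex.map { case (e, k) =>
      e / sum - (if (k == label) 1.0 else 0.0)
    }
    // The same maxMargin was subtracted from every margin, so the shift cancels
    // and this equals log(sum_k e^{margin_k}) - margin_label, the original loss.
    val loss = math.log(sum) - shifted(label)

    println(s"loss=$loss multipliers=${multipliers.mkString(", ")}")
  }
}
```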
3. Example
import org.apache.spark.ml.classification.LogisticRegression
// Load training data
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
// Fit the model
val lrModel = lr.fit(training)
// Print the coefficients and intercept for logistic regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
// We can also use the multinomial family for binary classification
val mlr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
.setFamily("multinomial")
val mlrModel = mlr.fit(training)
// Print the coefficients and intercepts for logistic regression with multinomial family
println(s"Multinomial coefficients: ${mlrModel.coefficientMatrix}")
println(s"Multinomial intercepts: ${mlrModel.interceptVector}")
// Multi-class classification is similar to the example above; a sketch follows.
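The sketch below assumes the multiclass sample file that ships with the Spark distribution (data/mllib/sample_multiclass_classification_data.txt):

```
import org.apache.spark.ml.classification.LogisticRegression

// Load a multi-class dataset in libsvm format (path assumes a standard Spark distribution)
val multiclass = spark.read.format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")

val softmaxLr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

val softmaxModel = softmaxLr.fit(multiclass)
println(s"Coefficients:\n${softmaxModel.coefficientMatrix}")
println(s"Intercepts: ${softmaxModel.interceptVector}")
```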
4. Code Analysis
4.1 Overall Flow
The core of logistic regression (mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala) lives in the `val (coefficientMatrix, interceptVector, objectiveHistory) = {}` block of the train method. The first part initializes parameters and computes summary statistics (feature means and standard deviations, etc.); the key part comes next: **the loss function costFun and the optimizer**. If no L1 regularization is used, the problem is optimized with LBFGS, otherwise with OWLQN (because the L1 term is not differentiable everywhere); both are quasi-Newton methods.
val regParamL1 = $(elasticNetParam) * $(regParam)
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)
val bcFeaturesStd = instances.context.broadcast(featuresStd)
// The cost function is defined further below and includes the gradient computation.
// It must be provided because breeze/optimize (LBFGS and OWLQN) relies on it.
val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept),
$(standardization), bcFeaturesStd, regParamL2, multinomial = isMultinomial,
$(aggregationDepth))
val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
} else {
val standardizationParam = $(standardization)
def regParamL1Fun = (index: Int) => {
// Remove the L1 penalization on the intercept
val isIntercept = $(fitIntercept) && index >= numFeatures * numCoefficientSets
if (isIntercept) {
0.0
} else {
if (standardizationParam) {
regParamL1
} else {
val featureIndex = index / numCoefficientSets
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
// perform this reverse standardization by penalizing each component
// differently to get effectively the same objective function when
// the training dataset is not standardized.
if (featuresStd(featureIndex) != 0.0) {
regParamL1 / featuresStd(featureIndex)
} else {
0.0
}
}
}
}
new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
}
// ... initialization and other setup omitted here ...
// The optimizers (LBFGS and OWLQN) extend FirstOrderMinimizer, so the iterations method of FirstOrderMinimizer is used here.
val states = optimizer.iterations(new CachedDiffFunction(costFun),
new BDV[Double](initialCoefWithInterceptMatrix.toArray))
/*
Note that in Logistic Regression, the objective history (loss + regularization)
is log-likelihood which is invariant under feature standardization. As a result,
the objective history from optimizer is the same as the one in the original space.
*/
val arrayBuilder = mutable.ArrayBuilder.make[Double]
var state: optimizer.State = null
// Iterate to the last state; the final iteration holds the converged values.
while (states.hasNext) {
state = states.next()
arrayBuilder += state.adjustedValue
}
bcFeaturesStd.destroy(blocking = false)
Here states is an iterator of optimizer states, updated at every iteration. The State class lives in breeze/optimize/FirstOrderMinimizer.scala and contains x (the model parameters), value (the model loss), grad (the gradient), and so on:
case class State[+T, +ConvergenceInfo, +History](
x: T,
value: Double,
grad: T,
adjustedValue: Double,
adjustedGradient: T,
iter: Int,
initialAdjVal: Double,
history: History,
convergenceInfo: ConvergenceInfo,
searchFailed: Boolean = false,
var convergenceReason: Option[ConvergenceReason] = None) {}
4.2 The Loss Function Class: LogisticCostFun
*Role*: when FirstOrderMinimizer.iterations updates the states, it calls calculateObjective, which in turn calls DiffFunction.calculate. LogisticCostFun extends DiffFunction and overrides calculate: it computes the loss and the gradient with L2 regularization.
/**
* LogisticCostFun implements Breeze's DiffFunction[T] for a multinomial (softmax) logistic loss
* function, as used in multi-class classification (it is also used in binary logistic regression).
* It returns the loss and gradient with L2 regularization at a particular point (coefficients).
* It's used in Breeze's convex optimization routines.
*/
private class LogisticCostFun(
instances: RDD[Instance],
numClasses: Int,
fitIntercept: Boolean,
standardization: Boolean,
bcFeaturesStd: Broadcast[Array[Double]],
regParamL2: Double,
multinomial: Boolean,
aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val coeffs = Vectors.fromBreeze(coefficients)
val bcCoeffs = instances.context.broadcast(coeffs)
val featuresStd = bcFeaturesStd.value
val numFeatures = featuresStd.length
val numCoefficientSets = if (multinomial) numClasses else 1
val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
// Use the LogisticAggregator class to compute the loss and gradient,
// then combine partial results with treeAggregate (add per instance, merge per partition).
val logisticAggregator = {
val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)
instances.treeAggregate(
new LogisticAggregator(bcCoeffs, bcFeaturesStd, numClasses, fitIntercept,
multinomial)
)(seqOp, combOp, aggregationDepth)
}
val totalGradientMatrix = logisticAggregator.gradient
val coefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, coeffs.toArray)
// regVal is the sum of coefficients squares excluding intercept for L2 regularization.
val regVal = if (regParamL2 == 0.0) {
0.0
} else {
var sum = 0.0
coefMatrix.foreachActive { case (classIndex, featureIndex, value) =>
// We do not apply regularization to the intercepts
val isIntercept = fitIntercept && (featureIndex == numFeatures)
// Add the L2 regularization term to the loss and the corresponding gradient.
if (!isIntercept) {
// The following code will compute the loss of the regularization; also
// the gradient of the regularization, and add back to totalGradientArray.
sum += {
if (standardization) {
val gradValue = totalGradientMatrix(classIndex, featureIndex)
totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * value)
value * value
} else {
if (featuresStd(featureIndex) != 0.0) {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
// perform this reverse standardization by penalizing each component
// differently to get effectively the same objective function when
// the training dataset is not standardized.
val temp = value / (featuresStd(featureIndex) * featuresStd(featureIndex))
val gradValue = totalGradientMatrix(classIndex, featureIndex)
totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * temp)
value * temp
} else {
0.0
}
}
}
}
}
0.5 * regParamL2 * sum
}
bcCoeffs.destroy(blocking = false)
(logisticAggregator.loss + regVal, new BDV(totalGradientMatrix.toArray))
  }
}
The main functionality of the code above: I. compute the loss and gradient and merge partial results; II. add the L2 regularization term to the loss and the corresponding gradient; III. standardize the data.
- loss and gradient: the loss and gradient are computed in the LogisticAggregator class, via binaryUpdateInPlace and multinomialUpdateInPlace; the next section covers them in detail.
- L2 regularization: note two things: (1) the L2 term must be added to both the loss and the gradient; (2) the intercept is not regularized.
- standardization: even when standardization is not requested, the computation still standardizes the data, including the loss and gradient computed in LogisticAggregator, because this helps convergence. The penalty therefore has to be "reverse standardized" (this was a bit confusing to me at first; the algebra is sketched below).
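To make the "reverse standardization" concrete, here is a sketch of the algebra, writing β_std for the coefficients learned on the standardized features x_j/σ_j:

$$\hat{x}_j = \frac{x_j}{\sigma_j},\qquad \beta_j^{std}\,\hat{x}_j = \beta_j\,x_j \;\Rightarrow\; \beta_j = \frac{\beta_j^{std}}{\sigma_j}$$

Penalizing the original-space coefficients therefore means adding

$$\frac{\lambda_2}{2}\sum_j\beta_j^2 = \frac{\lambda_2}{2}\sum_j\frac{(\beta_j^{std})^2}{\sigma_j^2},\qquad \frac{\partial}{\partial\beta_j^{std}} = \lambda_2\,\frac{\beta_j^{std}}{\sigma_j^2}$$

which is exactly the `value / (featuresStd(featureIndex) * featuresStd(featureIndex))` term in the code above.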
4.3 LogisticAggregator
This class computes the gradient and loss, and merges different LogisticAggregator instances. The gradient/loss computation has two variants, binary and multinomial: binaryUpdateInPlace and multinomialUpdateInPlace.
- binaryUpdateInPlace: binary logistic regression
/** Update gradient and loss using binary loss function. */
private def binaryUpdateInPlace(
features: Vector,
weight: Double,
label: Double): Unit = {
val localFeaturesStd = bcFeaturesStd.value
val localCoefficients = bcCoefficients.value
val localGradientArray = gradientSumArray
// margin is the margin from the formula above
val margin = - {
var sum = 0.0
features.foreachActive { (index, value) =>
if (localFeaturesStd(index) != 0.0 && value != 0.0) {
// divide by localFeaturesStd to standardize the feature
sum += localCoefficients(index) * value / localFeaturesStd(index)
}
}
if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
sum
}
// the multiplier from the formula above
val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)
// gradient update, again with standardization
features.foreachActive { (index, value) =>
if (localFeaturesStd(index) != 0.0 && value != 0.0) {
localGradientArray(index) += multiplier * value / localFeaturesStd(index)
}
}
if (fitIntercept) {
localGradientArray(numFeaturesPlusIntercept - 1) += multiplier
}
if (label > 0) {
// The following is equivalent to log(1 + exp(margin)) but more numerically stable.
lossSum += weight * MLUtils.log1pExp(margin)
} else {
lossSum += weight * (MLUtils.log1pExp(margin) - margin)
}
}
- multinomialUpdateInPlace: multinomial logistic regression. First the margins and maxMargin are computed, as in the formulas above; note again the standardization of the data and the handling of the intercept.
/** Update gradient and loss using multinomial (softmax) loss function. */
private def multinomialUpdateInPlace(
features: Vector,
weight: Double,
label: Double): Unit = {
// TODO: use level 2 BLAS operations
/*
Note: this can still be used when numClasses = 2 for binary
logistic regression without pivoting.
*/
val localFeaturesStd = bcFeaturesStd.value
val localCoefficients = bcCoefficients.value
val localGradientArray = gradientSumArray
// marginOfLabel is margins(label) in the formula
var marginOfLabel = 0.0
var maxMargin = Double.NegativeInfinity
// compute the margins as in the formulas
val margins = new Array[Double](numClasses)
features.foreachActive { (index, value) =>
// standardize the data
val stdValue = value / localFeaturesStd(index)
var j = 0
while (j < numClasses) {
margins(j) += localCoefficients(index * numClasses + j) * stdValue
j += 1
}
}
var i = 0
while (i < numClasses) {
if (fitIntercept) {
margins(i) += localCoefficients(numClasses * numFeatures + i)
}
if (i == label.toInt) marginOfLabel = margins(i)
// track maxMargin
if (margins(i) > maxMargin) {
maxMargin = margins(i)
}
i += 1
}
Here margins and multipliers are arrays whose length equals the number of classes, i.e. one double per class k, while localGradientArray is a matrix flattened into an array.
Tip: this can be viewed as K binary regressions, each with its own margin, multiplier, and gradient; a single sample updates the gradients of all parameters (localGradientArray) at once.
/**
* When maxMargin is greater than 0, the original formula could cause overflow.
* We address this by subtracting maxMargin from all the margins, so it's guaranteed
* that all of the new margins will be smaller than zero to prevent arithmetic overflow.
*/
val multipliers = new Array[Double](numClasses)
val sum = {
var temp = 0.0
var i = 0
while (i < numClasses) {
if (maxMargin > 0) margins(i) -= maxMargin
val exp = math.exp(margins(i))
temp += exp
multipliers(i) = exp
i += 1
}
temp
}
margins.indices.foreach { i =>
multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0)
}
features.foreachActive { (index, value) =>
if (localFeaturesStd(index) != 0.0 && value != 0.0) {
val stdValue = value / localFeaturesStd(index)
var j = 0
while (j < numClasses) {
localGradientArray(index * numClasses + j) +=
weight * multipliers(j) * stdValue
j += 1
}
}
}
if (fitIntercept) {
var i = 0
while (i < numClasses) {
localGradientArray(numFeatures * numClasses + i) += weight * multipliers(i)
i += 1
}
}
val loss = if (maxMargin > 0) {
math.log(sum) - marginOfLabel + maxMargin
} else {
math.log(sum) - marginOfLabel
}
lossSum += weight * loss
}
4.4 Optimization Methods (breeze/optimize/)
LogisticCostFun only computes the first-order derivative (gradient) of the parameters, but the optimizers used in the source are LBFGS and OWLQN (the latter handles the non-differentiable L1 norm), so the gradient alone is not enough. The heart of the optimization is choosing the update direction and the step size. FirstOrderMinimizer declares (1) the Hessian approximation, History; (2) the update direction, chooseDescentDirection; and (3) the step size, determineStepSize. All three are used when iterations updates the states, and their concrete definitions live in the corresponding optimizers (a conceptual sketch of the overall loop is given at the end of this section).
- LBFGS (breeze/optimize/LBFGS.scala)
- Hessian approximation: `type History = LBFGS.ApproximateInverseHessian[T]`
- chooseDescentDirection: the search direction, i.e. the approximate inverse Hessian times the gradient
```
protected def chooseDescentDirection(state: State, fn: DiffFunction[T]): T = {
  state.history * state.grad
}
```
- determineStepSize: a line search for the best step size (the details of the line search are omitted here)
/**
* Given a direction, perform a line search to find
* a step size to descend. The result fulfills the wolfe conditions.
*
* @param state the current state
* @param f The objective
* @param dir The step direction
* @return stepSize
*/
protected def determineStepSize(state: State, f: DiffFunction[T], dir: T) = {
val x = state.x
val grad = state.grad
val ff = LineSearch.functionFromSearchDirection(f, x, dir)
val search = new StrongWolfeLineSearch(maxZoomIter = 10, maxLineSearchIter = 10) // TODO: Need good default values here.
val alpha = search.minimize(ff, if (state.iter == 0.0) 1.0 / norm(dir) else 1.0)
if (alpha * norm(grad) < 1E-10)
throw new StepSizeUnderflow
alpha
}
- OWLQN (breeze/optimize/OWLQN.scala)
- Hessian approximation: inherits the history from LBFGS, because the presence or absence of the L1 norm does not affect the Hessian approximation.
- chooseDescentDirection: inherits from LBFGS, but corrects the direction to handle the non-differentiability of L1 (see the OWLQN paper; pseudo-gradient / sub-gradient).
override protected def chooseDescentDirection(state: State, fn: DiffFunction[T]) = {
val descentDir = super.chooseDescentDirection(state.copy(grad = state.adjustedGradient), fn)
// The original paper requires that the descent direction be corrected to be
// in the same directional (within the same hypercube) as the adjusted gradient for proof.
// Although this doesn't seem to affect the outcome that much in most of cases, there are some cases
// where the algorithm won't converge (confirmed with the author, Galen Andrew).
val correctedDir = space.zipMapValues.map(descentDir, state.adjustedGradient, {
case (d, g) => if (d * g < 0) d else 0.0
})
correctedDir
}
}
- determineStepSize: also overridden; only the beginning of the method is shown here
override protected def determineStepSize(state: State, f: DiffFunction[T], dir: T) = {
val iter = state.iter
val normGradInDir = {
val possibleNorm = dir.dot(state.grad)
// if (possibleNorm > 0) { // hill climbing is not what we want. Bad LBFGS.
// logger.warn("Direction of positive gradient chosen!")
// logger.warn("Direction is:" + possibleNorm)
// Reverse the direction, clearly it's a bad idea to go up
// dir *= -1.0
// dir dot state.grad
// } else {
possibleNorm
// }
}
  // ... (the remainder of determineStepSize, the line search itself, is omitted here)
}
- takeStep: after taking the step, projects the result back onto the allowed orthant
// projects x to be on the same orthant as y
// this basically requires that x'_i = x_i if sign(x_i) == sign(y_i), and 0 otherwise.
override protected def takeStep(state: State, dir: T, stepSize: Double) = {
val stepped = state.x + dir * stepSize
val orthant = computeOrthant(state.x, state.adjustedGradient)
space.zipMapValues.map(stepped, orthant, {
case (v, ov) =>
v * I(math.signum(v) == math.signum(ov))
})
}
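Putting the three hooks together, the overall update performed by FirstOrderMinimizer.iterations looks roughly like the following. This is a conceptual sketch, not the actual breeze implementation: the direction here is plain steepest descent and the step size a fixed schedule, whereas LBFGS/OWLQN override exactly those two pieces (plus the history update).

```
// Conceptual sketch of a first-order optimization loop (NOT the actual breeze code).
// calcLossGrad plays the role of DiffFunction.calculate (LogisticCostFun above).
object FirstOrderSketch {
  def optimize(
      calcLossGrad: Array[Double] => (Double, Array[Double]),
      init: Array[Double],
      maxIter: Int): Array[Double] = {
    var x = init
    var iter = 0
    var converged = false
    while (iter < maxIter && !converged) {
      val (loss, grad) = calcLossGrad(x)                        // loss + gradient (with L2) at x
      val dir = grad.map(g => -g)                               // chooseDescentDirection (steepest descent here)
      val step = 1.0 / (1.0 + iter)                             // determineStepSize (fixed schedule, no line search)
      x = x.zip(dir).map { case (xi, di) => xi + step * di }    // takeStep
      converged = math.sqrt(grad.map(g => g * g).sum) < 1e-6    // simple convergence check on the gradient norm
      println(s"iter=$iter loss=$loss")
      iter += 1
    }
    x
  }
}
```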
References
[1] UFLDL Tutorial, Softmax Regression: http://ufldl.stanford.edu/tutorial/supervised/SoftmaxRegression/
[2] Blog post on OWLQN