1. 程式人生 > >結合原始碼分析Spark中的Accuracy(準確率), Precision(精確率), 和F1-Measure

結合原始碼分析Spark中的Accuracy(準確率), Precision(精確率), 和F1-Measure

例子

某大學一個系,總共100人,其中男90人,女10人,現在根據每個人的特徵,預測性別

Accuracy(準確率)

Accuracy=

計算

由於我知道男生遠多於女生,所以我完全無視特徵,直接預測所有人都是男生
我預測所的人都是男生,而實際有90個男生,所以
預測正確的數量 = 90
需要預測的總數 = 100
Accuracy = 90 / 100 = 90%

問題

在男女比例嚴重不均勻的情況下,我只要預測全是男生,就能獲得極高的Accuracy。
所以在正負樣本嚴重不均勻的情況下,Accuracy指標失效

Precision(精確率), Recall(召回率)

. 實際為真 實際為假
預測為真 TP FP
預測為假 FN TN
# 前面的T和F,代表預測是否正確
# 後面的P和N,代表預測是真還是假
TP:預測為真,正確了
FP:預測為真,結果錯了
TN:預測為假,正確了
FN:預測為假,結果錯了
Precision=TPTP+FP= Recall=TPTP+FN=

計算

注意:在正負樣本嚴重不均勻的情況下,正樣本必須是數量少的那一類。這裡女生是正樣本。是不是女生,是則預測為真,不是則預測為假。

  • 如果沒有預測為真的情況,計算時分母會為0,所以做了調整,也容易比較Accuracy和Precision, Recall的區別
. 實際為真 實際為假
預測為真 1 0
預測為假 10 89

Accuracy = (1 + 89)/ (1 + 0 + 10 + 89) = 90 / 100 = 0.9
Precision = 1 / 1 + 0 = 1
Recall = 1 / 1 + 10 = 0.09090909

注意:為方便與後面Spark的計算結果對比,無限迴圈小數,我們不做四合五入

問題

雖然我們稍微調整了預測結果,但是Accuracy依然無法反應預測結果。

而Precision在這裡達到了1,但是Recall卻極低。因此Precision,Recall的組合能夠反應我們的預測效果不佳。

但是Precision,Recall在對比的時候會出現問題,比如一個模型的Precision是0.9,Recall是0.19,那麼與上面的1和0.0909對比,哪個模型更好呢?

所以我們需要一個指標,能夠綜合的反應Precision和Recall

F1-Measure

F1值就是Precision和Recall的調和均值

1F1=1Precision+1Recall

整理後:

F1=2×Precision×RecallPrecision+Recall

計算

計算上面提到的對比情況

F1 = (2 * 1 * 0.09090909) / 1 + 0.09090909 = 0.1666666
F1 = (2 * 0.9 * 0.19) / 0.9 + 0.19 = 0.3137

很顯然後一種更好

調整Precision, Recall的權重

Fa=(a2+1)×Precision×Recalla2×(Precision+Recall)

當a等於1時,Precision,Recall各佔50%,就是F1-Measure了

Spark原始碼分析

Spark中API計算Precision,Recall,F1

用Spark API計算出上面我們手工計算出的值

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("test").setMaster("local") // 除錯的時候一定不要用local[*]
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    // 我們先構造一個與上文一樣的資料
    /**
      *         實際為真  實際為假
      * 預測為真   1        0
      * 預測為假   10       89
      */
    // 左邊是預測為真的概率,右邊是真實值
    val TP = Array((1.0, 1.0)) // 預測為真,實際為真

    val TN = new Array[(Double, Double)](89) // 預測為假, 實際為假

    for (i <- TN.indices) {
      TN(i) = (0.0, 0.0)
    }

    val FP = new Array[(Double, Double)](10) // 預測為假, 實際為真

    for (i <- FP.indices) {
      FP(i) = (0.0, 1)
    }

    val all = TP ++ TN ++ FP

    val scoreAndLabels = sc.parallelize(all)

    // 列印觀察資料
    //    scoreAndLabels.collect().foreach(println)
    //    println(scoreAndLabels.count())
    // 到這裡,我們構造了一個與上文例子一樣的資料

    val metrics = new BinaryClassificationMetrics(scoreAndLabels)

    // 下面計算的值,我們先只看右邊的數,它表示計算的precision,recall,F1等
    // 左邊是Threshold,後面會細說
    /**
      * (1.0,1.0) // precision跟我們自己之前計算的一樣
      * (0.0,0.11) // 這是什麼?先不管
      */
    metrics.precisionByThreshold().collect().foreach(println)
    println("---")

    /**
      * (1.0,0.09090909090909091) // recall跟我們自己之前計算的一樣
      * (0.0,1.0) // 先忽略
      */
    metrics.recallByThreshold().collect().foreach(println)
    println("---")

    /**
      * (1.0,0.16666666666666669) // f1跟我們自己之前計算的一樣
      * (0.0,0.19819819819819817) // 先忽略
      */
    metrics.fMeasureByThreshold().collect().foreach(println)
  }
}

至此,我們用Spark API計算出了各個值。但是有幾個疑問

  • 無論是precision,recall,還是fMeasure,後面都跟一個ByThreshold,為什麼?
  • 這三個指標,不應該是一個數嘛,為什麼返回一個RDD,裡面包含一堆數?

要弄清楚,就出要知道它們是怎麼計算出來的

計算分析(以Precision為例)

  • 從程式碼的角度,一步步跟蹤到Precision的計算公式,公式找到了值也就算出來了
  • 從資料的角度,你的輸入資料是怎麼一步步到結果的

程式碼角度

# 類宣告
# scoreAndLabels是一個RDD,存放預測為真的概率和真實值
# numBins,先忽略
class BinaryClassificationMetrics (val scoreAndLabels: RDD[(Double, Double)], val numBins: Int)

呼叫BinaryClassificationMetrics的precisionByThreshold方法計算,precision

new BinaryClassificationMetrics(scoreAndLabels).precisionByThreshold()

跟蹤進入precisionByThreshold方法

def precisionByThreshold(): RDD[(Double, Double)] = createCurve(Precision)
# 呼叫了createCurve(Precision)
# precisionByThreshold返回的RDD,就是這個createCurve方法的返回值
# 兩個問題
# createCurve是什麼?
# 引數Precision又是什麼?

跟蹤進入createCurve方法

/** Creates a curve of (threshold, metric). */
private def createCurve(y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = {
    // confusions肯定是一個RDD,因為它呼叫了map,然後就作為返回值返回了
    // 所以confusions是關鍵,對它做變換,就能得到結果
    confusions.map { case (s, c) =>
      // precisionByThreshold返回的RDD,左邊是threshold,右邊是precision
      // 所以這裡的s,就是threshold
      // y(c),就是precision
      // y是傳入的引數,也就是createCurve(Precision)中的,Precision
      // 下面就先看看Precision是什麼
      (s, y(c))
    }
}

跟蹤進入Precision

// 上文中的y(c),也就是Precision(c),這語法,自然是呼叫apply方法
/** Precision. Defined as 1.0 when there are no positive examples. */
private[evaluation] object Precision extends BinaryClassificationMetricComputer {
  override def apply(c: BinaryConfusionMatrix): Double = {
    // 看名字numTruePositives,就是TP的數量嘛
    // totalPositives = TP + FP
    val totalPositives = c.numTruePositives + c.numFalsePositives
    // totalPositives為0,也就一個真都沒預測
    if (totalPositives == 0) {
      // 0 / 0,會出錯,這裡是直接返回1
      1.0
    } else {
      // 公式出現
      // Precision = TP / (TP + FP)
      c.numTruePositives.toDouble / totalPositives
    }
  }
}

到這裡找到了Precision的計算公式,但是上面提到的兩個疑問,還沒有解決,Threshold怎麼回事,返回RDD幹嘛?

但是通過上面的分析,我們找到了線索,confusions這個通過變換就能出結果的變數,也許就是答案。

資料角度

跟蹤到confusions的宣告

private lazy val (
    cumulativeCounts: RDD[(Double, BinaryLabelCounter)],
    confusions: RDD[(Double, BinaryConfusionMatrix)]) = {
    // ... 省略了60行左右
    (cumulativeCounts, confusions)
}

這60行裡做了什麼,我們拷貝出來,一步步分析

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("test").setMaster("local") // 除錯的時候一定不要用local[*]
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val TP = Array((1.0, 1.0))

    val TN = new Array[(Double, Double)](89)

    for (i <- TN.indices) {
      TN(i) = (0.0, 0.0)
    }

    /**
      * *******這裡改了********這裡改了********這裡改了*****
      */
    // 從10改成了5,有5個樣本有60%的概率是真的;另外5個設定成了40%,在下面
    val FP1 = new Array[(Double, Double)](5)

    for (i <- FP1.indices) {
      FP1(i) = (0.6, 1)
    }

    val FP2 = new Array[(Double, Double)](5) // 有5個樣本有40%的概率是真的

    for (i <- FP2.indices) {
      FP2(i) = (0.4, 1)
    }

    val all = TP ++ TN ++ FP1 ++ FP2

    val scoreAndLabels = sc.parallelize(all, 2) // 調整並行度為2,後面會說,為什麼要調整

    // 列印觀察資料
    scoreAndLabels.collect().foreach(println)

    val metrics = new BinaryClassificationMetrics(scoreAndLabels)

    // 先看下調整後的結果
    // 左邊一列多了0.6,和0.4,猜的話,應該是因為上面的概率我們添加了0.6和0.4
    // 後面會說,具體是怎麼出來的
    /**
      * (1.0,1.0) // 當Threshold為1時,precision是1
      * (0.6,1.0) // 當Threshold為0.6時,precision還是1.0
      * (0.4,1.0) // 以此類推
      * (0.0,0.11)
      */
    println("-- precisionByThreshold --")
    metrics.precisionByThreshold().collect().foreach(println)

    /**
      * (1.0,0.09090909090909091)
      * (0.6,0.5454545454545454)
      * (0.4,1.0)
      * (0.0,1.0)
      */
    println("-- recallByThreshold --")
    metrics.recallByThreshold().collect().foreach(println)

    /**
      * (1.0,0.16666666666666669)
      * (0.6,0.7058823529411764)
      * (0.4,1.0)
      * (0.0,0.19819819819819817)
      */
    println("--  fMeasureByThreshold --")
    metrics.fMeasureByThreshold().collect().foreach(println)

    // 下面以Precision的計算為例

    // 下面的程式碼是初始化confusions的程式碼, 在BinaryClassificationMetrics類中,Spark 1.6.1版本的149行開始

    // 1. 以預測的概率為key,計算在這個概率下,有多少個;比如:0.6這個概率,出現了多少個(0.6, 1)或0.6, 0)
    /**
      * (1.0,{numPos: 1, numNeg: 0}) // 1.0,只有一個
      * (0.6,{numPos: 5, numNeg: 0}) // 0.6,5個,上面我們修改的
      * (0.4,{numPos: 5, numNeg: 0}) // 0.4,同樣是5個
      * (0.0,{numPos: 0, numNeg: 89}) // 0.0, 89個
      */
    println("-- binnedCounts --")
    val binnedCounts = scoreAndLabels.combineByKey(
      // BinaryLabelCounter用於儲存累加的numPositives和numNegatives
      // 先說下label是什麼,scoreAndLabels中右邊那一列,只可能是0或1, 是真實值
      // BinaryLabelCounter中判斷是Positive還是Negatives,是通過label,而不是你自己預測的概率,不是左邊那一列
      // label > 0.5 為Positive
      createCombiner = (label: Double) => new BinaryLabelCounter(0L, 0L) += label,
      mergeValue = (c: BinaryLabelCounter, label: Double) => c += label,
      mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2
    ).sortByKey(ascending = false)

    binnedCounts.collect().foreach(println)

    println("-- agg --")
    // agg是一個數組,collect返回一個數組
    // 前面設定了Partition為2,所以這裡會有兩條資料
    // 計算每個Partition中numPos, numNeg的總和
    /**
      * {numPos: 6, numNeg: 0}
      * {numPos: 5, numNeg: 89}
      */
    val agg = binnedCounts.values.mapPartitions { iter =>
      val agg = new BinaryLabelCounter()
      iter.foreach(agg += _)
      Iterator(agg)
    }.collect()

    agg.foreach(println)

    // partitionwiseCumulativeCounts的長度是Partition數量加1
    // partitionwiseCumulativeCounts的每一行是每個Partition的初始numPos, numNeg數量; 這點很重要, 後面會用到
    /**
      * {numPos: 0, numNeg: 0} // 第一個Partition的初始, 都是0,
      * {numPos: 6, numNeg: 0} // 第一個Partition累加後, 等於第二個Partition的初始值;同樣可以表明第一個Partition中有6個是Positive
      * {numPos: 11, numNeg: 89} // 最後一個位置,就是正負樣本的總數; 一共只有兩個Partition,都累加起來自然就是總和。
      */
    println("-- partitionwiseCumulativeCounts --")
    val partitionwiseCumulativeCounts =
    // 建立一個新的BinaryLabelCounter,然後把agg中的值,從左往右,加一遍
      agg.scanLeft(new BinaryLabelCounter())(
        (agg: BinaryLabelCounter, c: BinaryLabelCounter) => agg.clone() += c)

    partitionwiseCumulativeCounts.foreach(println)

    // 列印正負樣本總數
    val totalCount = partitionwiseCumulativeCounts.last
    println(s"Total counts: $totalCount")

    // 列印Partition的數量
    println("getNumPartitions = " + binnedCounts.getNumPartitions)

    // binnedCounts
    // binnedCounts經過mapPartitionsWithIndex後就變成了cumulativeCounts
    // 先看cumulativeCounts是怎麼算出來, 跟下面那組cumulativeCounts資料的結合起來看
    /**
      * (1.0,{numPos: 1, numNeg: 0}) // 第一行是一樣的
      * (0.6,{numPos: 5, numNeg: 0}) // 第一行加上第二上,就是cumulativeCounts的第二行
      * (0.4,{numPos: 5, numNeg: 0}) // 前三行相加,就是cumulativeCounts的第三行
      * (0.0,{numPos: 0, numNeg: 89}) // 以此類推,前四行相加,就是cumulativeCounts的第四行
      */

    // cumulativeCounts
    // 那cumulativeCounts的這些數是什麼意思呢?
    /**
      * (1.0,{numPos: 1, numNeg: 0}) // 當取Threshold為1時,有一個樣本,我預測為真
      * (0.6,{numPos: 6, numNeg: 0}) // 當取Threshold為0.6時,有6個樣本,我預測為真
      * (0.4,{numPos: 11, numNeg: 0}) // 以此類推
      * (0.0,{numPos: 11, numNeg: 89})
      */
    println("-- cumulativeCounts --")
    // 程式碼是怎麼實現的, 資料可是在RDD上
    // 首先binnedCounts是sortByKey排過序的,每個Partitions中是有序的
    // 再加上Partition的Index, 和之前的計算的partitionwiseCumulativeCounts, 就能夠計算出來
    /**
      * partitionwiseCumulativeCounts
      * {numPos: 0, numNeg: 0} index為0的Partition, 剛開始時, numPos和numNeg都是0
      * {numPos: 6, numNeg: 0} 經過index為0的Partition累加後, index為1的Partition, 剛開始時, numPos為6
      * {numPos: 11, numNeg: 89}
      */
    val cumulativeCounts = binnedCounts.mapPartitionsWithIndex(
      (index: Int, iter: Iterator[(Double, BinaryLabelCounter)]) => {
        val cumCount = partitionwiseCumulativeCounts(index)
        iter.map { case (score, c) =>
          // index為0時, cumCount為{numPos: 0, numNeg: 0}; 也就是第一個Partition, 剛開始時, numPos和numNeg都是0
          // 第一個過來的是, (1.0,{numPos: 1, numNeg: 0}), 經過cumCount += c, 變成了(1.0,{numPos: 1, numNeg: 0})
          // 第二個過來的是, (0.6,{numPos: 5, numNeg: 0}), 經過cumCount += c, (0.6,{numPos: 6, numNeg: 0})
          // index為1時, cumCount為{numPos: 6, numNeg: 0}; 也就是第二個Partition, 剛開始時, numPos為6
          // 第一個過來的是, (0.4,{numPos: 5, numNeg: 0}), 經過cumCount += c, 變成了(0.4,{numPos: 11, numNeg: 0})
          // 第二個過來的是, (0.0,{numPos: 0, numNeg: 89}), 經過cumCount += c, 變成了(0.0,{numPos: 11, numNeg: 89})
          cumCount += c
          (score, cumCount.clone())
        }
        // preservesPartitioning = true, mapPartitionsWithIndex運算元計算過程中,不能修改key
      }, preservesPartitioning = true)

    cumulativeCounts.collect().foreach(println)

    /**
      * BinaryConfusionMatrixImpl({numPos: 1, numNeg: 0},{numPos: 11, numNeg: 89})
      * 這個矩陣應該轉換成下面這種形式來看
      *
      *          實際為真  實際為假
      * 預測為真   1        0
      * 預測為假   11-1     89-0
      *
      * 所以當Threshold不斷變化時,矩陣也在不斷變化,因此在precision在不斷變化
      *
      * (1.0,BinaryConfusionMatrixImpl({numPos: 1, numNeg: 0},{numPos: 11, numNeg: 89}))
      * (0.6,BinaryConfusionMatrixImpl({numPos: 6, numNeg: 0},{numPos: 11, numNeg: 89}))
      * (0.4,BinaryConfusionMatrixImpl({numPos: 11, numNeg: 0},{numPos: 11, numNeg: 89}))
      * (0.0,BinaryConfusionMatrixImpl({numPos: 11, numNeg: 89},{numPos: 11, numNeg: 89}))
      */
    println("-- confusions --")
    val confusions = cumulativeCounts.map { case (score, cumCount) =>
      (score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix])
    }

    confusions.collect().foreach(println)

    println("-- precision --")
    def createCurve(y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = {
      confusions.map { case (s, c) =>
        (s, y(c))
      }
    }

    createCurve(Precision).collect().foreach(println)

    sc.stop()
  }

  object Precision extends BinaryClassificationMetricComputer {
    override def apply(c: BinaryConfusionMatrix): Double = {
      val totalPositives = c.numTruePositives + c.numFalsePositives
      if (totalPositives == 0) {
        1.0
      } else {
        c.numTruePositives.toDouble / totalPositives