結合原始碼分析Spark中的Accuracy(準確率), Precision(精確率), 和F1-Measure
例子
某大學一個系,總共100人,其中男90人,女10人,現在根據每個人的特徵,預測性別
Accuracy(準確率)
計算
由於我知道男生遠多於女生,所以我完全無視特徵,直接預測所有人都是男生
我預測所的人都是男生,而實際有90個男生,所以
預測正確的數量 = 90
需要預測的總數 = 100
Accuracy = 90 / 100 = 90%
問題
在男女比例嚴重不均勻的情況下,我只要預測全是男生,就能獲得極高的Accuracy。
所以在正負樣本嚴重不均勻的情況下,Accuracy指標失效
Precision(精確率), Recall(召回率)
. | 實際為真 | 實際為假 |
---|---|---|
預測為真 | TP | FP |
預測為假 | FN | TN |
# 前面的T和F,代表預測是否正確
# 後面的P和N,代表預測是真還是假
TP:預測為真,正確了
FP:預測為真,結果錯了
TN:預測為假,正確了
FN:預測為假,結果錯了
計算
注意
:在正負樣本嚴重不均勻的情況下,正樣本必須是數量少的那一類。這裡女生是正樣本。是不是女生,是則預測為真,不是則預測為假。
- 如果沒有預測為真的情況,計算時分母會為0,所以做了調整,也容易比較Accuracy和Precision, Recall的區別
. | 實際為真 | 實際為假 |
---|---|---|
預測為真 | 1 | 0 |
預測為假 | 10 | 89 |
Accuracy = (1 + 89)/ (1 + 0 + 10 + 89) = 90 / 100 = 0.9
Precision = 1 / 1 + 0 = 1
Recall = 1 / 1 + 10 = 0.09090909
注意
:為方便與後面Spark的計算結果對比,無限迴圈小數,我們不做四合五入
問題
雖然我們稍微調整了預測結果,但是Accuracy依然無法反應預測結果。
而Precision在這裡達到了1,但是Recall卻極低。因此Precision,Recall的組合能夠反應我們的預測效果不佳。
但是Precision,Recall在對比的時候會出現問題,比如一個模型的Precision是0.9,Recall是0.19,那麼與上面的1和0.0909對比,哪個模型更好呢?
所以我們需要一個指標,能夠綜合的反應Precision和Recall
F1-Measure
F1值就是Precision和Recall的調和均值
整理後:
計算
計算上面提到的對比情況
F1 = (2 * 1 * 0.09090909) / 1 + 0.09090909 = 0.1666666
F1 = (2 * 0.9 * 0.19) / 0.9 + 0.19 = 0.3137
很顯然後一種更好
調整Precision, Recall的權重
當a等於1時,Precision,Recall各佔50%,就是F1-Measure了
Spark原始碼分析
Spark中API計算Precision,Recall,F1
用Spark API計算出上面我們手工計算出的值
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}
object Test {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("test").setMaster("local") // 除錯的時候一定不要用local[*]
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
// 我們先構造一個與上文一樣的資料
/**
* 實際為真 實際為假
* 預測為真 1 0
* 預測為假 10 89
*/
// 左邊是預測為真的概率,右邊是真實值
val TP = Array((1.0, 1.0)) // 預測為真,實際為真
val TN = new Array[(Double, Double)](89) // 預測為假, 實際為假
for (i <- TN.indices) {
TN(i) = (0.0, 0.0)
}
val FP = new Array[(Double, Double)](10) // 預測為假, 實際為真
for (i <- FP.indices) {
FP(i) = (0.0, 1)
}
val all = TP ++ TN ++ FP
val scoreAndLabels = sc.parallelize(all)
// 列印觀察資料
// scoreAndLabels.collect().foreach(println)
// println(scoreAndLabels.count())
// 到這裡,我們構造了一個與上文例子一樣的資料
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
// 下面計算的值,我們先只看右邊的數,它表示計算的precision,recall,F1等
// 左邊是Threshold,後面會細說
/**
* (1.0,1.0) // precision跟我們自己之前計算的一樣
* (0.0,0.11) // 這是什麼?先不管
*/
metrics.precisionByThreshold().collect().foreach(println)
println("---")
/**
* (1.0,0.09090909090909091) // recall跟我們自己之前計算的一樣
* (0.0,1.0) // 先忽略
*/
metrics.recallByThreshold().collect().foreach(println)
println("---")
/**
* (1.0,0.16666666666666669) // f1跟我們自己之前計算的一樣
* (0.0,0.19819819819819817) // 先忽略
*/
metrics.fMeasureByThreshold().collect().foreach(println)
}
}
至此,我們用Spark API計算出了各個值。但是有幾個疑問
- 無論是precision,recall,還是fMeasure,後面都跟一個ByThreshold,為什麼?
- 這三個指標,不應該是一個數嘛,為什麼返回一個RDD,裡面包含一堆數?
要弄清楚,就出要知道它們是怎麼計算出來的
計算分析(以Precision為例)
- 從程式碼的角度,一步步跟蹤到Precision的計算公式,公式找到了值也就算出來了
- 從資料的角度,你的輸入資料是怎麼一步步到結果的
程式碼角度
# 類宣告
# scoreAndLabels是一個RDD,存放預測為真的概率和真實值
# numBins,先忽略
class BinaryClassificationMetrics (val scoreAndLabels: RDD[(Double, Double)], val numBins: Int)
呼叫BinaryClassificationMetrics的precisionByThreshold方法計算,precision
new BinaryClassificationMetrics(scoreAndLabels).precisionByThreshold()
跟蹤進入precisionByThreshold方法
def precisionByThreshold(): RDD[(Double, Double)] = createCurve(Precision)
# 呼叫了createCurve(Precision)
# precisionByThreshold返回的RDD,就是這個createCurve方法的返回值
# 兩個問題
# createCurve是什麼?
# 引數Precision又是什麼?
跟蹤進入createCurve方法
/** Creates a curve of (threshold, metric). */
private def createCurve(y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = {
// confusions肯定是一個RDD,因為它呼叫了map,然後就作為返回值返回了
// 所以confusions是關鍵,對它做變換,就能得到結果
confusions.map { case (s, c) =>
// precisionByThreshold返回的RDD,左邊是threshold,右邊是precision
// 所以這裡的s,就是threshold
// y(c),就是precision
// y是傳入的引數,也就是createCurve(Precision)中的,Precision
// 下面就先看看Precision是什麼
(s, y(c))
}
}
跟蹤進入Precision
// 上文中的y(c),也就是Precision(c),這語法,自然是呼叫apply方法
/** Precision. Defined as 1.0 when there are no positive examples. */
private[evaluation] object Precision extends BinaryClassificationMetricComputer {
override def apply(c: BinaryConfusionMatrix): Double = {
// 看名字numTruePositives,就是TP的數量嘛
// totalPositives = TP + FP
val totalPositives = c.numTruePositives + c.numFalsePositives
// totalPositives為0,也就一個真都沒預測
if (totalPositives == 0) {
// 0 / 0,會出錯,這裡是直接返回1
1.0
} else {
// 公式出現
// Precision = TP / (TP + FP)
c.numTruePositives.toDouble / totalPositives
}
}
}
到這裡找到了Precision的計算公式,但是上面提到的兩個疑問,還沒有解決,Threshold怎麼回事,返回RDD幹嘛?
但是通過上面的分析,我們找到了線索,confusions這個通過變換就能出結果的變數,也許就是答案。
資料角度
跟蹤到confusions的宣告
private lazy val (
cumulativeCounts: RDD[(Double, BinaryLabelCounter)],
confusions: RDD[(Double, BinaryConfusionMatrix)]) = {
// ... 省略了60行左右
(cumulativeCounts, confusions)
}
這60行裡做了什麼,我們拷貝出來,一步步分析
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("test").setMaster("local") // 除錯的時候一定不要用local[*]
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val TP = Array((1.0, 1.0))
val TN = new Array[(Double, Double)](89)
for (i <- TN.indices) {
TN(i) = (0.0, 0.0)
}
/**
* *******這裡改了********這裡改了********這裡改了*****
*/
// 從10改成了5,有5個樣本有60%的概率是真的;另外5個設定成了40%,在下面
val FP1 = new Array[(Double, Double)](5)
for (i <- FP1.indices) {
FP1(i) = (0.6, 1)
}
val FP2 = new Array[(Double, Double)](5) // 有5個樣本有40%的概率是真的
for (i <- FP2.indices) {
FP2(i) = (0.4, 1)
}
val all = TP ++ TN ++ FP1 ++ FP2
val scoreAndLabels = sc.parallelize(all, 2) // 調整並行度為2,後面會說,為什麼要調整
// 列印觀察資料
scoreAndLabels.collect().foreach(println)
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
// 先看下調整後的結果
// 左邊一列多了0.6,和0.4,猜的話,應該是因為上面的概率我們添加了0.6和0.4
// 後面會說,具體是怎麼出來的
/**
* (1.0,1.0) // 當Threshold為1時,precision是1
* (0.6,1.0) // 當Threshold為0.6時,precision還是1.0
* (0.4,1.0) // 以此類推
* (0.0,0.11)
*/
println("-- precisionByThreshold --")
metrics.precisionByThreshold().collect().foreach(println)
/**
* (1.0,0.09090909090909091)
* (0.6,0.5454545454545454)
* (0.4,1.0)
* (0.0,1.0)
*/
println("-- recallByThreshold --")
metrics.recallByThreshold().collect().foreach(println)
/**
* (1.0,0.16666666666666669)
* (0.6,0.7058823529411764)
* (0.4,1.0)
* (0.0,0.19819819819819817)
*/
println("-- fMeasureByThreshold --")
metrics.fMeasureByThreshold().collect().foreach(println)
// 下面以Precision的計算為例
// 下面的程式碼是初始化confusions的程式碼, 在BinaryClassificationMetrics類中,Spark 1.6.1版本的149行開始
// 1. 以預測的概率為key,計算在這個概率下,有多少個;比如:0.6這個概率,出現了多少個(0.6, 1)或0.6, 0)
/**
* (1.0,{numPos: 1, numNeg: 0}) // 1.0,只有一個
* (0.6,{numPos: 5, numNeg: 0}) // 0.6,5個,上面我們修改的
* (0.4,{numPos: 5, numNeg: 0}) // 0.4,同樣是5個
* (0.0,{numPos: 0, numNeg: 89}) // 0.0, 89個
*/
println("-- binnedCounts --")
val binnedCounts = scoreAndLabels.combineByKey(
// BinaryLabelCounter用於儲存累加的numPositives和numNegatives
// 先說下label是什麼,scoreAndLabels中右邊那一列,只可能是0或1, 是真實值
// BinaryLabelCounter中判斷是Positive還是Negatives,是通過label,而不是你自己預測的概率,不是左邊那一列
// label > 0.5 為Positive
createCombiner = (label: Double) => new BinaryLabelCounter(0L, 0L) += label,
mergeValue = (c: BinaryLabelCounter, label: Double) => c += label,
mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2
).sortByKey(ascending = false)
binnedCounts.collect().foreach(println)
println("-- agg --")
// agg是一個數組,collect返回一個數組
// 前面設定了Partition為2,所以這裡會有兩條資料
// 計算每個Partition中numPos, numNeg的總和
/**
* {numPos: 6, numNeg: 0}
* {numPos: 5, numNeg: 89}
*/
val agg = binnedCounts.values.mapPartitions { iter =>
val agg = new BinaryLabelCounter()
iter.foreach(agg += _)
Iterator(agg)
}.collect()
agg.foreach(println)
// partitionwiseCumulativeCounts的長度是Partition數量加1
// partitionwiseCumulativeCounts的每一行是每個Partition的初始numPos, numNeg數量; 這點很重要, 後面會用到
/**
* {numPos: 0, numNeg: 0} // 第一個Partition的初始, 都是0,
* {numPos: 6, numNeg: 0} // 第一個Partition累加後, 等於第二個Partition的初始值;同樣可以表明第一個Partition中有6個是Positive
* {numPos: 11, numNeg: 89} // 最後一個位置,就是正負樣本的總數; 一共只有兩個Partition,都累加起來自然就是總和。
*/
println("-- partitionwiseCumulativeCounts --")
val partitionwiseCumulativeCounts =
// 建立一個新的BinaryLabelCounter,然後把agg中的值,從左往右,加一遍
agg.scanLeft(new BinaryLabelCounter())(
(agg: BinaryLabelCounter, c: BinaryLabelCounter) => agg.clone() += c)
partitionwiseCumulativeCounts.foreach(println)
// 列印正負樣本總數
val totalCount = partitionwiseCumulativeCounts.last
println(s"Total counts: $totalCount")
// 列印Partition的數量
println("getNumPartitions = " + binnedCounts.getNumPartitions)
// binnedCounts
// binnedCounts經過mapPartitionsWithIndex後就變成了cumulativeCounts
// 先看cumulativeCounts是怎麼算出來, 跟下面那組cumulativeCounts資料的結合起來看
/**
* (1.0,{numPos: 1, numNeg: 0}) // 第一行是一樣的
* (0.6,{numPos: 5, numNeg: 0}) // 第一行加上第二上,就是cumulativeCounts的第二行
* (0.4,{numPos: 5, numNeg: 0}) // 前三行相加,就是cumulativeCounts的第三行
* (0.0,{numPos: 0, numNeg: 89}) // 以此類推,前四行相加,就是cumulativeCounts的第四行
*/
// cumulativeCounts
// 那cumulativeCounts的這些數是什麼意思呢?
/**
* (1.0,{numPos: 1, numNeg: 0}) // 當取Threshold為1時,有一個樣本,我預測為真
* (0.6,{numPos: 6, numNeg: 0}) // 當取Threshold為0.6時,有6個樣本,我預測為真
* (0.4,{numPos: 11, numNeg: 0}) // 以此類推
* (0.0,{numPos: 11, numNeg: 89})
*/
println("-- cumulativeCounts --")
// 程式碼是怎麼實現的, 資料可是在RDD上
// 首先binnedCounts是sortByKey排過序的,每個Partitions中是有序的
// 再加上Partition的Index, 和之前的計算的partitionwiseCumulativeCounts, 就能夠計算出來
/**
* partitionwiseCumulativeCounts
* {numPos: 0, numNeg: 0} index為0的Partition, 剛開始時, numPos和numNeg都是0
* {numPos: 6, numNeg: 0} 經過index為0的Partition累加後, index為1的Partition, 剛開始時, numPos為6
* {numPos: 11, numNeg: 89}
*/
val cumulativeCounts = binnedCounts.mapPartitionsWithIndex(
(index: Int, iter: Iterator[(Double, BinaryLabelCounter)]) => {
val cumCount = partitionwiseCumulativeCounts(index)
iter.map { case (score, c) =>
// index為0時, cumCount為{numPos: 0, numNeg: 0}; 也就是第一個Partition, 剛開始時, numPos和numNeg都是0
// 第一個過來的是, (1.0,{numPos: 1, numNeg: 0}), 經過cumCount += c, 變成了(1.0,{numPos: 1, numNeg: 0})
// 第二個過來的是, (0.6,{numPos: 5, numNeg: 0}), 經過cumCount += c, (0.6,{numPos: 6, numNeg: 0})
// index為1時, cumCount為{numPos: 6, numNeg: 0}; 也就是第二個Partition, 剛開始時, numPos為6
// 第一個過來的是, (0.4,{numPos: 5, numNeg: 0}), 經過cumCount += c, 變成了(0.4,{numPos: 11, numNeg: 0})
// 第二個過來的是, (0.0,{numPos: 0, numNeg: 89}), 經過cumCount += c, 變成了(0.0,{numPos: 11, numNeg: 89})
cumCount += c
(score, cumCount.clone())
}
// preservesPartitioning = true, mapPartitionsWithIndex運算元計算過程中,不能修改key
}, preservesPartitioning = true)
cumulativeCounts.collect().foreach(println)
/**
* BinaryConfusionMatrixImpl({numPos: 1, numNeg: 0},{numPos: 11, numNeg: 89})
* 這個矩陣應該轉換成下面這種形式來看
*
* 實際為真 實際為假
* 預測為真 1 0
* 預測為假 11-1 89-0
*
* 所以當Threshold不斷變化時,矩陣也在不斷變化,因此在precision在不斷變化
*
* (1.0,BinaryConfusionMatrixImpl({numPos: 1, numNeg: 0},{numPos: 11, numNeg: 89}))
* (0.6,BinaryConfusionMatrixImpl({numPos: 6, numNeg: 0},{numPos: 11, numNeg: 89}))
* (0.4,BinaryConfusionMatrixImpl({numPos: 11, numNeg: 0},{numPos: 11, numNeg: 89}))
* (0.0,BinaryConfusionMatrixImpl({numPos: 11, numNeg: 89},{numPos: 11, numNeg: 89}))
*/
println("-- confusions --")
val confusions = cumulativeCounts.map { case (score, cumCount) =>
(score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix])
}
confusions.collect().foreach(println)
println("-- precision --")
def createCurve(y: BinaryClassificationMetricComputer): RDD[(Double, Double)] = {
confusions.map { case (s, c) =>
(s, y(c))
}
}
createCurve(Precision).collect().foreach(println)
sc.stop()
}
object Precision extends BinaryClassificationMetricComputer {
override def apply(c: BinaryConfusionMatrix): Double = {
val totalPositives = c.numTruePositives + c.numFalsePositives
if (totalPositives == 0) {
1.0
} else {
c.numTruePositives.toDouble / totalPositives