How to Compute AUC in a Distributed Fashion on Spark
by 王犇 20160115
AUC is one of the standard ways to evaluate a classification model. The auc method currently provided in the evaluation package of Spark MLlib first obtains the individual points of the ROC curve and then computes the AUC from them. In real applications, however (take logistic regression as an example), we usually score every sample and then combine the scores with the sample labels to compute the AUC directly, so the input is simply pairs of the form (label, predict_score). The scheme provided in MLlib is not a great fit for that, so an alternative is given here: an approximate computation aimed at 0/1 classification problems, called BinaryAUC.
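For concreteness, everything below consumes nothing more than a pair RDD of scores and labels. A minimal sketch of what that input might look like (the values are made up, and sc is assumed to be an existing SparkContext); note that the auc method keys by score, so pairs are handed over as (prediction, label):

import org.apache.spark.rdd.RDD

// Hypothetical scored samples: (prediction, label) pairs, labels in {0, 1}.
val scoreAndLabel: RDD[(Double, Double)] = sc.parallelize(Seq(
  (0.92, 1.0), (0.85, 1.0), (0.71, 0.0),
  (0.63, 1.0), (0.40, 0.0), (0.15, 0.0)
))
// val aucValue = new BinaryAUC().auc(scoreAndLabel)  // returns the AUC as a Double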
The procedure: first sort by predict_score; then, according to whether each sample is a positive or a negative, compute the area of each small trapezoid and accumulate these areas into the final AUC value. Because the data lives in Spark as a distributed RDD, computing a trapezoid's area requires the offset contributed by the partitions that precede it, so the data has to be traversed once up front to collect those offsets, but this avoids shipping everything to a single machine for the computation.
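To make the arithmetic concrete, here is a minimal single-machine sketch of the same trapezoid accumulation (illustrative only, with made-up names, not the distributed code): the cumulative negative count plays the role of x, the cumulative positive count plays the role of y, each distinct score advances the current point, and the summed trapezoid areas are finally normalized by total_negatives * total_positives.

// Local reference version of the trapezoid-based AUC (assumes both classes occur).
def localAuc(scoreAndLabel: Seq[(Double, Double)]): Double = {
  // Count negatives and positives per distinct score.
  val grouped = scoreAndLabel
    .groupBy(_._1)
    .map { case (score, items) =>
      val pos = items.count(_._2 > 0.0).toDouble
      val neg = items.size - pos
      (score, (neg, pos))
    }
    .toSeq
    .sortBy(-_._1)                      // highest score first

  var x = 0.0                           // cumulative negatives
  var y = 0.0                           // cumulative positives
  var area = 0.0
  for ((_, (neg, pos)) <- grouped) {
    val x2 = x + neg
    val y2 = y + pos
    area += (x2 - x) * (y + y2) / 2.0   // one small trapezoid
    x = x2
    y = y2
  }
  area / (x * y)                        // x and y are now the class totals
}

The only difference from the distributed code is that this sketch also counts the trapezoid from the origin to the first point, which the sliding window in the RDD version starts after; when no two samples share a score, that trapezoid has zero area anyway. With that picture in mind, the distributed implementation is as follows: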
package org.apache.spark.mllib.wml

/**
 * @author wangben 2015
 */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.rdd.RDDFunctions._
import scala.collection.Iterator
import Array._

class BinaryAUC extends Serializable {
  // input format: (prediction, label)
  def auc(data: RDD[(Double, Double)]): Double = {
    // Group records that share the same score and count negatives/positives per score.
    val group_result = data.groupByKey().map(x => {
      val r = new Array[Double](2)
      for (item <- x._2) {
        if (item > 0.0) r(1) += 1.0 else r(0) += 1.0
      }
      (x._1, r) // score -> [negative count, positive count]
    })

    // The ROC points have to be accumulated in score order, highest score first.
    val group_rank = group_result.sortByKey(false)

    // Per-partition totals of (negatives, positives), collected to the driver.
    val step_sizes = group_rank.mapPartitions(x => {
      val r = List[(Double, Double)]()
      var fn_sum = 0.0
      var pn_sum = 0.0
      while (x.hasNext) {
        val cur = x.next
        fn_sum += cur._2(0)
        pn_sum += cur._2(1)
      }
      r.::((fn_sum, pn_sum)).toIterator
    }, true).collect

    // Prefix sums over the partition totals: the offset each partition starts from.
    var debug_string = ""
    val step_sizes_sum = ofDim[Double](step_sizes.size, 2) // 2-D array
    for (i <- 0 to (step_sizes.size - 1)) {
      if (i == 0) {
        step_sizes_sum(i)(0) = 0.0
        step_sizes_sum(i)(1) = 0.0
      } else {
        step_sizes_sum(i)(0) = step_sizes_sum(i - 1)(0) + step_sizes(i - 1)._1
        step_sizes_sum(i)(1) = step_sizes_sum(i - 1)(1) + step_sizes(i - 1)._2
      }
      debug_string += "\t" + step_sizes_sum(i)(0).toString + "\t" + step_sizes_sum(i)(1).toString
    }

    val sss_len = step_sizes_sum.size
    val total_fn = step_sizes_sum(sss_len - 1)(0) + step_sizes(sss_len - 1)._1
    val total_pn = step_sizes_sum(sss_len - 1)(1) + step_sizes(sss_len - 1)._2
    //System.out.println("debug auc_step_size: " + debug_string)

    // Broadcast the offsets and turn each score group into a cumulative ROC point
    // (x = negatives seen so far, y = positives seen so far).
    val bc_step_sizes_sum = data.context.broadcast(step_sizes_sum)
    val modified_group_rank = group_rank.mapPartitionsWithIndex((index, x) => {
      val sss = bc_step_sizes_sum.value
      var r = List[(Double, Array[Double])]()
      //var r = List[(Double, String)]()
      var fn = sss(index)(0) // starting offset of this partition
      var pn = sss(index)(1)
      while (x.hasNext) {
        val p = new Array[Double](2)
        val cur = x.next
        p(0) = fn + cur._2(0)
        p(1) = pn + cur._2(1)
        fn += cur._2(0)
        pn += cur._2(1)
        //r = (cur._1, p(0).toString + "\t" + p(1).toString) :: r
        r = (cur._1, p) :: r
      }
      r.reverse.toIterator
    }, true)

    // output debug info
    //modified_group_rank.map(l => l._1.toString + "\t" + l._2(0).toString + "\t" + l._2(1)).saveAsTextFile("/home/hdp_teu_dia/resultdata/wangben/debug_info")

    // Slide a window of two consecutive points over the curve and sum the trapezoids.
    val score = modified_group_rank.sliding(2).aggregate(0.0)(
      seqOp = (auc: Double, points: Array[(Double, Array[Double])]) => auc + TrapezoidArea(points),
      combOp = _ + _)

    System.out.println("debug auc_mid: " + score + "\t" + (total_fn * total_pn).toString() +
      "\t" + total_fn.toString() + "\t" + total_pn.toString())
    score / (total_fn * total_pn)
  }

  private def TrapezoidArea(points: Array[(Double, Array[Double])]): Double = {
    val x1 = points(0)._2(0)
    val y1 = points(0)._2(1)
    val x2 = points(1)._2(0)
    val y2 = points(1)._2(1)
    val base = x2 - x1
    val height = (y1 + y2) / 2.0
    base * height
  }
}

object AUCTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("TestEvaluation")
    val sc = new SparkContext(conf)
    val input_file = sc.textFile(args(0))
    // Each line looks like "(prediction,label)".
    val predict_label = input_file.map(l => {
      val x = l.stripPrefix("(").stripSuffix(")").split(",")
      (x(0).toDouble, x(1).toDouble)
    })
    val auc = new BinaryAUC()
    val auc_score = auc.auc(predict_label)
    System.out.println("debug auc_score: " + auc_score.toString())
  }
}
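As a usage note, AUCTest reads the scored pairs from the text file passed as its first argument, and the parser above strips the surrounding parentheses and splits on the comma, with the prediction first and the label second. A hypothetical input file (values made up) would therefore contain lines such as:

(0.92,1.0)
(0.85,1.0)
(0.71,0.0)
(0.40,0.0)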
Appendix, a reference of RDD API examples: http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
It really is much better than the official documentation.