程式人生 > Spark上如何做分散式AUC計算

Spark上如何做分散式AUC計算

by 王犇 20160115

AUC是分類模型常用的評價指標。目前Spark MLlib的evaluation包中所提供的AUC方法,是先取得ROC曲線上的各個點之後再計算AUC;但在實際應用場景中(以邏輯迴歸為例),我們通常是先對每個樣本打分,再結合樣本的label直接計算AUC,輸入是(predict_score, label)這樣的形式,此時MLlib提供的方案就不太適用了。因此這裡提供了另一種計算方法,採用針對0/1二分類問題、以梯形面積累加的近似計算方案,稱為BinaryAUC:

首先把相同predict_score的樣本歸為一組,並按分數從高到低排序;然後根據每組樣本中正負例的數量,依次計算ROC曲線下每個小梯形的面積;最後加總所有梯形的面積,再除以正例數與負例數的乘積,得到最終的AUC值。(由於Spark中的資料是以分散式RDD的形態分佈在多個分區(partition)上,計算某個分區內的梯形面積時,需要知道排在它之前的所有分區累積的正負例數(offset),所以需要先遍歷一次資料來統計各分區的數量,但這樣避免了把資料彙總到單機上計算。)

package org.apache.spark.mllib.wml

/**
 * @author wangben 2015
 */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.rdd.RDDFunctions._
import scala.collection.Iterator
import Array._

/**
 * Distributed AUC (area under the ROC curve) for binary classifiers.
 *
 * Examples are grouped by distinct prediction score and ranked from the highest
 * score to the lowest. Walking down that ranking traces the ROC curve in raw
 * counts (x = cumulative negatives, y = cumulative positives) starting at (0, 0);
 * each score group adds one trapezoid, so tied scores receive half credit. The
 * summed area is normalised by `negatives * positives`.
 *
 * Because the ranking is spread over several partitions, a first pass collects
 * per-partition counts so that every partition knows how many positives are
 * ranked above it; the areas are then computed in parallel without moving the
 * data to the driver.
 */
class BinaryAUC extends Serializable {

  /**
   * Computes the AUC of (prediction, label) pairs.
   *
   * @param data pairs of (prediction score, label); a label `> 0.0` is treated as
   *             positive, any other label as negative
   * @return the AUC in [0, 1], or `Double.NaN` when `data` holds no positive or
   *         no negative example (the ROC curve is undefined in that case)
   */
  def auc(data: RDD[(Double, Double)]): Double = {
    // (negative count, positive count) per distinct score. reduceByKey combines
    // map-side instead of shipping every single label through the shuffle.
    val countsByScore = data
      .map { case (score, label) =>
        (score, if (label > 0.0) (0.0, 1.0) else (1.0, 0.0))
      }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

    // Highest score first, so the ROC curve is traced starting from (0, 0).
    val ranked = countsByScore.sortByKey(ascending = false)

    // Pass 1: (negatives, positives) held by each partition, in partition order.
    val perPartition = ranked.mapPartitions { it =>
      Iterator.single(it.foldLeft((0.0, 0.0)) {
        case ((neg, pos), (_, (n, p))) => (neg + n, pos + p)
      })
    }.collect()

    val totalNeg = perPartition.map(_._1).sum
    val totalPos = perPartition.map(_._2).sum

    if (totalNeg == 0.0 || totalPos == 0.0) {
      Double.NaN
    } else {
      // posOffsets(i) = number of positives ranked in partitions 0 until i.
      val posOffsets = perPartition.map(_._2).scanLeft(0.0)(_ + _)
      val bcPosOffsets = data.context.broadcast(posOffsets)

      // Pass 2: ROC area contributed by each partition. The running y-coordinate
      // starts at the partition's offset (0 for the first partition), so the
      // trapezoid leaving the origin and the ones spanning partition boundaries
      // are all included.
      val area = ranked.mapPartitionsWithIndex { (index, it) =>
        val (_, partArea) = it.foldLeft((bcPosOffsets.value(index), 0.0)) {
          // Trapezoid from (fp, tp) to (fp + n, tp + p): width n, mean height tp + p / 2.
          case ((tp, acc), (_, (n, p))) => (tp + p, acc + n * (tp + p / 2.0))
        }
        Iterator.single(partArea)
      }.fold(0.0)(_ + _)

      bcPosOffsets.unpersist()
      area / (totalNeg * totalPos)
    }
  }
}

/**
 * Command-line driver for [[BinaryAUC]].
 *
 * Reads the text file at `args(0)`, one `(prediction,label)` or
 * `prediction,label` pair per line, and prints the resulting AUC to stdout.
 */
object AUCTest {

  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      System.err.println("Usage: AUCTest <input_path>")
      sys.exit(1)
    }
    val conf = new SparkConf().setAppName("TestEvaluation")
    val sc = new SparkContext(conf)
    try {
      val predictLabel = sc.textFile(args(0))
        .map(_.trim)
        .filter(_.nonEmpty) // tolerate blank lines instead of failing in toDouble
        .map { line =>
          val fields = line.stripPrefix("(").stripSuffix(")").split(",")
          (fields(0).toDouble, fields(1).toDouble)
        }
      val aucScore = new BinaryAUC().auc(predictLabel)
      System.out.println("debug auc_score: " + aucScore.toString())
    } finally {
      // Release cluster resources even when the job fails.
      sc.stop()
    }
  }
}


附:RDD API的範例文件:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

這份範例文件確實比官方文件好用太多。