1. 程式人生 > >Spark MLlib原始碼解讀之樸素貝葉斯分類器,NaiveBayes

Spark MLlib原始碼解讀之樸素貝葉斯分類器,NaiveBayes

Spark MLlib 樸素貝葉斯NaiveBayes 原始碼分析

基本原理介紹

首先是基本的條件概率求解的公式。

P(A|B)=P(AB)P(B)

在現實生活中,我們經常會碰到已知一個條件概率,求得兩個時間交換後的概率的問題。也就是在已知P(A|B)的情況下,如何求得P(B|A). 其中P(A|B)表示在事件B發生的前提下, 事件A發生的概率。
其中P(A)指的是先驗概率或者叫做邊緣概率。因為他不需要考慮任何B方面的影響。
P(A|B)指的是B發生後A的條件概率。
P(B)是B的先驗概率或者是邊緣概率,也被稱為標準化常量(normalized constant)。
這裡給出貝葉斯的計算公式:

P(B|A)=P(A|B)P(B)P(A)

對於給定的分類項,求解此分類項在各個類別下出現的概率。哪個最大,哪個就被認為是分好類的型別。

假設 A={a1,a2,a3…}為待分類項,a1,a2是每個待分類項的屬性。
有一個類別的集合B={y1,y2….}.我們需要計算各個類別的先驗概率,並取對數,計算公式如下.

p(i)=log(p(yi))
=log((i類別的次數+平滑因子)/(總次數+類別數*平滑因子))

我們需要計算P(y1|A),P(y2|A),P(y3|A)等等。求得計算結果的最大值,我們可以認為這個特徵屬性的類別為y。

首先我們需要計算在每一個類別下的各個特徵屬性的概率,即P(a1|y1),P(a2|y1),P(a3|y1),
P(a1|y2),P(a2|y2),P(a3|y2,P(a1|y3),P(a2|y3),P(a3|y3)等。

在各個類別下的各個特徵屬性的條件概率估計,並取對數。
theta(i)(j)=log(p(aj|yi))
=log(sumTermFreqs(j)+平滑因子)-thetaLogDenom.

其中theta(i)(j)是i類別下j的特徵的概率,sumTermFreqs(j)是特徵j出現的次數,thetaLogDenom 分為如下的兩個型別:

  • 1 多項式模型:thetaLogDenom=log(sumTermFreqs.values.sum+numFeatures*lamda).其中sumTermFreqs.values.sum解釋為類i下的單詞總數,numFeatures是特徵數量,lamda是平滑因子。
  • 2 伯努利模型:thetaLogDenom=log(n+2.0*lamda).
    文字分類的時候n可以視作是類別i下的單詞總數。lamda表示平滑因子。

假設各個特徵屬性的概率是相互獨立的,有貝葉斯定理,我們有,

P(yi|A)=P(A|yi)P(yi)P(A)

由於分母為常數,所以只需要分子求值即可。又因為各個屬性相互獨立。可以得到:

P(A|yi)P(yi)=P(a1|yi)P(a2|yi)P(a3|yi)=P(yi)j=1m(P(aj|yi))

對上式同取log。

log(p(x|yi)p(yi))=log(p(yi))+j=1mlog(p(aj|yi))

spark 的原始碼實現分析:
分散式的實現方法大致如下:首先對樣本進行聚合操作,統計所有標籤出現的次數,和對應的特徵之和。即對labledPoint(label, features)這樣子的一個元祖採用一個combineByKey聚合函式:對同一標籤資料進行聚合統計操作。
通過聚合操作後,可以通過聚合結果先驗概率,條件概率。然後返回一個貝葉斯模型。
預測的時候,將需要預測的樣本向量乘以theta矩陣,即條件概率矩陣。然後計算每個樣本屬於每個類別的概率,然後選取其中的最大項作為類別。

NaiveBayes 原始碼大致框架:

  • 1,NaiveBayes分類伴生物件:NaiveBayes
  • 1.1 包含靜態train方法,根據輸入引數,初始化NaiveBayes類,通過run方法進行訓練。
  • 2,貝葉斯分類:NaiveBayes
  • 2.1 run方法:開始訓練貝葉斯模型,這個方法通過計算各個類別下的先驗概率和條件概率來計算貝葉斯模型。
  • 3 貝葉斯模型類:NaiveBayesModel:
  • 4 預測計算:predict方法: 根據之前計算的先驗概率和條件概率,計算樣本屬於每個類別的概率。去最大的項做為樣本類別。

好了接下來我們來看看NaiveBayes的原始碼,首先是其伴生物件。

1,object NaiveBayes:
伴生物件主要定義了訓練靜態貝葉斯的分類模型的train方法。train方法通過設定訓練引數進行模型訓練,主要有如下的引數:

input:訓練樣本,其格式為RDD[LabeledPoint],其中labeledPoint格式為(label,features)。
lamda:平滑因子,防止分母出現0.

/**
 *樸素貝葉斯的伴生物件。
 */
@Since("0.9.0")
object NaiveBayes {

  private[spark] val Multinomial: String = "multinomial" //表示的是多項式型別

  private[spark] val Bernoulli: String = "bernoulli" // 表示的是伯努利型別


  private[spark] val supportedModelTypes = Set(Multinomial, Bernoulli)

  //訓練貝葉斯模型,根據訓練樣本,型別為RDD[LabeledPoint]
  //LabeledPointoink,其格式為(label,features)

  def train(input: RDD[LabeledPoint]): NaiveBayesModel = {
    new NaiveBayes().run(input)
  }

  @Since("0.9.0")
  //這個train 方法,除了上一個的基本引數之外,傳入了一個平滑因子,lamda
  //
  def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = {
    new NaiveBayes(lambda, Multinomial).run(input)
  }


  @Since("1.4.0")
  //輸入樣本,平滑因子,還有模型的類別,分別為多項式型別,和伯努利型別。
  def train(input: RDD[LabeledPoint], lambda: Double, modelType: String): NaiveBayesModel = {
    require(supportedModelTypes.contains(modelType),
      s"NaiveBayes was created with an unknown modelType: $modelType.")
    new NaiveBayes(lambda, modelType).run(input)
  }

}

2, 接下來是NaiveBayes的主類:
class NaiveBayes:我們首先來看看它的基本的構造器和基本方法:

/**
 *樸素貝葉斯分類器的類,訓練一個樸素貝葉斯模型,根據rdd樣本資料,
 * 其格式為(label,features).
 *
 *訓練樸素貝葉斯分類器模型,可以通過TF-IDF 生成向量。用於文件分類,
 *如果讓向量為0-1模式,則可以應用於bernoulli NB,輸入的特徵必須是非負的。
 *
 */

 class NaiveBayes private (
    private var lambda: Double,
    private var modelType: String) extends Serializable with Logging {

  import NaiveBayes.{Bernoulli, Multinomial}

  @Since("1.4.0")
  def this(lambda: Double) = this(lambda, NaiveBayes.Multinomial)

  @Since("0.9.0")
  def this() = this(1.0, NaiveBayes.Multinomial) //在這個引數裡面預設的平滑因子是1

  /** Set the smoothing parameter. Default: 1.0. */
  @Since("0.9.0")
  def setLambda(lambda: Double): NaiveBayes = { //設定平滑因子。
    this.lambda = lambda
    this
  }

接下來我們來看看整個樸素貝葉斯最為重要的run方法,所有的核心程式碼都在這個裡面。

  • 1 NaiveBayes的run方法用來訓練模型,這個方法主要用於計算先驗概率和條件概率。這個方法的實現的邏輯是:首先對樣本進行聚合。以label為key,呼叫combineByKey方法,聚合裡面的同一個label的features,得到所有的label的統計(label,(count, features之和))。

  • 2 根據先驗概率的計算公式p=log((n+lamda)/(numDocuments+numLabels*lamda))計算每個label的先驗概率。根據條件概率log((sumTermFreqs(j)+lamda)/thetaLogDenom)計算每個label的先驗概率。

  • 3 根據條件概率log((sumTermFreqs(j)+lamda)/thetaLogDenom)計算在各個label下面的各個features的條件概率,返回的是一個二維陣列。

  • 4 最後通過標籤列表,類別先驗概率,特徵的條件概率,型別生成一個樸素貝葉斯模型。

run方法的程式碼如下:

def run(data: RDD[LabeledPoint]): NaiveBayesModel = {


    //在這個裡面定義了一個函式,來判斷輸入的特徵向量的值。要求所有的向量值非負
    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
      val values = v match {   //判斷向量的型別,是密集向量還是稀疏向量
        case sv: SparseVector => sv.values // 根據不同型別的向量,得到其value值。
        case dv: DenseVector => dv.values
      }
      //判斷是不是所有的值都大於0,否則丟擲一個錯誤。多項式型需要每一個value值得大於0.
      if (!values.forall(_ >= 0.0)) {
        throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
      }
    }

    //檢測所有的伯努利的值,要求所有的向量值為0或者是1.
    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
      val values = v match {
        case sv: SparseVector => sv.values
        case dv: DenseVector => dv.values
      }

      if (!values.forall(v => v == 0.0 || v == 1.0)) {
        throw new SparkException(
          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
      }
    }


    //從這個地方開始對資料進行計算。
    //對於每一個特徵進行聚合,求得每一個標籤的對應的特徵的頻數,
    //aggretaded表示通過label為key,聚合同一個label的features特徵。他的返回格式是 (label,(計數,features之和))

    //注意這個combineByKey 運算元: 其中,createCombiner表示,當combineByKey第一次遇到值為k的Key時,呼叫createCombiner函式,將v轉換為c
    //然後是第二個mergeValue: combineByKey不是第一次遇到值為k的Key時,呼叫mergeValue函式,將v累加到c中
    //mergeCombiners:將兩個c,合併成一個。

    ///注意首先計算的是條件概率,返回個是為(label,(計數,features之和)),
    //表示每個樣本標籤的數量的和,和其對應的樣本向量的特徵之和。

    val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)](


      //完成從V->C型別的轉換,(v:Vector)=》(c:(long,vector))
      createCombiner = (v: Vector) => {  // 根據上面所說的,輸入的是一個vector,通過createCombiner函式將
        if (modelType == Bernoulli) {    //將這個v轉換成c的格式。格式為(1,densevector)型別。第二步需要做的就是每次遇到一個v,將其合併為c。
          requireZeroOneBernoulliValues(v)
        } else {
          requireNonnegativeValues(v)
        }
        (1L, v.copy.toDense)  //這個是第一個函式的返回值,即將(c:Vector)=》(v:(Long,DenseVector))
      },

      //mergeValue指的是。當接下來遇到vector的時候,將vector合併到c中去。前提是他們的key必須相同。
      //過程如下:(c:(Long,DenseVector),v:Vector => (c:(Long,DenseVector))) 
      mergeValue = (c: (Long, DenseVector), v: Vector) => {
        requireNonnegativeValues(v) //判斷向量的是否符合條件
        BLAS.axpy(1.0, v, c._2)  //c._2=c._2+v
        (c._1 + 1L, c._2) //這個地方繼續返回一個元祖,其中對於第一個值進行加1操作。這裡的c._2表示的是c._2=c._2+v
      },


      //接下來對根據相同的key來合併多個c。
      //mergeCombiners。過程如下(c1:(Long, DenseVector),c2: (Long, DenseVector)) => c:(c:(Long,DenseVector))
      mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => {
        BLAS.axpy(1.0, c2._2, c1._2) //c1._2=c1._2+c2._2
        (c1._1 + c2._1, c1._2)  // 返回一個c。表示在一個key的情況下的每一個特徵的數量之和加上其向量之和。
      }
    ).collect().sortBy(_._1)  //有一個排序操作是根據key來排序


    val numLabels = aggregated.length //aggregated的長度表示的是類別標籤的個數

    var numDocuments = 0L  //這個表示文件的數量
    //注意這個aggredaged的格式。首先它是一個數組。陣列的元素的元祖的第二個值的格式為(Long,DenseVector)格式。
    //下面的這個n表示的是某一個特徵下的數量。這個方法主要是對其進行彙總。總文件的數量進行彙總

    aggregated.foreach { case (_, (n, _)) =>
      numDocuments += n
    }

    //這個是用於計算特徵的數量
    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }

       //labels類別標籤的列表。
    val labels = new Array[Double](numLabels)

    //pi類別的先驗概率
    val pi = new Array[Double](numLabels)

    //這個表示theta這個特徵在各個類別下的概率。是個二維陣列,theta[i][j],i表示第i個類別,j表示第j個特徵。
    val theta = Array.fill(numLabels)(new Array[Double](numFeatures))

    //計算總文件數量的對數值,用於計算theta值。在下面使用到。    
    val piLogDenom = math.log(numDocuments + numLabels * lambda)
    var i = 0
    aggregated.foreach { case (label, (n, sumTermFreqs)) =>

      //aggregated的每一行表示label,和計數還有其特徵向量之和。
      labels(i) = label //表示取出第一個標籤放在lebels(i)裡面。

      pi(i) = math.log(n + lambda) - piLogDenom //計算先驗概率,並取log。log((n+lamda)/(numDocuments+numlabels*lamda))

      val thetaLogDenom = modelType match {
        case Multinomial => 
        math.log(sumTermFreqs.values.sum + numFeatures * lambda) //多項式模型,比如說計算類a下的文章的總數
        case Bernoulli =>
         math.log(n + 2.0 * lambda) //貝努力模型,比如說用於計算類a下的文章的總數。
        case _ =>
          throw new UnknownError(s"Invalid modelType: $modelType.")
      }
      var j = 0 //每一個i類別下,都需要計算j類別的條件概率,每次j從0開始
      while (j < numFeatures) {
        //這個用於計算各個特徵在各個類別下的條件概率
        //表示的是類別i下這個特徵j的次數,除以總的出現次數
        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
        j += 1
      }
      i += 1  //這裡面有兩層迴圈,i表示的是類別。j表示的是特徵。先從類別開始迴圈。然後在開始對特徵進行迴圈。
    }

     //最後生成模型。包括類別標籤列表,類別先驗概率,各個類別下每個特徵的條件概率,多項式和伯努利
    new NaiveBayesModel(labels, pi, theta, modelType)
  }
}

3 最後是貝葉斯分類模型 class NaiveBayesModel

訓練完成後,會生成貝葉斯分類模型。其包含如下引數:labels–類別標籤列表. pi–每個類別的先驗概率,theta–各個特徵在各個類別下的先驗概率,modelType–多項式或者是伯努利模型。
模型類主要包含一下的方法,即predict方法,load方法和save方法。
首先我們來看看其構造器:

class NaiveBayesModel private[spark] (
    @Since("1.0.0") val labels: Array[Double],  //這個表示的是labels陣列
    @Since("0.9.0") val pi: Array[Double],     //這個表示的是先驗概率陣列
    @Since("0.9.0") val theta: Array[Array[Double]], //這個表示theta這個特徵在各個類別下的概率。條件概率陣列,二維陣列。
    @Since("1.4.0") val modelType: String)  //這個表示類別
  extends ClassificationModel with Serializable with Saveable 

  private[mllib] def this(labels: Array[Double], pi: Array[Double], theta: Array[Array[Double]]) =
    this(labels, pi, theta, NaiveBayes.Multinomial)

接下來是predict方法,預設的輸入引數的型別是RDD[Vector],這個方法內部呼叫的是predict(testData:Vector)方法。該方法的返回值是一個double型別。

//這個方法會將輸入的rdd轉換為向量,然後呼叫下一個predict方法
override def predict(testData: RDD[Vector]): RDD[Double] = {
    val bcModel = testData.context.broadcast(this)    //廣播一下模型,
    testData.mapPartitions { iter => //採用mappartition進行操作,對每一個分割槽進行操作。
      val model = bcModel.value //在每個分割槽裡面獲取廣播變數值,
      iter.map(model.predict) //對分割槽的每一個元素,呼叫predict方法。呼叫的是下面一個方法。
    }
  }

  override def predict(testData: Vector): Double = {
    modelType match {
      case Multinomial => //如果是多項式型別的,則呼叫multinomialCalculation
        labels(multinomialCalculation(testData).argmax) //注意這個地方,會呼叫求最大值,下面類似
      case Bernoulli =>   //如果是伯努利型別的,bernoulliCalculation
        labels(bernoulliCalculation(testData).argmax)
    }
  }

下面是兩個計算方法,分別是multinomialCalculation和bernoulliCalculation。

private def multinomialCalculation(testData: Vector) = {
    val prob = thetaMatrix.multiply(testData) //用條件概率矩陣,乘以樣本向量。 theta*testData
    BLAS.axpy(1.0, piVector, prob) //prob=1.0*piVector+prob (本來是相乘的,但是取log之後變成相加,結果是一樣的。)
    prob  //得到結果之後,去向量的最大值。
  }

  private def bernoulliCalculation(testData: Vector) = {
    testData.foreachActive((_, value) =>
      if (value != 0.0 && value != 1.0) { 如果不滿足條件的話
        throw new SparkException(
          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
      }
    )
    val prob = thetaMinusNegTheta.get.multiply(testData) //用條件概率矩陣,乘以樣本向量。theta*testData
    BLAS.axpy(1.0, piVector, prob)  //prob=1.0*piVector+prob
    BLAS.axpy(1.0, negThetaSum.get, prob)
    prob
  }

接下來就是最基本的載入和儲存的方法了。分為save方法和load方法。

 def load(sc: SparkContext, path: String): NaiveBayesModel = {
      val sqlContext = SQLContext.getOrCreate(sc)
      // Load Parquet data.
      val dataRDD = sqlContext.read.parquet(dataPath(path))
      // Check schema explicitly since erasure makes it hard to use match-case for checking.
      checkSchema[Data](dataRDD.schema)
      val dataArray = dataRDD.select("labels", "pi", "theta", "modelType").take(1)
      assert(dataArray.length == 1, s"Unable to load NaiveBayesModel data from: ${dataPath(path)}")
      val data = dataArray(0)
      val labels = data.getAs[Seq[Double]](0).toArray
      val pi = data.getAs[Seq[Double]](1).toArray
      val theta = data.getAs[Seq[Seq[Double]]](2).map(_.toArray).toArray
      val modelType = data.getString(3)
      new NaiveBayesModel(labels, pi, theta, modelType)
    }
def save(sc: SparkContext, path: String, data: Data): Unit = {
      val sqlContext = SQLContext.getOrCreate(sc)
      import sqlContext.implicits._

      // Create JSON metadata.
      val metadata = compact(render(
        ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
          ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))
      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))

      // Create Parquet data.
      val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
      dataRDD.write.parquet(dataPath(path))
    }

好了,謝謝大家,以上就是我對於spark mllib的樸素貝葉斯的程式碼的解析。