1. 程式人生 > >Spark ML下實現的多分類adaboost+naivebayes演算法在文字分類上的應用

Spark ML下實現的多分類adaboost+naivebayes演算法在文字分類上的應用

1. Naive Bayes演算法

樸素貝葉斯演算法算是生成模型中一個最經典的分類演算法之一了,常用的有Bernoulli和Multinomial兩種。在文字分類上經常會用到這兩種方法。在詞袋模型中,對於一篇文件$d$中出現的詞$w_0,w_1,...,w_n$, 這篇文章被分類為$c$的概率為$$p(c|w_0,w_1,...,w_n) = \frac{p(c,w_0,w_1,...,w_n)}{p(w_0,w_1,...,w_n)} = \frac{p(w_0,w_1,...,w_n|c)*p(c)}{p(w_0,w_1,...,w_n)}$$ 對於一篇給定文章,分母為常數,基於樸素貝葉斯的各詞在一篇文章中出現獨立性假設,最後我們需要比較的就是在不同類別$c$下$p(w_0|c)*p(w_1|c)*...*p(w_n|c)*p(c)$的大小。

naive bayes模型的引數就是在每個類別$c$下各詞出現的概率的$p(w_0|c),p(w_1|c),...,p(w_n|c))$和該類別出現的概率$p(c)$,引數的估計通常就是根據訓練樣本進行詞頻的統計並計算相應概率,其中$$p(c) = \frac{count(c)}{count(doc)}$$,即為訓練資料中c類別文章的總數量除以訓練集中文章的總數量。針對$p(w_i|c)$的估計,Bernoulli和Multinomial略有不同。

  • Bernoulli

  文章中某詞$ w_i$出現過,則記為1,所以$$p(w_i|c) = \frac{count(w_i,c)}{count(c)}$$ 即為在類別為c的訓練集文章中出現詞$w_i$的文章數量除以訓練集中為別為c的文章總數量

  • Multinomial

  這種情況下文章的詞並不是非0即1的one hot特徵,而是帶有權重的數值特徵,通常可以使用tf或者tf-idf值。$$p(w_i|c) = \frac{T_{ci}}{\sum_{t}{T_{ct}}}$$ 其中$T_{ci}$為類別c的訓練文章中詞$w_i$的所有權重和,$\sum_{t}{T_{ct}}$為類別c的文章中所有詞的權重之和。預測的時候對於詞$w_i$計算該詞在該文章中的權重$T_i$,使用$p(w_i|c)^{T_i}$作為連乘部分的概率。不過實際上經常使用對數概率,所以可以將指數運算變為乘法運算,在程式碼中就可以利用矩陣相乘直接計算。

還有一些細節問題,例如資料稀疏,平滑處理等因為不是本文的重點,這裡就不詳細解釋了。

2. Adaboost演算法

作為一種boosting方法,adaboost在很多演算法上都有著不俗的表現。不過在基於naive bayes的文件分類領域,貌似實際效果很一般。在stack overflow上也看到有人討論,說adaboost對於多個弱分類器的提升效果很不錯,但是naive bayes的文件分類通常已經有很不錯的表現了,提升效果一般。不過不管效果提升怎麼樣,實現一下試試也沒什麼壞處,順便還可以熟悉一下spark的相關操作。經典的adaboost演算法適用於二分類的情況,但是我們的文字是多分類的情況,依靠多個二分類器表決不失為一種方法,但是比較麻煩,好在找到了介紹多分類adaboost演算法的論文,照著論文依葫蘆畫瓢也不難。下面先分別多分類和二分類的adaboost

2.1 二分類adaboost

對於給定的二類分類的訓練資料集$$T = {(x_1, y_2),(x_2, y_2)...,(x_N, y_N)}$$ 其中每個$x$是一個樣本的特徵向量,$y\in\{-1, +1\}$,演算法流程如下:

  1. 初始化各個樣本的權重為$$D_1 = (w_{11}, w_{12}, ... , w_{1i}, ... , w_{1N}),  w_{1i} = \frac{1}{N}, i = 1, 2, ... , N$$
  2. 對於第m次迭代,$m = 1, 2, ..., M$:
    • 每次迭代使用帶有當前權重$D_m$的樣本進行訓練,得到一個基本分類器$G_m(x)$
    • 計算在分類器$D_m$下,訓練樣本分類結果的誤差率$$e_m = \sum^{N}_{i = 1}{w_{mi}I(G_m{x_i} \neq{y_i})}$$,因為每一步權重都做了歸一化,所以分母不用再除以樣本權重之和
    • 根據誤差率$e_m$計算分類器$D_m$的係數 $$\alpha_m = log\frac{1-e_m}{e_m}$$
    • 根據係數$\alpha_m$更新各樣本的權重$$D_{m+1} = {w_{m+1, 1}, w_{m+1, 2}, ... , w_{m+1, N}}$$ $$w_{m+1, i} = w_{m, i} * exp(\alpha_m * I(G_m{x_i} \neq{y_i}))$$
    • 對$D_{m+1}$做歸一化處理,使$\sum_{i = 1}^{N}{w_{m+1, i}} = 1$
  3. 最後對多個分類器$D_m$的結果進行加權表決,$$c(x) = argmax_k\sum_{m = 1}^{M}{\alpha_m*I(D_m(x) = k)}$$

注意到對於二分類的adaboost需要每次的分類誤差率$e_m \leq{\frac{1}{2}}$,否則的話將會導致$\alpha_m < 0$,然後樣本權重的更新將會朝著反方向進行。

2.2 多分類adaboost

對於K分類的情況,演算法基本與二分類的情況一致。但是要求每次的分類誤差率$e_m \leq{\frac{1}{2}}$是非常困難的,聯絡到二分類誤差率閾值選擇$\frac{1}{2}$,K分類的情況選擇誤差率為$\frac{K-1}{K}$,然後$\alpha_m$的計算改為 $$\alpha_m = log(\frac{1-e_m}{e_m}) + log(K-1)$$ 容易驗證只要$e_m \leq{\frac{K-1}{K}}$,則有$\alpha_m \geq{log(\frac{1-\frac{K-1}{K}}{\frac{K-1}{K}}) + log(K-1)} = log(\frac{1}{K-1}) + log(K-1) = 0$,這種情況下,多分類adaboost對於被誤分的樣本的側重加大了,因為$\alpha_m$因為添加了正項$log(K-1)$而增大了。

adaboost的一種解釋是模型為加法模型,損失函式為指數函式,學習演算法為前向分步演算法的分類演算法,這個以後再另外寫一篇。這裡給出一個比較直觀好懂的解釋:

  • 迭代過程中誤差率小的模型具有大的模型係數,也就是說表現好的子模型在最後加權的時候具有更大的“話語權”
  • 迭代過程中上一次被誤分的樣本在下一次訓練時將會具有更大的權重,更容易被分類正確

3. Spark ML的使用

提到Spark ml就不得不提Spark mllib,兩者的區別主要在於ml面向的資料是Dataset,而mllib面向的是rdd,Dataset相當於在底層rdd的基礎上做了進一步的優化。而且ml中一系列演算法更適合建立包含從資料清洗到特徵工程再到模型訓練等一系列工作的ML pipelines,這個類似於sklearn中的pipeline,非常簡潔好用。

pipeline中的Transformer,Estimator,Stage等概念官方文件上寫的很清楚,而且還有事例,就不在這裡解釋了。這裡以naive bayes為例簡單介紹一下怎麼利用spark ml的pipelines進行機器學習模型的訓練和預測。

首先是pipelines的建立:

 1   // pipeline for train
 2   def createPipiline(dataset: Dataset[_]): Pipeline = {
 3     // step 1 sentence 拆成 words
 4     val tokenizer = new RegexTokenizer().setInputCol("sentence").setOutputCol("words").setPattern(",")
 5     // step 2 label 轉化為以0開始的labelIndex 為了適應spark.ml
 6     val indexer = new StringIndexer().setInputCol("label").setOutputCol("labelIndex").fit(dataset)
 7     // step3 統計tf詞頻
 8     val countModel = new CountVectorizer().setInputCol("words").setOutputCol("rawFeatures")
 9     // step4 tf-idf
10     val idfModel = new IDF().setInputCol("rawFeatures").setOutputCol("features")
11     // step5 normalize tf-idf vector
12     val normalizer = new Normalizer().setInputCol("features").setOutputCol("normalizedFeatures")
13     // step6 naive bayes model
14     val naiveBayes = new NaiveBayes().setFeaturesCol("normalizedFeatures").setLabelCol("labelIndex").setWeightCol("obsWeights").setPredictionCol("prediction").setModelType("multinomial").setSmoothing(1.0)
15     // step7 predict label to real label
16     val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(indexer.labels)
17 
18     new Pipeline().setStages(Array(tokenizer, indexer, countModel, idfModel, normalizer, naiveBayes, labelConverter))
19   }

這裡注意到我在建立這個pipeline的時候還傳入了訓練資料,但是一般情況下訓練資料是在擬合模型而不是在模型建立的時候就提前傳入的。這裡是因為最後面那個labelConverter的transformer需要使用indexer.labels這個引數,而indexer要獲取這個引數就要提前擬合訓練資料,也就是indexer的建立發生在整個pipeline的擬合之前,所以我就先穿入了訓練資料集。注意到這裡訓練資料就相當於被訓練了兩次,所以可以先cache()操作一下。

pipeline建立好以後的使用就相對簡單多了,傳入資料就可以了。

1 val pipeline = ModelUsage.createPipeline(dataRDDTrain)
2 // train and test
3 val combinedModel = pipeline.fit(dataRDDTrain)
4 val predictResult = combinedModel.transform(dataRDDTest).select("predictedLabel", "label").rdd.map(row => (row.getDouble(0), row.getDouble(1)))
5 val evaluator = new MulticlassMetrics(predictResult)
6 
7 println("confusionMatrix:")
8 println(evaluator.confusionMatrix)
9 println(evaluator.accuracy)

注意到ML擬合的結果都是Double型別的,比如說我一個label是55但是輸出是55.0,評估模型準確度的時候注意一下就好,影響不大。

Spark ML的一個好處是資料dataset像水一樣通過預先建立好的pipeline,可以指定每一個stage處理的column名,再新增生成的資料到新的一列。自始至終,這些中間資料都在結果的dataset裡,想要哪些資料指定列名就可以了。這樣的話就避免了每次都要處理資料使它們符合中間模型的輸入結構,而且最後還要自己再整合需要的欄位到一起。

由於我們的文章資料特點比較鮮明,沒有任何引數調優,在4w(80% train 20% test)的四分類資料上就已經有了95%的正確率了。

4. 自定義擴充套件Spark ML

既然直接用現有naive bayes模型就已經有了95%的正確率,那要是加上adaboost呢?

直接實現adaboost演算法很簡單,但是畢竟spark ml的pipeline這麼好用,而dataset這麼好的封裝加上這麼多現有的類似StringIndxer等工具類transformer總不能全部重寫吧。所以就想到怎麼去自定義一個跟Spark ML相容的model,上網查了查發現了以下幾篇比較有用的文章。

Extend Spark ML for your own model/transformer types

spark的NaiveBayes實現原始碼

結合這兩篇文章的內容,在已有的naivebayes模型基礎上進行了改進實現了與Spark ML相容的adaboost naivebayes model。

注意:

1. 由於我們的模型需要先擬合訓練資料得到模型,隨後才能使用模型,這裡面分別涉及到estimator和transformer,因此我們需要分別實現這兩個部分。

2. 我要實現的adaboost+naivebayes模型是一個概率模型,因此我的Estimator和Transformer分別繼承自ProbabilisticClassifier和ProbabilisticClassificationModel,而不是最原始的Estimator和Transformer,這樣就減少了很多不必要的程式碼重寫,但是如果是想玩玩整整自己實現一個模型的話就要從最基本的一點點開始寫了,可以參照上面第一篇文章所講,這裡就不多細說了。

當然可能會有疑問,既然可以繼承ProbabilisticClassifier,那為什麼不直接整合NaiveBayes不是更簡單麼?我一開始也是這樣想的,但是發現Spark ML裡NaiveBayes裡大部分方法和屬性都是私有或者受保護的,我要改就得修改Spark原始碼,但是我的Spark程式是在公司伺服器執行的,總不能每次都讓公司用我改過之後的Spark包吧。。。

4.1 模型引數

首先,對於任何一個模型模型的訓練,我們一般都會需要傳遞一些引數,這裡利用scala的trait實現一個引數介面。

 1 trait AdaboostNaiveBayesParams extends Params {
 2   // 進行adaboost時的最高迭代次數
 3   final val maxIter: IntParam = new IntParam(this, "maxIter", "max number of iterations")
 4   def getMaxIter: Int = $(maxIter)
 5   // 進行adaboost時準確率變化小於某個閾值時迭代提前終止
 6   final val threshold: DoubleParam = new DoubleParam(this, "threshold", "improvement threshold among iterations")
 7   def getThreshold: Double = $(threshold)
 8   // 樸素Bayes的平滑係數
 9   final val smoothing : DoubleParam = new DoubleParam(this, "smoothing", "naive bayes smooth")
10   def getSmoothing : Double = $(smoothing)
11   // 樸素Bayes型別"multinomial"(default) and "bernoulli"
12   final val modelType : Param[String] = new Param[String](this, "modelType", "naive bayes model type")
13   def getModelType : String = $(modelType)
14 }

這一部分沒什麼解釋的,都是一些模型常用引數。

4.2 模型Estimator

這一部分可以說是最重要的部分,Estimator擬合好了,Transformer基本屬於呼叫一下就好了。先貼程式碼,再一行行解釋。

  1 class AdaboostNaiveBayes(override val uid: String)
  2   extends ProbabilisticClassifier[Vector, AdaboostNaiveBayes, AdaboostNaiveBayesModel]
  3   with AdaboostNaiveBayesParams {
  4 
  5   def this() = this(Identifiable.randomUID("AdaboostNaiveBayes"))
  6 
  7   // model parameters assignment
  8   def setMaxIter(value: Int): this.type = set(maxIter, value)
  9   def setThreshold(value: Double): this.type = set(threshold, value)
 10   def setSmoothing(value: Double): this.type = set(smoothing, value)
 11   def setModelType(value: String): this.type = set(modelType, value)
 12 
 13   setMaxIter(20)
 14   setThreshold(0.02)
 15   setSmoothing(1.0)
 16   setModelType("multinomial")
 17 
 18   // method used by fit()
 19   override protected def train(dataset: Dataset[_]): AdaboostNaiveBayesModel = {
 20 
 21     val datasetSize = dataset.count().toInt
 22     val labelSize = dataset.select("label").distinct().count()
 23 
 24     // 各子模型及其權重
 25     val modelWeights = new Array[Double]($(maxIter))
 26     val modelArray = new Array[NaiveBayesModel]($(maxIter))
 27 
 28     var alpha = 0.0
 29     // 初始化各樣本等權重
 30     val dataWeight: (Double, Double, Double) => Double = (obsWeight: Double, labelIndex: Double, prediction: Double) => {
 31       if (labelIndex == prediction) {
 32         obsWeight
 33       }
 34       else {
 35         obsWeight * math.exp(alpha)
 36       }
 37     }
 38     val sqlfunc = udf(dataWeight)
 39     // 初始化還沒有prediction
 40     var temp = dataset.withColumn("obsWeights", lit(1.0))
 41     var i = 0
 42     var error1 = 2.0
 43     var error2 = 1.0// && (error1 - error2) > $(threshold)
 44     var weightSum = datasetSize.toDouble*datasetSize
 45 
 46     while (i < $(maxIter)) {
 47       val naiveBayes = new NaiveBayes().setFeaturesCol($(featuresCol)).setLabelCol($(labelCol)).setWeightCol("obsWeights")
 48         .setPredictionCol($(predictionCol)).setModelType($(modelType)).setSmoothing($(smoothing)).fit(temp)
 49       temp = naiveBayes.transform(temp).cache() 
 50 
 51       var error = temp.select("labelIndex", "prediction", "obsWeights").rdd.map(row => {
 52         if (row(0) != row(1))
 53           row.getDouble(2)
 54         else
 55           0.0
 56       }
 57       ).sum()/(datasetSize)
 58       val t5 = System.nanoTime()
 59       error1 = error2
 60       error2 = error
 61       alpha = Math.log((labelSize - 1) * (1 - error) / error)
 62 
 63       modelWeights(i) = alpha
 64       modelArray(i) = naiveBayes
 65       // 更新權重
 66       temp = temp.withColumn("obsWeights", sqlfunc(col("obsWeights"), col($(labelCol)), col($(predictionCol))));
 67       weightSum = temp.select("obsWeights").rdd.map(row => (row.getDouble(0))).sum()
 68       temp = temp.drop($(predictionCol), $(rawPredictionCol), $(probabilityCol))
 69       temp = temp.withColumn("obsWeights", col("obsWeights")/(weightSum/datasetSize)) 70 
 71       i += 1
 72     }
 73 
 74     new AdaboostNaiveBayesModel(uid, i, modelWeights, modelArray)
 75   }
 76 
 77   override def copy(extra: ParamMap): AdaboostNaiveBayes = defaultCopy(extra)
 78 }

1-3行是繼承ProbabilisticClassifer和實現前面我們自己定義的AdaboostNaiveBayesParam引數介面,ProbabilisticClassifer的繼承使用看看原始碼裡NaiveBayes是怎麼做的就可以照著學了。

5行是一個最基本的建構函式,分配給物件一個id值

77行是一個拷貝建構函式,這個必須要實現,最簡單的可以直接像這裡一樣呼叫defaultCopy函式就好了。這個函式用來在引入新的引數的時候複製當前stage返回加入新引數後的一個新模型

8-11行是給模型設定初始引數用的,這幾個函式沒有定義在AdaboostNaiveBayesParam裡是因為這些引數的傳入只發生在模型擬合前,在預測的時候是不能設定的,所以對後面的Transformer應該是不可見的,因此只在這裡定義。注意到這些函式的返回型別和模型型別一致,其實就是每一步都返回一個加入的引數的新的模型,這裡就利用了之前的拷貝建構函式。

13-16行是給模型設定預設引數。

19行開始的train函式就是我們在對模型呼叫fit方法時使用的函式。返回的是一個AdaboostNaiveBayesModel,是我們隨後需要定義的跟AdaboostNaiveBayes這個Estimator對應的Transformer。

21-22行分別獲取資料集的數量和其中label的數量

25-26是初始化所有子模型及其權重,因為adaboost每一次迭代都會生成一個新的模型並計算該模型在最終結果投票時的權重。

30-38是一個自定義udf函式,對每個樣本計算預測的label和真實label,並根據該樣本的現有權重obsWeight進行更新,可以理解為如果分類正確,其權重不變,否則增大其權重。

40行 初始化所有樣本為等權重,如果樣本資料非常不平衡的話,可以嘗試在這一步就引入偏差權重,我由於使用的資料各個類之間數量是一樣的,所以全部初始話為1

41-44行初始化一些錯誤率等引數

46行開始進行adaboost迭代過程。

47-49行是在當前樣本權重情況下呼叫普通的NaiveBayes進行訓練的到當前迭代下的子模型

49行這個cache一定不能少,否則迭代的速度只能呵呵了,畢竟temp用到了非常多次的action。

51-57行是計算該模型的錯誤率

59-61行是更新誤差,並計算該模型的權重alpha

63-64行是儲存當前子模型和權重

66-69行是利用之前定義的udf函式更新所有樣本的權重並對其進行歸一化

74 行是利用計算得到的引數去構建一個AdaboostNaiveBayesModel,這裡傳入所有的子模型及其權重,i表示的是總迭代次數,就是子模型的數量。

4.3 模型Transformer

這裡要實現的AdaboostNaiveBayesModel是從ProbabilisticClassificationModel,因此要手動實現對應的必須要的幾個方法。

程式碼如下:

 1 class AdaboostNaiveBayesModel(override val uid: String, val iternums: Int, val modelWeights: Array[Double], val modelArray: Array[NaiveBayesModel])
 2   extends ProbabilisticClassificationModel[Vector, AdaboostNaiveBayesModel]
 3   with AdaboostNaiveBayesParams {
 4 
 5   override val numClasses = modelArray(0).pi.size
 6 
 7   private def multinomialCalculation(features: Vector): Vector = {
 8     val result: Vector = new DenseVector(new Array(numClasses))
 9     for (i <- 0 until iternums) {
10       val prob: Vector = modelArray(i).theta.multiply(features)
11       prob.foreachActive { (index, value) => {
12         prob.toArray(index) = value + modelArray(i).pi(index)
13         }
14       }
15       result.toArray(prob.argmax) = result(prob.argmax) + modelWeights(i)
16     }
17     result
18   }
19 
20   override def predictRaw(features: Vector): Vector = {
21     multinomialCalculation(features)
22   }
23 
24   override def raw2probabilityInPlace(rawPrediction: Vector): Vector = {
25     rawPrediction match {
26       case dv: DenseVector =>
27         var i = 0
28         val size = dv.size
29         val maxLog = dv.values.max
30         for (i <- 0 until size) {
31           dv.values(i) = math.exp(dv.values(i) - maxLog)
32         }
33         val probSum = dv.values.sum
34 
35         for (i <- 0 until size) {
36           dv.values(i) = dv.values(i) / probSum
37         }
38         dv
39       case sv: SparseVector =>
40         throw new RuntimeException("Unexpected error in AdaboostNaiveBayesModel:" +
41         " raw2probabilityInPlace encountered SparseVector")
42     }
43   }
44  override def copy(extra: ParamMap) = {
45   defaultCopy(extra)
46  }
47 }

第5行是讀取一下總的標籤個數以供後面使用

44-46行是拷貝建構函式

20-22行是對一個輸入計算它在各個label下的得分,這個得分的大小表示的是判斷到該標籤概率的大小,但是並不是概率值,因為我們的BayesModel模型引數是做了log變換的

24-43行是怎麼講結果向量轉化為和為1的概率值,30-32行是個小技巧,我一開始好奇為什麼一定要減掉maxLog,因為這個按理說並不會影響到後面的計算結果,後來發現這樣能避免浮點數的問題,因為不減的話,會出現求完math.exp後值約為零的情況,導致後面的計算出現問題

這樣就完成了概率模型需要的幾個方法了,可以對一個輸入給出一個概率向量,每個維度代表在這個類的概率。

5. 寫在最後

利用自定義的adaboost+naivebayes模型,測試準確率從95%增加到了96.5%左右。由於訓練資料比較好,95%已經很不錯了,這裡主要是通過寫一個自定義模型學習一下Spark ML方面的知識。之前都只是聽說過,從來沒用過,學習一下還是很有必要的,畢竟不能總指望著單機就能搞定所有問題。

不過注意到這裡我並不是從0開始造輪子,我是從ProbabilisticClassification繼承過來加以修改的,如果想要做其他模型的修改還是推薦看上面的兩篇文章,然後多看看Spark ML原始碼裡類似的模型並根據自己的需要進行修改。

然後scala也是為了用Spark ML現學的,程式碼可以優化的地方估計很多。

這是本人第一篇部落格,希望以後可以堅持寫,作為對自己工作學習的總結筆記。