1. 程式人生 > >spark廈大---決策樹分類器 -- spark.ml

spark廈大---決策樹分類器 -- spark.ml

來源:http://mocom.xmu.edu.cn/article/show/58667ae3aa2c3f280956e7b0/0/1

一、方法簡介

​ 決策樹(decision tree)是一種基本的分類與迴歸方法,這裡主要介紹用於分類的決策樹。決策樹模式呈樹形結構,其中每個內部節點表示一個屬性上的測試,每個分支代表一個測試輸出,每個葉節點代表一種類別。學習時利用訓練資料,根據損失函式最小化的原則建立決策樹模型;預測時,對新的資料,利用決策樹模型進行分類。

二、基本原理

​ 決策樹學習通常包括3個步驟:特徵選擇、決策樹的生成和決策樹的剪枝。

(一)特徵選擇

​ 特徵選擇在於選取對訓練資料具有分類能力的特徵,這樣可以提高決策樹學習的效率。通常特徵選擇的準則是資訊增益(或資訊增益比、基尼指數等),每次計算每個特徵的資訊增益,並比較它們的大小,選擇資訊增益最大(資訊增益比最大、基尼指數最小)的特徵。下面我們重點介紹一下特徵選擇的準則:資訊增益。

​ 首先定義資訊理論中廣泛使用的一個度量標準——熵(entropy),它是表示隨機變數不確定性的度量。熵越大,隨機變數的不確定性就越大。而資訊增益(informational entropy)表示得知某一特徵後使得資訊的不確定性減少的程度。簡單的說,一個屬性的資訊增益就是由於使用這個屬性分割樣例而導致的期望熵降低。資訊增益、資訊增益比和基尼指數的具體定義如下:

資訊增益:特徵A對訓練資料集D的資訊增益g(D,A)g(D,A),定義為集合D的經驗熵H(D)H(D)與特徵A給定條件下D的經驗條件熵H(D|A)H(D|A)之差,即 g(D,A)=H(D)H(D|A)g(D,A)=H(D)−H(D|A)

資訊增益比:特徵A對訓練資料集D的資訊增益比gR(D,A)gR(D,A)定義為其資訊增益g(D,A)g(D,A)與訓練資料集D關於特徵A的值的熵HA(D)HA(D)之比,即 gR(D,A)=g(D,A)HA(D)gR(D,A)=g(D,A)HA(D)

其中,HA(D)=ni=1|Di||D|log2|Di||D|HA(D)=−∑i=1n|Di||D|log2|Di||D|,n是特徵A取值的個數。

基尼指數:分類問題中,假設有K個類,樣本點屬於第K類的概率為pkpk, 則概率分佈的基尼指數定義為:

Gini(p)=K1pk(1pk)Gini(p)=∑1Kpk(1−pk) =1

K1p2k=1−∑1Kpk2

(二)決策樹的生成

​ 從根結點開始,對結點計算所有可能的特徵的資訊增益,選擇資訊增益最大的特徵作為結點的特徵,由該特徵的不同取值建立子結點,再對子結點遞迴地呼叫以上方法,構建決策樹;直到所有特徵的資訊增均很小或沒有特徵可以選擇為止,最後得到一個決策樹。

​ 決策樹需要有停止條件來終止其生長的過程。一般來說最低的條件是:當該節點下面的所有記錄都屬於同一類,或者當所有的記錄屬性都具有相同的值時。這兩種條件是停止決策樹的必要條件,也是最低的條件。在實際運用中一般希望決策樹提前停止生長,限定葉節點包含的最低資料量,以防止由於過度生長造成的過擬合問題。

(三)決策樹的剪枝

​ 決策樹生成演算法遞迴地產生決策樹,直到不能繼續下去為止。這樣產生的樹往往對訓練資料的分類很準確,但對未知的測試資料的分類卻沒有那麼準確,即出現過擬合現象。解決這個問題的辦法是考慮決策樹的複雜度,對已生成的決策樹進行簡化,這個過程稱為剪枝。

​ 決策樹的剪枝往往通過極小化決策樹整體的損失函式來實現。一般來說,損失函式可以進行如下的定義: Ca(T)=C(T)+a|T|Ca(T)=C(T)+a|T| ​ 其中,T為任意子樹,C(T)C(T)​ 為對訓練資料的預測誤差(如基尼指數),|T||T|​ 為子樹的葉結點個數,a0a≥0​為引數,Ca(T)Ca(T)​ 為引數是aa​時的子樹T的整體損失,引數aa​權衡訓練資料的擬合程度與模型的複雜度。對於固定的aa​,一定存在使損失函式Ca(T)Ca(T)​ 最小的子樹,將其表示為TaTa​ 。當aa​大的時候,最優子樹TaTa​偏小;當aa​小的時候,最優子樹TaTa​偏大。

示例程式碼

​ 以iris資料集(https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data)為例進行分析。iris以鳶尾花的特徵作為資料來源,資料集包含150個數據集,分為3類,每類50個數據,每個資料包含4個屬性,是在資料探勘、資料分類中非常常用的測試集、訓練集。為了便於理解,我們這裡主要用後兩個屬性(花瓣的長度和寬度)來進行分類。

1. 匯入需要的包:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.{Pipeline,PipelineModel}
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer,HashingTF, Tokenizer}
import org.apache.spark.mllib.linalg.{Vector,Vectors}
import org.apache.spark.sql.Row
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
2. 讀取資料,簡要分析:

​ 首先根據SparkContext來建立一個SQLContext,其中sc是一個已經存在的SparkContext;然後匯入sqlContext.implicits._來實現RDD到Dataframe的隱式轉換。

scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@10d83860
scala> import sqlContext.implicits._
import sqlContext.implicits._

​ 讀取文字檔案,第一個map把每行的資料用“,”隔開,每行被分成了5部分,前4部分是鳶尾花的4個特徵,最後一部分是鳶尾花的分類。前面說到,這裡主要用後兩個屬性(花瓣的長度和寬度)來進行分類,所以在下一個map中我們獲取到這兩個屬性,儲存在Vector中。

scala> val observations=sc.textFile("G:/spark/iris.data").map(_.split(",")).map(p => Vectors.dense(p(2).toDouble, p(3).toDouble))
observations: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[14] at map at :37

​ 這裡用case class定義一個schema:Iris,Iris就是需要的資料的結構;然後讀取資料,建立一個Iris模式的RDD,然後轉化成dataframe;最後呼叫show()方法來檢視一下部分資料。

scala> case class Iris(features: Vector, label: String)
defined class Iris
scala> val data = sc.textFile("G:/spark/iris.data")
          |       .map(_.split(","))
          |       .map(p => Iris(Vectors.dense(p(2).toDouble, p(3).toDouble), p(4).toString()))
          |       .toDF()
data: org.apache.spark.sql.DataFrame = [features: vector, label: string]
scala> data.show()
+---------+-----------+
| features|      label|
+---------+-----------+
|[1.4,0.2]|Iris-setosa|
|[1.4,0.2]|Iris-setosa|
|[1.3,0.2]|Iris-setosa|
|[1.5,0.2]|Iris-setosa|
|[1.4,0.2]|Iris-setosa|
|[1.7,0.4]|Iris-setosa|
|[1.4,0.3]|Iris-setosa|
|[1.5,0.2]|Iris-setosa|
|[1.4,0.2]|Iris-setosa|
|[1.5,0.1]|Iris-setosa|
|[1.5,0.2]|Iris-setosa|
|[1.6,0.2]|Iris-setosa|
|[1.4,0.1]|Iris-setosa|
|[1.1,0.1]|Iris-setosa|
|[1.2,0.2]|Iris-setosa|
|[1.5,0.4]|Iris-setosa|
|[1.3,0.4]|Iris-setosa|
|[1.4,0.3]|Iris-setosa|
|[1.7,0.3]|Iris-setosa|
|[1.5,0.3]|Iris-setosa|
+---------+-----------+
only showing top 20 rows
3. 構建ML的pipeline

​ 分別獲取標籤列和特徵列,進行索引,並進行了重新命名。

scala> val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(data)
labelIndexer: org.apache.spark.ml.feature.StringIndexerModel = strIdx_6033e13b0b2b
scala> val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(data)
featureIndexer: org.apache.spark.ml.feature.VectorIndexerModel = vecIdx_b5a8adea6903

​ 接下來,把資料集隨機分成訓練集和測試集,其中訓練集佔70%。

scala> val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
trainingData: org.apache.spark.sql.DataFrame = [features: vector, label: string]

testData: org.apache.spark.sql.DataFrame = [features: vector, label: string]

​ 然後,設定決策樹的引數。這裡統一用setter的方法來設定,也可以用ParamMap來設定(具體的可以檢視spark mllib的官網)。這裡設定了用gini指數來進行特徵選擇,設定樹的最大深度為5,具體的可以設定的引數可以通過explainParams()來獲取,還能看已經設定的引數的結果。

scala> val dt = new DecisionTreeClassifier()
  |            .setLabelCol("indexedLabel")
  |            .setFeaturesCol("indexedFeatures")
  |            .setImpurity("gini")
  |            .setMaxDepth(5)
dt: org.apache.spark.ml.classification.DecisionTreeClassifier = dtc_16842f2bb6a7
scala> println("DecisionTreeClassifier parameters:\n" + dt.explainParams() + "\n")
DecisionTreeClassifier parameters:
cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: false)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations (default:10)
featuresCol: features column name (default: features, current: indexedFeatures)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini, current: gini)
labelCol: label column name (default: label, current: indexedLabel)
maxBins: Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature. (default: 32)
maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5, current: 5)
maxMemoryInMB: Maximum memory in MB allocated to histogram aggregation. (default: 256)
minInfoGain: Minimum information gain for a split to be considered at a tree node. (default: 0.0)
minInstancesPerNode: Minimum number of instances each child must have after split.  If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1. (default: 1)
predictionCol: prediction column name (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities (default: probability)
rawPredictionCol: raw prediction (a.k.a. confidence) column name (default: rawPrediction)
seed: random seed (default: 159147643)
thresholds: Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class' threshold. (undefined)

​ 這裡設定一個labelConverter,目的是把預測的類別重新轉化成字元型的。

scala> val labelConverter = new IndexToString().
     |       setInputCol("prediction").
     |       setOutputCol("predictedLabel").
     |       setLabels(labelIndexer.labels)
labelConverter: org.apache.spark.ml.feature.IndexToString = idxToStr_5c051ba9ebd3

​ 構建pipeline,設定stage,然後呼叫fit()來訓練模型。

scala> val pipeline = new Pipeline().
     |       setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
pipeline: org.apache.spark.ml.Pipeline = pipeline_eb9649610b0f
// Fit the pipeline to training documents.
scala> val model = pipeline.fit(trainingData)
model: org.apache.spark.ml.PipelineModel = pipeline_eb9649610b0f

​ pipeline本質上是一個Estimator,當pipeline呼叫fit()的時候就產生了一個PipelineModel,本質上是一個Transformer。然後這個PipelineModel就可以呼叫transform()來進行預測,生成一個新的DataFrame,即利用訓練得到的模型對測試集進行驗證。

scala> val predictions = model.transform(testData)
predictions: org.apache.spark.sql.DataFrame = [features: vector, label: string,
indexedLabel: double, indexedFeatures: vector, rawPrediction: vector, probabilit
y: vector, prediction: double, predictedLabel: string]

​ 最後可以輸出預測的結果,其中select選擇要輸出的列,collect獲取所有行的資料,用foreach把每行打印出來。

scala> predictions.
     |       select("predictedLabel", "label", "features").
     |       collect().
     |       foreach { case Row(predictedLabel: String, label: String, features:
 Vector) =>
     |         println(s"($label, $features) --> predictedLabel=$predictedLabel")
     |     }
(Iris-setosa, [1.2,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.3,0.3]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.3,0.4]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.4,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.4,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.4,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.5,0.1]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.5,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.6,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.6,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.6,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.7,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.7,0.4]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.9,0.2]) --> predictedLabel=Iris-setosa
(Iris-versicolor, [4.0,1.0]) --> predictedLabel=Iris-versicolor
... ...
4. 模型評估

​ 建立一個MulticlassClassificationEvaluator例項,用setter方法把預測分類的列名和真實分類的列名進行設定;然後計算預測準確率和錯誤率。

scala> val evaluator = new MulticlassClassificationEvaluator().
     |       setLabelCol("indexedLabel").
     |       setPredictionCol("prediction")
evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mc Eval_4651752eb9e9
scala> val accuracy = evaluator.evaluate(predictions)
accuracy: Double = 0.96
scala> println("Test Error = " + (1.0 - accuracy))
Test Error = 0.040000000000000036

​ 從上面可以看到預測的準確性達到96%,接下來可以通過model來獲取我們訓練得到的決策樹模型。前面已經說過model是一個PipelineModel,因此可以通過呼叫它的stages來獲取模型,具體如下:

scala> val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
treeModel: org.apache.spark.ml.classification.DecisionTreeClassificationModel =
DecisionTreeClassificationModel (uid=dtc_16842f2bb6a7) of depth 4 with 13 nodes
scala> println("Learned classification tree model:\n" + treeModel.toDebugString)
Learned classification tree model:
DecisionTreeClassificationModel (uid=dtc_16842f2bb6a7) of depth 4 with 13 nodes
  If (feature 0 <= 1.9)
   Predict: 2.0
  Else (feature 0 > 1.9)
   If (feature 1 <= 1.7)
    If (feature 0 <= 4.9)
     If (feature 1 <= 1.6)
      Predict: 0.0
     Else (feature 1 > 1.6)
      Predict: 1.0
    Else (feature 0 > 4.9)
     If (feature 1 <= 1.6)
      Predict: 1.0
     Else (feature 1 > 1.6)
      Predict: 0.0
   Else (feature 1 > 1.7)
    If (feature 0 <= 4.8)
     Predict: 0.0
    Else (feature 0 > 4.8)
     Predict: 1.0