
Usage of VectorIndexer, StringIndexer, and related transformers in Spark ML

VectorIndexer

Main purpose: improving the classification performance of tree-based ML methods such as decision trees and random forests.
VectorIndexer indexes the categorical (discrete-valued) features inside a dataset's feature vectors.
It automatically decides which features are categorical and indexes them. Concretely, you set a parameter maxCategories: a feature whose number of distinct values is at most maxCategories is re-indexed to 0~K (K <= maxCategories-1); a feature with more distinct values than maxCategories is treated as continuous and left completely unchanged. The example below makes this concrete.

    VectorIndexer helps index categorical features in datasets of Vectors. It can both automatically decide which features are categorical and convert original values to category indices. Specifically, it does the following:

    Take an input column of type Vector and a parameter maxCategories.
    Decide which features should be categorical based on the number of distinct values: features with at most maxCategories distinct values are declared categorical.
    Compute 0-based category indices for each categorical feature.
    Index categorical features and transform original feature values to indices.

    Indexing categorical features allows algorithms such as Decision Trees and Tree Ensembles to treat categorical features appropriately, improving performance.

    This transformed data could then be passed to algorithms such as DecisionTreeRegressor that handle categorical features.

A simple dataset illustrates this:

//Define input/output columns and set maxCategories to 5: a feature
//(i.e. a column) with more than 5 distinct values is treated as continuous
VectorIndexerModel featureIndexerModel=new VectorIndexer()
                 .setInputCol("features")
                 .setMaxCategories(5)
                 .setOutputCol("indexedFeatures")
                 .fit(rawData);
//Add the model to a Pipeline
Pipeline pipeline=new Pipeline()
                 .setStages(new PipelineStage[]
                         {labelIndexerModel,
                         featureIndexerModel,
                         dtClassifier,
                         converter});
pipeline.fit(rawData).transform(rawData).select("features","indexedFeatures").show(20,false);
//The output looks like this:
+-------------------------+-------------------------+
|features                 |indexedFeatures          |
+-------------------------+-------------------------+
|(3,[0,1,2],[2.0,5.0,7.0])|(3,[0,1,2],[2.0,1.0,1.0])|
|(3,[0,1,2],[3.0,5.0,9.0])|(3,[0,1,2],[3.0,1.0,2.0])|
|(3,[0,1,2],[4.0,7.0,9.0])|(3,[0,1,2],[4.0,3.0,2.0])|
|(3,[0,1,2],[2.0,4.0,9.0])|(3,[0,1,2],[2.0,0.0,2.0])|
|(3,[0,1,2],[9.0,5.0,7.0])|(3,[0,1,2],[9.0,1.0,1.0])|
|(3,[0,1,2],[2.0,5.0,9.0])|(3,[0,1,2],[2.0,1.0,2.0])|
|(3,[0,1,2],[3.0,4.0,9.0])|(3,[0,1,2],[3.0,0.0,2.0])|
|(3,[0,1,2],[8.0,4.0,9.0])|(3,[0,1,2],[8.0,0.0,2.0])|
|(3,[0,1,2],[3.0,6.0,2.0])|(3,[0,1,2],[3.0,2.0,0.0])|
|(3,[0,1,2],[5.0,9.0,2.0])|(3,[0,1,2],[5.0,4.0,0.0])|
+-------------------------+-------------------------+
Analysis: the feature vector contains 3 features, i.e. feature 0, feature 1, feature 2. For Row 1, the feature values 2.0, 5.0, 7.0 are transformed to 2.0, 1.0, 1.0.
Notice that only features 1 and 2 were transformed; feature 0 was not. That is because feature 0 has 6 distinct values (2,3,4,5,8,9), more than the setMaxCategories(5) above, so it is treated as continuous and left unchanged.
Feature 1: (4,5,6,7,9) --> (0,1,2,3,4)
Feature 2: (2,7,9) --> (0,1,2)

Reading the output DataFrame's sparse-vector format (Row 1):

3 features  indices 0,1,2   original values
|(3,        [0,1,2],        [2.0,5.0,7.0])|
3 features  indices 0,1,2   indexed values
|(3,        [0,1,2],        [2.0,1.0,1.0])|
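The per-feature rule described above can be sketched in plain Java. This is a hypothetical illustration (class and method names `VectorIndexerSketch` / `indexColumn` are my own, not Spark API): a column with at most maxCategories distinct values is re-indexed by sorted-value order, otherwise it is left unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch of VectorIndexer's per-feature logic: a feature
// column with at most maxCategories distinct values is re-indexed to
// 0..K-1 (in sorted-value order); otherwise it is treated as continuous
// and returned unchanged.
class VectorIndexerSketch {

    static double[] indexColumn(double[] values, int maxCategories) {
        TreeSet<Double> distinct = new TreeSet<>();
        for (double v : values) distinct.add(v);
        if (distinct.size() > maxCategories) {
            return values.clone(); // continuous: no re-indexing
        }
        List<Double> sorted = new ArrayList<>(distinct);
        double[] out = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            out[i] = sorted.indexOf(values[i]); // 0-based category index
        }
        return out;
    }
}
```

Applied to feature 1 of the table above ({4,5,6,7,9}, 5 distinct values), this reproduces the mapping 4->0, 5->1, 6->2, 7->3, 9->4; applied to feature 0 (6 distinct values) it returns the column unchanged.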

StringIndexer

Once VectorIndexer is understood, StringIndexer is straightforward: it re-indexes the dataset's label column using the same kind of mapping. The following example is enough to see how it works.

//Define a StringIndexerModel that maps label to indexedLabel
StringIndexerModel labelIndexerModel=new StringIndexer()
                .setInputCol("label")
                .setOutputCol("indexedLabel")
                .fit(rawData);
//Add labelIndexerModel to the Pipeline
Pipeline pipeline=new Pipeline()
                 .setStages(new PipelineStage[]
                         {labelIndexerModel,
                         featureIndexerModel,
                         dtClassifier,
                         converter});
//Inspect the result
pipeline.fit(rawData).transform(rawData).select("label","indexedLabel").show(20,false);

Labels are indexed by frequency of occurrence, into 0~numOfLabels-1 (the number of classes); the most frequent label becomes 0, and so on:
label=3 occurs most often (4 times), so it is indexed as 0;
next is label=2 with 3 occurrences, indexed as 1; and so on.
+-----+------------+
|label|indexedLabel|
+-----+------------+
|3.0  |0.0         |
|4.0  |3.0         |
|1.0  |2.0         |
|3.0  |0.0         |
|2.0  |1.0         |
|3.0  |0.0         |
|2.0  |1.0         |
|3.0  |0.0         |
|2.0  |1.0         |
|1.0  |2.0         |
+-----+------------+
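The frequency-based ordering can be sketched as follows. This is a hypothetical illustration (class `StringIndexerSketch` is mine, not Spark API), assuming ties are broken deterministically; Spark's own tie-breaking may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of StringIndexer's ordering rule: labels are
// sorted by descending frequency; the most frequent label gets index
// 0.0, the next gets 1.0, and so on.
class StringIndexerSketch {

    static Map<String, Double> fit(List<String> labels) {
        Map<String, Long> freq = new HashMap<>();
        for (String l : labels) freq.merge(l, 1L, Long::sum);
        List<String> ordered = new ArrayList<>(freq.keySet());
        // descending frequency; ties broken alphabetically for determinism
        ordered.sort((a, b) -> freq.get(a).equals(freq.get(b))
                ? a.compareTo(b)
                : Long.compare(freq.get(b), freq.get(a)));
        Map<String, Double> index = new HashMap<>();
        for (int i = 0; i < ordered.size(); i++) {
            index.put(ordered.get(i), (double) i);
        }
        return index;
    }
}
```

Fed the label column from the table above ("3" x4, "2" x3, "1" x2, "4" x1), it yields 3->0.0, 2->1.0, 1->2.0, 4->3.0, matching the indexedLabel column.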

Two things to watch out for when applying StringIndexer elsewhere:
(1) StringIndexer is fundamentally a String -> index (number) mapping; for numeric input it actually casts the numeric values to string first and then indexes the string values. In other words, both strings and numerics can be re-indexed.
(2) When the fitted model is used to transform a new dataset, you can run into unseen labels; see the example below.

StringIndexer indexes strings by frequency:
 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0
 If the fitted mapping (a,b,c) -> (0.0,2.0,1.0) was obtained from the data above, then using this model on data whose categories go beyond (a,b,c), say with extra values d and e, causes trouble:
 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | d        | ?
 3  | e        | ?
 4  | a        | 0.0
 5  | c        | 1.0
 Spark provides two ways to handle this:
 StringIndexerModel labelIndexerModel=new StringIndexer().
                setInputCol("label")
                .setOutputCol("indexedLabel")
                //.setHandleInvalid("error")
                .setHandleInvalid("skip")
                .fit(rawData);
 (1) The default, i.e. .setHandleInvalid("error"): an exception is thrown
 org.apache.spark.SparkException: Unseen label: d
 (2) .setHandleInvalid("skip"): rows containing unseen labels are silently dropped and the job runs normally, producing:
 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 4  | a        | 0.0
 5  | c        | 1.0
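The "skip" behavior amounts to filtering out rows whose category was never seen at fit time. A minimal sketch (class `HandleInvalidSketch` is hypothetical, not Spark API):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of handleInvalid("skip"): rows whose category is
// absent from the fitted label-to-index map are dropped before indexing.
class HandleInvalidSketch {

    static List<String> skipUnseen(List<String> rows, Map<String, Double> fitted) {
        return rows.stream()
                .filter(fitted::containsKey) // drop rows with unseen labels
                .collect(Collectors.toList());
    }
}
```

With the fitted mapping (a,b,c) and input rows a,b,d,e,a,c, rows d and e are dropped, leaving a,b,a,c as in the table above.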

IndexToString

Correspondingly, where there is a StringIndexer there should be an IndexToString. After StringIndexer re-indexes the labels, the model is trained on those re-indexed labels and then used to predict on other data, so the predicted labels are also in re-indexed form and need to be converted back. See the example below: only the converted convetedPrediction column corresponds to the original label again.

         Symmetrically to StringIndexer, IndexToString maps a column of label indices back to a column containing the original labels as strings. A common use case is to produce indices from labels with StringIndexer, train a model with those indices and retrieve the original labels from the column of predicted indices with IndexToString. 
IndexToString converter=new IndexToString()
                .setInputCol("prediction")//Spark's default prediction column
                .setOutputCol("convetedPrediction")//the prediction converted back to the original label
                .setLabels(labelIndexerModel.labels());//reuse the labels from the model fitted above
Pipeline pipeline=new Pipeline()
                 .setStages(new PipelineStage[]
                         {labelIndexerModel,
                         featureIndexerModel,
                         dtClassifier,
                         converter});
pipeline.fit(rawData).transform(rawData)
        .select("label","prediction","convetedPrediction").show(20,false);  
+-----+----------+------------------+
|label|prediction|convetedPrediction|
+-----+----------+------------------+
|3.0  |0.0       |3.0               |
|4.0  |1.0       |2.0               |
|1.0  |2.0       |1.0               |
|3.0  |0.0       |3.0               |
|2.0  |1.0       |2.0               |
|3.0  |0.0       |3.0               |
|2.0  |1.0       |2.0               |
|3.0  |0.0       |3.0               |
|2.0  |1.0       |2.0               |
|1.0  |2.0       |1.0               |
+-----+----------+------------------+
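The conversion back is just an array lookup into the labels saved by the fitted StringIndexerModel. A minimal sketch (class `IndexToStringSketch` is hypothetical, not Spark API):

```java
// Hypothetical sketch of IndexToString: labels[i] is the original label
// that StringIndexer assigned index i (frequency order), so a predicted
// index maps back to the original label by a simple array lookup.
class IndexToStringSketch {

    static String decode(double predictedIndex, String[] labels) {
        return labels[(int) predictedIndex];
    }
}
```

With the labels array from the example above, ("3.0","2.0","1.0","4.0") in frequency order, a prediction of 1.0 decodes back to "2.0", matching the convetedPrediction column.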

Converting between discrete and continuous features or labels

OneHotEncoder

One-hot encoding maps categorical features (discrete values, already converted to numeric indices) to one-hot vectors. This lets categorical (discrete) features be used even in classifiers such as Logistic Regression that expect continuous numeric inputs.

One-hot encoding, also known as one-of-N encoding, uses an N-bit state register to encode N states: each state gets its own register bit, and at any time exactly one bit is active.
For example, the natural binary codes 000,001,010,011,100,101
become the one-hot codes 000001,000010,000100,001000,010000,100000.
Put differently, a feature with m possible values becomes m binary features after one-hot encoding. These features are mutually exclusive, with only one active at a time, so the data becomes sparse.
The main benefits are:
classifiers that cannot handle categorical attributes directly can now use them;
to some extent it also enlarges the feature space.

One-hot encoding maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.
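The encoding itself is trivial to sketch (class `OneHotSketch` is hypothetical, not Spark API): index i among m categories becomes an m-dimensional binary vector with a single 1 at position i.

```java
// Hypothetical sketch of one-hot encoding: a category index i among m
// possible categories becomes an m-dimensional binary vector with
// exactly one active position.
class OneHotSketch {

    static double[] encode(int categoryIndex, int numCategories) {
        double[] v = new double[numCategories]; // all zeros
        v[categoryIndex] = 1.0;                 // exactly one active bit
        return v;
    }
}
```

Note that Spark's OneHotEncoder by default drops the last category (dropLast=true), producing an (m-1)-dimensional vector, so its output is one dimension shorter than this sketch.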

//Before OneHotEncoder, the string categories must be indexed: string->numerical
        Dataset indexedDf=new StringIndexer()
                        .setInputCol("category")
                        .setOutputCol("indexCategory")
                        .fit(df)
                        .transform(df);
        //One-hot encode the category indices; the result can then be fed in as continuous numeric input
        Dataset coderDf=new OneHotEncoder()
                        .setInputCol("indexCategory")
                        .setOutputCol("ontHotCategory")//no fit needed: in this API OneHotEncoder is a plain Transformer
                        .transform(indexedDf);

Bucketizer

Binning (segmenting): converts continuous values into discrete categories.
For example, age is a continuous value; to convert it into discrete categories (minor, young adult, middle-aged, senior), Bucketizer is the tool.
The boundaries are user-defined, via the splits parameter in Spark, for example:
double[] splits = {0, 18, 35, 50, Double.POSITIVE_INFINITY}
which divides age into four buckets: 0-18, 18-35, 35-50, 50+.
If you are unsure about the outer boundaries, set them to Double.NEGATIVE_INFINITY and Double.POSITIVE_INFINITY and nothing can go wrong.

Bucketizer transforms a column of continuous features to a column of
feature buckets, where the buckets are specified by users.
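The assignment rule can be sketched in plain Java (class `BucketizerSketch` is hypothetical, not Spark API): with splits s0 < s1 < ... < sn, value x falls into bucket i when s_i <= x < s_{i+1}, and the last bucket also includes its upper bound.

```java
// Hypothetical sketch of Bucketizer's assignment rule: buckets are
// half-open intervals [s_i, s_{i+1}), except the last bucket, which
// also includes its upper bound.
class BucketizerSketch {

    static double bucketIndex(double x, double[] splits) {
        for (int i = 0; i < splits.length - 1; i++) {
            if (x >= splits[i] && x < splits[i + 1]) return i;
        }
        if (x == splits[splits.length - 1]) return splits.length - 2;
        throw new IllegalArgumentException("value out of bucket range: " + x);
    }
}
```

With splits {0, 18, 35, 55, +Infinity} this reproduces the table below: age 36 lands in bucket 2, age 14 in bucket 0, age 67 in bucket 3.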

double[] splits={0,18,35,55,Double.POSITIVE_INFINITY};
Dataset bucketDf=new Bucketizer()
             .setInputCol("ages")
             .setOutputCol("bucketCategory")
             .setSplits(splits)//set the split points
             .transform(df);
//Output
/*
+---+----+--------------+
|id |ages|bucketCategory|
+---+----+--------------+
|0.0|2.0 |0.0           |
|1.0|67.0|3.0           |
|2.0|36.0|2.0           |
|3.0|14.0|0.0           |
|4.0|5.0 |0.0           |
|5.0|98.0|3.0           |
|6.0|65.0|3.0           |
|7.0|23.0|1.0           |
|8.0|37.0|2.0           |
|9.0|76.0|3.0           |
+---+----+--------------+

*/

QuantileDiscretizer

Quantile discretization. Like Bucketizer (binning), it converts a continuous numeric feature into a discrete categorical one. In fact, fitting a QuantileDiscretizer produces a Bucketizer model: the splits are computed from quantiles instead of being supplied by hand.

Parameter 1: the difference is that you no longer define the splits yourself; you only specify how many buckets (segments) you want. QuantileDiscretizer computes the quantiles itself and performs the discretization.
Parameter 2: the other parameter is the precision (relativeError). Set to 0, it computes exact quantiles, which is an expensive operation. Additionally, the lower and upper bounds are set to negative and positive infinity, covering the whole real line.

QuantileDiscretizer takes a column with continuous features and outputs a column with binned categorical features. The number of bins is set by the numBuckets parameter. The bin ranges are chosen using an approximate algorithm (see the documentation for approxQuantile for a detailed description). The precision of the approximation can be controlled with the relativeError parameter. When set to zero, exact quantiles are calculated (Note: Computing exact quantiles is an expensive operation). The lower and upper bin bounds will be -Infinity and +Infinity covering all real values.
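What the fitted model ends up holding can be sketched with an exact nearest-rank quantile computation (class `QuantileSplitsSketch` is hypothetical, and Spark uses an approximate algorithm, not this exact one): the quantiles become the interior split points and the outer bounds are infinite.

```java
import java.util.Arrays;

// Hypothetical sketch of the splits a QuantileDiscretizer computes:
// interior split points are the i/numBuckets quantiles (here by the
// exact nearest-rank rule), and the outer bounds are -/+ Infinity.
class QuantileSplitsSketch {

    static double[] splits(double[] values, int numBuckets) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        double[] s = new double[numBuckets + 1];
        s[0] = Double.NEGATIVE_INFINITY;
        s[numBuckets] = Double.POSITIVE_INFINITY;
        for (int i = 1; i < numBuckets; i++) {
            // nearest-rank index of the i/numBuckets quantile
            int rank = (int) Math.ceil((double) i * sorted.length / numBuckets) - 1;
            s[i] = sorted[rank];
        }
        return s;
    }
}
```

For the ages in the example below and numBuckets=4, this yields splits (-Infinity, 14, 36, 67, +Infinity); bucketizing with those splits reproduces the qdCategory column shown.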

new QuantileDiscretizer()
             .setInputCol("ages")
             .setOutputCol("qdCategory")
             .setNumBuckets(4)//set the number of buckets
             .setRelativeError(0.1)//set the precision: controls the relative error of the quantiles
             .fit(df)
             .transform(df)
             .show(10,false);    
//Example output:
+---+----+----------+
|id |ages|qdCategory|
+---+----+----------+
|0.0|2.0 |0.0       |
|1.0|67.0|3.0       |
|2.0|36.0|2.0       |
|3.0|14.0|1.0       |
|4.0|5.0 |0.0       |
|5.0|98.0|3.0       |
|6.0|65.0|2.0       |
|7.0|23.0|1.0       |
|8.0|37.0|2.0       |
|9.0|76.0|3.0       |
+---+----+----------+