1. 程式人生 > >Spark MLlib 資料預處理-特徵變換(二)

Spark MLlib 資料預處理-特徵變換(二)

作者:劉玲源
連結:https://zhuanlan.zhihu.com/p/24069545
來源:知乎
著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。

演算法介紹:

VectorIndexer解決資料集中的類別特徵Vector。它可以自動識別哪些特徵是類別型的,並且將原始值轉換為類別指標。它的處理流程如下:

1.獲得一個向量型別的輸入以及maxCategories引數。

2.基於原始數值識別哪些特徵需要被類別化,其中最多maxCategories需要被類別化。

3.對於每一個類別特徵計算0-based類別指標。

4.對類別特徵進行索引然後將原始值轉換為指標。

索引後的類別特徵可以幫助決策樹等演算法處理類別型特徵,並得到較好結果。

在下面的例子中,我們讀入一個數據集,然後使用VectorIndexer來決定哪些特徵需要被作為非數值型別處理,將非數值型特徵轉換為他們的索引。

呼叫示例:

Scala:

import org.apache.spark.ml.feature.VectorIndexer

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed"
) .setMaxCategories(10) val indexerModel = indexer.fit(data) val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet println(s"Chose ${categoricalFeatures.size} categorical features: " + categoricalFeatures.mkString(", ")) // Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data) indexedData.show()

Java:

import java.util.Map;

import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

VectorIndexer indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);

Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");

for (Integer feature : categoryMaps.keySet()) {
  System.out.print(" " + feature);
}
System.out.println();

// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();

Python:

from pyspark.ml.feature import VectorIndexer

data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
Normalizer(正則化)

演算法介紹:

Normalizer是一個轉換器,它可以將多行向量輸入轉化為統一的形式。引數為p(預設值:2)來指定正則化中使用的p-norm。正則化操作可以使輸入資料標準化並提高後期學習演算法的效果。

下面的例子展示如何讀入一個libsvm格式的資料,然後將每一行轉換為\[{L^2}\]以及\[{L^\infty }\]形式。

呼叫示例:

Scala:

import org.apache.spark.ml.feature.Normalizer

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
l1NormData.show()

// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
lInfNormData.show()

Java:

import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0);

Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();

// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
  normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();

Python:

from pyspark.ml.feature import Normalizer

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
l1NormData.show()

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
lInfNormData.show()
StandardScaler

演算法介紹:

StandardScaler處理Vector資料,標準化每個特徵使得其有統一的標準差以及(或者)均值為零。它需要如下引數:

1. withStd:預設值為真,使用統一標準差方式。

2. withMean:預設為假。此種方法將產出一個稠密輸出,所以不適用於稀疏輸入。

StandardScaler是一個Estimator,它可以fit資料集產生一個StandardScalerModel,用來計算彙總統計。然後產生的模可以用來轉換向量至統一的標準差以及(或者)零均值特徵。

注意如果特徵的標準差為零,則該特徵在向量中返回的預設值為0.0。

下面的示例展示如果讀入一個libsvm形式的資料以及返回有統一標準差的標準化特徵。

呼叫示例:

Scala:

import org.apache.spark.ml.feature.StandardScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()

Java:

import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

StandardScaler scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false);

// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);

// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();

Python:

from pyspark.ml.feature import StandardScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
MinMaxScaler

演算法介紹:

MinMaxScaler通過重新調節大小將Vector形式的列轉換到指定的範圍內,通常為[0,1],它的引數有:

1. min:預設為0.0,為轉換後所有特徵的下邊界。

2. max:預設為1.0,為轉換後所有特徵的下邊界。

MinMaxScaler計算資料集的彙總統計量,併產生一個MinMaxScalerModel。該模型可以將獨立的特徵的值轉換到指定的範圍內。

對於特徵E來說,調整後的特徵值如下:

\[{\mathop{\rm Re}\nolimits} scaled({e_i}) = \frac{{{e_i} - {E_{\min }}}}{{{E_{\max }} - {E_{\min }}}}*(\max  - \min ) + \min \]

如果{E_{\max }} =  &  = {E_{\min }},則{\mathop{\rm Re}\nolimits} scaled = 0.5*(\max  - \min )

注意因為零值轉換後可能變為非零值,所以即便為稀疏輸入,輸出也可能為稠密向量。

下面的示例展示如果讀入一個libsvm形式的資料以及調整其特徵值到[0,1]之間。

呼叫示例:

Scala:

import org.apache.spark.ml.feature.MinMaxScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()

Java:

import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");
MinMaxScaler scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();

Python:

from pyspark.ml.feature import MinMaxScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
MaxAbsScaler

演算法介紹:

MaxAbsScaler使用每個特徵的最大值的絕對值將輸入向量的特徵值轉換到[-1,1]之間。因為它不會轉移/集中資料,所以不會破壞資料的稀疏性。

下面的示例展示如果讀入一個libsvm形式的資料以及調整其特徵值到[-1,1]之間。

呼叫示例:

Scala:

import org.apache.spark.ml.feature.MaxAbsScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()

Java:

import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");
MaxAbsScaler scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();

Python:

from pyspark.ml.feature import MaxAbsScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
Bucketizer

演算法介紹:

Bucketizer將一列連續的特徵轉換為特徵區間,區間由使用者指定。引數如下:

1. splits:分裂數為n+1時,將產生n個區間。除了最後一個區間外,每個區間範圍[x,y]由分裂的x,y決定。分裂必須是嚴格遞增的。在分裂指定外的值將被歸為錯誤。兩個分裂的例子為Array(Double.NegativeInfinity,0.0, 1.0, Double.PositiveInfinity)以及Array(0.0, 1.0, 2.0)。

注意,當不確定分裂的上下邊界時,應當新增Double.NegativeInfinity和Double.PositiveInfinity以免越界。

下面將展示Bucketizer的使用方法。

呼叫示例:

Scala:

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-0.5, -0.3, 0.0, 0.2)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()

Java:

import java.util.List;

import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};

List<Row> data = Arrays.asList(
  RowFactory.create(-0.5),
  RowFactory.create(-0.3),
  RowFactory.create(0.0),
  RowFactory.create(0.2)
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Bucketizer bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits);

// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);
bucketedData.show();

Python:

from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-0.5,), (-0.3,), (0.0,), (0.2,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()
ElementwiseProduct

演算法介紹:

ElementwiseProduct按提供的“weight”向量,返回與輸入向量元素級別的乘積。即是說,按提供的權重分別對輸入資料進行縮放,得到輸入向量v以及權重向量w的Hadamard積。

\left( \begin{array}{l}{v_1}\\...\\{v_n}\end{array} \right)*\left( \begin{array}{l}{w_1}\\...\\{w_n}\end{array} \right) = \left( \begin{array}{l}{v_1}{w_1}\\...\\{v_n}{w_n}\end{array} \right)

下面例子展示如何通過轉換向量的值來調整向量。

呼叫示例:

Scala:

import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors

// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector")

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()

Java:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
  RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
  RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);

List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));

StructType schema = DataTypes.createStructType(fields);

Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);

ElementwiseProduct transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector");

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();

Python:

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors

# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
SQLTransformer

演算法介紹:

SQLTransformer工具用來轉換由SQL定義的陳述。目前僅支援SQL語法如"SELECT ...FROM __THIS__ ...",其中"__THIS__"代表輸入資料的基礎表。選擇語句指定輸出中展示的欄位、元素和表示式,支援Spark SQL中的所有選擇語句。使用者可以基於選擇結果使用Spark SQL建立方程或者使用者自定義函式。SQLTransformer支援語法示例如下:

1. SELECTa, a + b AS a_b FROM __THIS__

2. SELECTa, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5

3. SELECTa, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b

示例:

假設我們有如下DataFrame包含id,v1,v2列:
id | v1 | v2

----|-----|-----

0 | 1.0 | 3.0

2 | 2.0 | 5.0

使用SQLTransformer語句"SELECT *,(v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"轉換後得到輸出如下:

id | v1 | v2 | v3 | v4

----|-----|-----|-----|-----

0 | 1.0| 3.0 | 4.0 | 3.0

2 | 2.0| 5.0 | 7.0 |10.0

呼叫示例:

Scala:

import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
 sqlTrans.transform(df).show()

Java:

import java.util.Arrays;
import java.util.List