Spark MLlib 資料預處理-特徵變換(二)
連結:https://zhuanlan.zhihu.com/p/24069545
來源:知乎
著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。
演算法介紹:
VectorIndexer解決資料集中的類別特徵Vector。它可以自動識別哪些特徵是類別型的,並且將原始值轉換為類別指標。它的處理流程如下:
1.獲得一個向量型別的輸入以及maxCategories引數。
2.基於原始數值識別哪些特徵需要被類別化,其中最多maxCategories需要被類別化。
3.對於每一個類別特徵計算0-based類別指標。
4.對類別特徵進行索引然後將原始值轉換為指標。
索引後的類別特徵可以幫助決策樹等演算法處理類別型特徵,並得到較好結果。
在下面的例子中,我們讀入一個數據集,然後使用VectorIndexer來決定哪些特徵需要被作為非數值型別處理,將非數值型特徵轉換為他們的索引。
呼叫示例:
Scala:
import org.apache.spark.ml.feature.VectorIndexer
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed" )
.setMaxCategories(10)
val indexerModel = indexer.fit(data)
val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} categorical features: " +
categoricalFeatures.mkString(", "))
// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
Java:
import java.util.Map;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
VectorIndexer indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);
Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");
for (Integer feature : categoryMaps.keySet()) {
System.out.print(" " + feature);
}
System.out.println();
// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();
Python:
from pyspark.ml.feature import VectorIndexer
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)
# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
Normalizer(正則化)
演算法介紹:
Normalizer是一個轉換器,它可以將多行向量輸入轉化為統一的形式。引數為p(預設值:2)來指定正則化中使用的p-norm。正則化操作可以使輸入資料標準化並提高後期學習演算法的效果。
下面的例子展示如何讀入一個libsvm格式的資料,然後將每一行轉換為以及形式。
呼叫示例:
Scala:
import org.apache.spark.ml.feature.Normalizer
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0)
val l1NormData = normalizer.transform(dataFrame)
l1NormData.show()
// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
lInfNormData.show()
Java:
import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0);
Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();
// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();
Python:
from pyspark.ml.feature import Normalizer
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
l1NormData.show()
# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
lInfNormData.show()
StandardScaler
演算法介紹:
StandardScaler處理Vector資料,標準化每個特徵使得其有統一的標準差以及(或者)均值為零。它需要如下引數:
1. withStd:預設值為真,使用統一標準差方式。
2. withMean:預設為假。此種方法將產出一個稠密輸出,所以不適用於稀疏輸入。
StandardScaler是一個Estimator,它可以fit資料集產生一個StandardScalerModel,用來計算彙總統計。然後產生的模可以用來轉換向量至統一的標準差以及(或者)零均值特徵。
注意如果特徵的標準差為零,則該特徵在向量中返回的預設值為0.0。
下面的示例展示如果讀入一個libsvm形式的資料以及返回有統一標準差的標準化特徵。
呼叫示例:
Scala:
import org.apache.spark.ml.feature.StandardScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false)
// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)
// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
Java:
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
StandardScaler scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false);
// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);
// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
Python:
from pyspark.ml.feature import StandardScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
withStd=True, withMean=False)
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)
# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
MinMaxScaler
演算法介紹:
MinMaxScaler通過重新調節大小將Vector形式的列轉換到指定的範圍內,通常為[0,1],它的引數有:
1. min:預設為0.0,為轉換後所有特徵的下邊界。
2. max:預設為1.0,為轉換後所有特徵的下邊界。
MinMaxScaler計算資料集的彙總統計量,併產生一個MinMaxScalerModel。該模型可以將獨立的特徵的值轉換到指定的範圍內。
對於特徵E來說,調整後的特徵值如下:
如果,則。
注意因為零值轉換後可能變為非零值,所以即便為稀疏輸入,輸出也可能為稠密向量。
下面的示例展示如果讀入一個libsvm形式的資料以及調整其特徵值到[0,1]之間。
呼叫示例:
Scala:
import org.apache.spark.ml.feature.MinMaxScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
Java:
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame = spark
.read()
.format("libsvm")
.load("data/mllib/sample_libsvm_data.txt");
MinMaxScaler scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
Python:
from pyspark.ml.feature import MinMaxScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
MaxAbsScaler
演算法介紹:
MaxAbsScaler使用每個特徵的最大值的絕對值將輸入向量的特徵值轉換到[-1,1]之間。因為它不會轉移/集中資料,所以不會破壞資料的稀疏性。
下面的示例展示如果讀入一個libsvm形式的資料以及調整其特徵值到[-1,1]之間。
呼叫示例:
Scala:
import org.apache.spark.ml.feature.MaxAbsScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
Java:
import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame = spark
.read()
.format("libsvm")
.load("data/mllib/sample_libsvm_data.txt");
MaxAbsScaler scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
Python:
from pyspark.ml.feature import MaxAbsScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
Bucketizer
演算法介紹:
Bucketizer將一列連續的特徵轉換為特徵區間,區間由使用者指定。引數如下:
1. splits:分裂數為n+1時,將產生n個區間。除了最後一個區間外,每個區間範圍[x,y]由分裂的x,y決定。分裂必須是嚴格遞增的。在分裂指定外的值將被歸為錯誤。兩個分裂的例子為Array(Double.NegativeInfinity,0.0, 1.0, Double.PositiveInfinity)以及Array(0.0, 1.0, 2.0)。
注意,當不確定分裂的上下邊界時,應當新增Double.NegativeInfinity和Double.PositiveInfinity以免越界。
下面將展示Bucketizer的使用方法。
呼叫示例:
Scala:
import org.apache.spark.ml.feature.Bucketizer
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
val data = Array(-0.5, -0.3, 0.0, 0.2)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits)
// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()
Java:
import java.util.List;
import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};
List<Row> data = Arrays.asList(
RowFactory.create(-0.5),
RowFactory.create(-0.3),
RowFactory.create(0.0),
RowFactory.create(0.2)
);
StructType schema = new StructType(new StructField[]{
new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
Bucketizer bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits);
// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);
bucketedData.show();
Python:
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
data = [(-0.5,), (-0.3,), (0.0,), (0.2,)]
dataFrame = spark.createDataFrame(data, ["features"])
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")
# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()
ElementwiseProduct
演算法介紹:
ElementwiseProduct按提供的“weight”向量,返回與輸入向量元素級別的乘積。即是說,按提供的權重分別對輸入資料進行縮放,得到輸入向量v以及權重向量w的Hadamard積。
下面例子展示如何通過轉換向量的值來調整向量。
呼叫示例:
Scala:
import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors
// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
("a", Vectors.dense(1.0, 2.0, 3.0)),
("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector")
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
Java:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);
List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector");
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();
Python:
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
SQLTransformer
演算法介紹:
SQLTransformer工具用來轉換由SQL定義的陳述。目前僅支援SQL語法如"SELECT ...FROM __THIS__ ...",其中"__THIS__"代表輸入資料的基礎表。選擇語句指定輸出中展示的欄位、元素和表示式,支援Spark SQL中的所有選擇語句。使用者可以基於選擇結果使用Spark SQL建立方程或者使用者自定義函式。SQLTransformer支援語法示例如下:
1. SELECTa, a + b AS a_b FROM __THIS__
2. SELECTa, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
3. SELECTa, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
示例:
假設我們有如下DataFrame包含id,v1,v2列:
id | v1 | v2
----|-----|-----
0 | 1.0 | 3.0
2 | 2.0 | 5.0
使用SQLTransformer語句"SELECT *,(v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"轉換後得到輸出如下:
id | v1 | v2 | v3 | v4
----|-----|-----|-----|-----
0 | 1.0| 3.0 | 4.0 | 3.0
2 | 2.0| 5.0 | 7.0 |10.0
呼叫示例:
Scala:
import org.apache.spark.ml.feature.SQLTransformer
val df = spark.createDataFrame(
Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
val sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
Java:
import java.util.Arrays;
import java.util.List