
Twenty Feature Transformation Methods with Spark MLlib Invocation Examples (Scala/Java/Python), Part 2

VectorIndexer

Algorithm overview:

VectorIndexer indexes categorical features in a dataset of Vector columns. It can automatically decide which features are categorical and convert their original values to category indices. It works as follows:

1. Take an input column of vectors and a maxCategories parameter.

2. Decide which features should be treated as categorical based on the distinct values observed: features with at most maxCategories distinct values are declared categorical.

3. Compute 0-based category indices for each categorical feature.

4. Index the categorical features and transform the original feature values into indices.

Indexing categorical features allows algorithms such as decision trees to handle them appropriately, which typically improves results.
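To make the indexing decision concrete, here is a minimal sketch with hand-built toy data (the data and column names are illustrative, not part of the original example):

import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.linalg.Vectors

// Toy data: feature 0 takes three distinct values, feature 1 only two.
val toy = spark.createDataFrame(Seq(
  Tuple1(Vectors.dense(1.0, -1.0)),
  Tuple1(Vectors.dense(2.0, -1.0)),
  Tuple1(Vectors.dense(3.0, 1.0))
)).toDF("features")

// With maxCategories = 2, only feature 1 (two distinct values) is treated as
// categorical and re-encoded with 0-based indices; feature 0 is left unchanged.
val toyModel = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(2)
  .fit(toy)

toyModel.transform(toy).show(false)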

In the example below, we read in a dataset and use VectorIndexer to decide which features should be treated as categorical; those features are then replaced by their category indices.

Invocation examples:

Scala:

import org.apache.spark.ml.feature.VectorIndexer

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)

val indexerModel = indexer.fit(data)

val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} categorical features: " +
  categoricalFeatures.mkString(", "))

// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()

Java:

import java.util.Map;

import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

VectorIndexer indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);

Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");

for (Integer feature : categoryMaps.keySet()) {
  System.out.print(" " + feature);
}
System.out.println();

// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();

Python:
from pyspark.ml.feature import VectorIndexer

data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()

Normalizer (normalization)

Algorithm overview:

Normalizer is a Transformer that rescales each Vector row to unit norm. It takes a parameter p (default: 2), which specifies the p-norm used for normalization. Normalizing the input data in this way can standardize it and improve the behavior of downstream learning algorithms.
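As a quick illustration (a minimal sketch with made-up values), the row [1.0, 2.0, 4.0] has $L^1$ norm 7, so $L^1$ normalization maps it to roughly [0.143, 0.286, 0.571]; its $L^\infty$ norm is 4, giving [0.25, 0.5, 1.0]:

import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.Vectors

val tiny = spark.createDataFrame(Seq(
  Tuple1(Vectors.dense(1.0, 2.0, 4.0))
)).toDF("features")

// p = 1: divide each element by the L^1 norm (1 + 2 + 4 = 7).
new Normalizer()
  .setInputCol("features")
  .setOutputCol("l1Norm")
  .setP(1.0)
  .transform(tiny)
  .show(false)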

The example below shows how to load a dataset in libsvm format and then normalize each row with the $L^1$ norm and the $L^\infty$ norm.

Invocation examples:

Scala:

import org.apache.spark.ml.feature.Normalizer

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
l1NormData.show()

// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
lInfNormData.show()

Java:
import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0);

Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();

// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
  normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();

Python:
from pyspark.ml.feature import Normalizer

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
l1NormData.show()

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
lInfNormData.show()

StandardScaler

Algorithm overview:

StandardScaler transforms a Vector column, standardizing each feature to have unit standard deviation and/or zero mean. It takes the following parameters:

1. withStd: true by default. Scales the data to unit standard deviation.

2. withMean: false by default. Centers the data with the mean before scaling; this produces a dense output, so it is not suitable for sparse input.

StandardScaler is an Estimator: fitting it on a dataset produces a StandardScalerModel, which amounts to computing the summary statistics. The resulting model can then transform a Vector column so that its features have unit standard deviation and/or zero mean. Note that if the standard deviation of a feature is zero, the model returns 0.0 for that feature in the output vector.
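The sketch below (toy data, illustrative only) also sets withMean = true; centering subtracts the per-feature mean, which is why the output is dense:

import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.linalg.Vectors

val toy = spark.createDataFrame(Seq(
  Tuple1(Vectors.dense(1.0, 10.0)),
  Tuple1(Vectors.dense(2.0, 20.0)),
  Tuple1(Vectors.dense(3.0, 30.0))
)).toDF("features")

// Fitting computes the per-feature mean and standard deviation;
// transform then yields zero-mean, unit-standard-deviation features.
val toyScaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithMean(true)
  .setWithStd(true)
  .fit(toy)

toyScaler.transform(toy).show(false)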

The example below shows how to load a dataset in libsvm format and then produce standardized features with unit standard deviation.

Invocation examples:

Scala:

import org.apache.spark.ml.feature.StandardScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()

Java:
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

StandardScaler scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false);

// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);

// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();

Python:
from pyspark.ml.feature import StandardScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()

MinMaxScaler

Algorithm overview:

MinMaxScaler rescales each feature of a Vector column to a specified range, commonly [0, 1]. Its parameters are:

1. min: 0.0 by default; the lower bound of the output range, shared by all features.

2. max: 1.0 by default; the upper bound of the output range, shared by all features.

MinMaxScaler computes summary statistics on a dataset and produces a MinMaxScalerModel, which can then rescale each feature individually to the specified range.

For a feature E, the rescaled value of an entry $e_i$ is

$$\mathrm{Rescaled}(e_i) = \frac{e_i - E_{\min}}{E_{\max} - E_{\min}} \cdot (\mathrm{max} - \mathrm{min}) + \mathrm{min}$$

If $E_{\max} = E_{\min}$, then $\mathrm{Rescaled}(e_i) = 0.5 \cdot (\mathrm{max} + \mathrm{min})$.

Note that because zero values may be transformed to non-zero values, the output can be dense even when the input is sparse.
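For instance, with the defaults min = 0 and max = 1, a feature whose observed values are 2.0, 6.0 and 10.0 is mapped to 0.0, 0.5 and 1.0, since (6 - 2) / (10 - 2) = 0.5. A minimal sketch with these made-up values:

import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors

val toy = spark.createDataFrame(Seq(
  Tuple1(Vectors.dense(2.0)),
  Tuple1(Vectors.dense(6.0)),
  Tuple1(Vectors.dense(10.0))
)).toDF("features")

// E_min = 2.0 and E_max = 10.0, so the three rows become 0.0, 0.5 and 1.0.
new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(toy)
  .transform(toy)
  .show(false)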

The example below shows how to load a dataset in libsvm format and then rescale each feature to [0, 1].

Invocation examples:

Scala:

import org.apache.spark.ml.feature.MinMaxScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()

Java:
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");
MinMaxScaler scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();

Python:
from pyspark.ml.feature import MinMaxScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
scaledData.show()

MaxAbsScaler

Algorithm overview:

MaxAbsScaler rescales each feature of a Vector column to the range [-1, 1] by dividing by the maximum absolute value observed for that feature. Because it does not shift or center the data, it preserves sparsity.
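For example (a minimal sketch with made-up values), if the two features' largest absolute values are 4.0 and 8.0, each feature is divided by its own maximum:

import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors

val toy = spark.createDataFrame(Seq(
  Tuple1(Vectors.dense(1.0, -8.0)),
  Tuple1(Vectors.dense(2.0, 4.0)),
  Tuple1(Vectors.dense(4.0, 8.0))
)).toDF("features")

// Max absolute values are 4.0 and 8.0, so the rows become
// [0.25, -1.0], [0.5, 0.5] and [1.0, 1.0].
new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(toy)
  .transform(toy)
  .show(false)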

The example below shows how to load a dataset in libsvm format and then rescale each feature to [-1, 1].

Invocation examples:

Scala:

import org.apache.spark.ml.feature.MaxAbsScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()

Java:
import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");
MaxAbsScaler scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();

Python:
from pyspark.ml.feature import MaxAbsScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)
scaledData.show()

Bucketizer

Algorithm overview:

Bucketizer transforms a column of continuous features into a column of feature buckets, where the buckets are specified by the user. It takes the following parameter:

1. splits: with n+1 split points there are n buckets. A bucket defined by the split points x and y holds values in the half-open range [x, y), except for the last bucket, which also includes y. The splits must be strictly increasing, and values outside of the specified splits are treated as errors. Two examples of splits are Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) and Array(0.0, 1.0, 2.0).

Note that if you are unsure of the upper and lower bounds of the column, you should add Double.NegativeInfinity and Double.PositiveInfinity as the first and last split points to avoid out-of-range errors.
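As a quick check of the bucket semantics (a minimal sketch with made-up values), the splits below define the half-open buckets [-inf, -0.5), [-0.5, 0.0), [0.0, 0.5) and [0.5, +inf):

import org.apache.spark.ml.feature.Bucketizer

val checkSplits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
val checkDf = spark.createDataFrame(
  Seq(-0.9, -0.5, 0.2, 0.7).map(Tuple1.apply)
).toDF("features")

// Expected bucket indices: -0.9 -> 0.0, -0.5 -> 1.0, 0.2 -> 2.0, 0.7 -> 3.0
new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucket")
  .setSplits(checkSplits)
  .transform(checkDf)
  .show()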

The following shows how to use Bucketizer.

Invocation examples:

Scala:

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-0.5, -0.3, 0.0, 0.2)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()

Java:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};

List<Row> data = Arrays.asList(
  RowFactory.create(-0.5),
  RowFactory.create(-0.3),
  RowFactory.create(0.0),
  RowFactory.create(0.2)
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Bucketizer bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits);

// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);
bucketedData.show();

Python:
from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-0.5,), (-0.3,), (0.0,), (0.2,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()

ElementwiseProduct

Algorithm overview:

ElementwiseProduct multiplies each input vector element-wise by a provided "weight" vector; in other words, it scales each column of the input by a scalar multiplier. This is the Hadamard product of the input vector v and the weight vector w:

$$(v_1, \dots, v_N)^{\mathsf T} \circ (w_1, \dots, w_N)^{\mathsf T} = (v_1 w_1, \dots, v_N w_N)^{\mathsf T}$$

For example, scaling (1, 2, 3) by the weight vector (0, 1, 2) yields (0, 2, 6).

The example below shows how to transform vectors with such a weight vector.

Invocation examples:

Scala:

import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors

// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector")

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()

Java:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
  RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
  RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);

List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));

StructType schema = DataTypes.createStructType(fields);

Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);

ElementwiseProduct transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector");

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();

Python:
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors

# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()

SQLTransformer

Algorithm overview:

SQLTransformer implements transformations that are defined by an SQL statement. Currently it only supports SQL syntax of the form "SELECT ... FROM __THIS__ ...", where "__THIS__" stands for the underlying table of the input dataset. The select clause specifies the fields, constants, and expressions to appear in the output, and it can be any select clause that Spark SQL supports. Users can also apply Spark SQL built-in functions or user-defined functions to the selected columns (see the sketch after the list below). Examples of statements SQLTransformer supports:

1. SELECT a, a + b AS a_b FROM __THIS__

2. SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ WHERE a > 5

3. SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
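As mentioned above, user-defined functions registered with Spark SQL can also be referenced inside the statement. A minimal sketch (the UDF name plusOne and the column v1 are illustrative assumptions):

import org.apache.spark.ml.feature.SQLTransformer

// Register an illustrative UDF with Spark SQL and reference it in the statement.
spark.udf.register("plusOne", (x: Double) => x + 1.0)

val udfTrans = new SQLTransformer().setStatement(
  "SELECT *, plusOne(v1) AS v1_plus_one FROM __THIS__")

// udfTrans.transform(df) would add a v1_plus_one column to a DataFrame
// with a numeric v1 column, such as the df built in the example below.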

Example:

Suppose we have the following DataFrame with columns id, v1 and v2:

id | v1  | v2
---|-----|-----
 0 | 1.0 | 3.0
 2 | 2.0 | 5.0

Applying SQLTransformer with the statement "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__" produces the following output:

id | v1  | v2  | v3  | v4
---|-----|-----|-----|------
 0 | 1.0 | 3.0 | 4.0 | 3.0
 2 | 2.0 | 5.0 | 7.0 | 10.0

Invocation examples:

Scala:

import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

sqlTrans.transform(df).show()

Java:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 1.0, 3.0),
  RowFactory.create(2, 2.0, 5.0)
);
StructType schema = new StructType(new StructField [] {
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

SQLTransformer sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");

sqlTrans.transform(df).show();

Python:
from pyspark.ml.feature import SQLTransformer

df = spark.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()

VectorAssembler

Algorithm overview:

VectorAssembler is a Transformer that combines a given list of columns into a single Vector column. It is useful for merging raw features and features produced by other transformers into one feature vector, which can then be used to train machine-learning models such as logistic regression and decision trees. VectorAssembler accepts the following input column types: numeric, boolean, and Vector. In each row, the values of the input columns are concatenated into a new vector in the specified order.

Example:

Suppose we have a DataFrame with the columns id, hour, mobile, userFeatures and clicked:

id | hour | mobile | userFeatures     | clicked
---|------|--------|------------------|--------
 0 | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0

The userFeatures column contains three user features. We want to combine hour, mobile and userFeatures into a single feature column. Setting VectorAssembler's input columns to hour, mobile and userFeatures and its output column to features, the transformation yields:

id | hour | mobile | userFeatures     | clicked | features
---|------|--------|------------------|---------|-----------------------------
 0 | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]

Invocation examples:

Scala:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
println(output.select("features", "clicked").first())

Java:
import java.util.Arrays;

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("hour", IntegerType, false),
  createStructField("mobile", DoubleType, false),
  createStructField("userFeatures", new VectorUDT(), false),
  createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row), schema);

VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[]{"hour", "mobile", "userFeatures"})
  .setOutputCol("features");

Dataset<Row> output = assembler.transform(dataset);
System.out.println(output.select("features", "clicked").first());

Python:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])
assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")
output = assembler.transform(dataset)
print(output.select("features", "clicked").first())

QuantileDiscretizer

Algorithm overview:

QuantileDiscretizer takes a column of continuous features and outputs a column of binned categorical features. The number of bins is controlled by the numBuckets parameter. The bin ranges are chosen by an approximate algorithm whose precision is controlled by the relativeError parameter; when relativeError is set to 0, exact quantiles are computed (which is expensive). The lower and upper bin bounds are negative infinity and positive infinity, covering the entire range of real values.
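A minimal sketch of requesting exact quantiles via relativeError (the column names match the example below; availability of setRelativeError on your Spark version is assumed):

import org.apache.spark.ml.feature.QuantileDiscretizer

// relativeError = 0.0 requests exact quantiles; this is more precise but
// more expensive on large datasets than the default approximation.
val exactDiscretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)
  .setRelativeError(0.0)

// exactDiscretizer.fit(df).transform(df) can then be used exactly as in the example below.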

Example: suppose we have the following DataFrame with columns id and hour:

id | hour
---|-----
 0 | 18.0
 1 | 19.0
 2 |  8.0
 3 |  5.0
 4 |  2.2

hour is a continuous feature of type Double. Setting numBuckets to 3, we can bin hour into the following categorical feature:

id | hour | result
---|------|-------
 0 | 18.0 | 2.0
 1 | 19.0 | 2.0
 2 |  8.0 | 1.0
 3 |  5.0 | 1.0
 4 |  2.2 | 0.0

Invocation examples:

Scala:

import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show()

Java:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 18.0),
  RowFactory.create(1, 19.0),
  RowFactory.create(2, 8.0),
  RowFactory.create(3, 5.0),
  RowFactory.create(4, 2.2)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

QuantileDiscretizer discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3);

Dataset<Row> result = discretizer.fit(df).transform(df);
result.show();

Python:
from pyspark.ml.feature import QuantileDiscretizer

data = [(0, 18.0,), (1, 19.0,), (2, 8.0,), (3, 5.0,), (4, 2.2,)]
df = spark.createDataFrame(data, ["id", "hour"])

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

result = discretizer.fit(df).transform(df)
result.show()