1. 程式人生 > 其它 >Spark ML中的特徵轉換演算法(三)

Spark ML中的特徵轉換演算法(三)

一、VectorAssembler

VectorAssembler 是一個轉換器,它將給定的列列表組合成單個向量列。 它對於將原始特徵和不同特徵轉換器生成的特徵組合成單個特徵向量很有用,以便訓練 ML 模型,如邏輯迴歸和決策樹。 VectorAssembler 接受以下輸入列型別:所有數字型別、布林型別和向量型別。 在每一行中,輸入列的值將按指定順序連線成一個向量。

%spark
// 特徵轉換 —— —— VectorAssembler
// 將多列合併為向量列的特徵轉換器。
// 這需要遍歷整個資料集。 如果我們需要從資料中推斷列長度,我們需要額外呼叫“first”Dataset 方法
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset 
= spark.createDataFrame( Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0)) ).toDF("id", "hour", "mobile", "userFeatures", "clicked") val assembler = new VectorAssembler() .setInputCols(Array("hour", "mobile", "userFeatures")) .setOutputCol("features") val output = assembler.transform(dataset) println(
"Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'") output.select("features", "clicked").show(false) 輸出: Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features' +-----------------------+-------+ |features |clicked| +-----------------------+-------+ |[18.0
,1.0,0.0,10.0,0.5]|1.0 | +-----------------------+-------+

二、VectorSizeHint

  有時顯式指定 VectorType 列的向量大小會很有用。例如,VectorAssembler 使用來自其輸入列的大小資訊為其輸出列生成大小資訊和元資料。雖然在某些情況下可以通過檢查列的內容來獲取此資訊,但在流資料幀中,內容在流啟動之前不可用。 VectorSizeHint 允許使用者顯式指定列的向量大小,以便 VectorAssembler 或其他可能需要知道向量大小的轉換器可以使用該列作為輸入。

  要使用 VectorSizeHint,使用者必須設定 inputCol 和 size 引數。將此轉換器應用於資料幀會生成一個新的資料幀,其中包含用於指定向量大小的 inputCol 的更新元資料。對結果資料幀的下游操作可以使用元資料獲得這個大小。

  VectorSizeHint 還可以採用可選的 handleInvalid 引數,當向量列包含空值或大小錯誤的向量時,該引數控制其行為。預設情況下,handleInvalid 設定為“error”,表示應該丟擲異常。此引數也可以設定為“skip”,表示應從結果資料幀中過濾掉包含無效值的行,或“optimistic”,表示不應檢查列的無效值,應保留所有行。請注意,使用“optimistic”可能會導致生成的資料幀處於不一致狀態,這意味著 VectorSizeHint 應用於列的元資料與該列的內容不匹配。使用者應注意避免這種不一致的狀態。

%spark
// 特徵轉換 —— —— VectorSizeHint
// 將大小資訊新增到向量列的元資料的特徵轉換器。 VectorAssembler需要其輸入列的大小資訊,並且不能在沒有此元資料的流資料幀上使用。
// 注意:VectorSizeHint 修改 inputCol 以包含大小元資料,並且沒有 outputCol。
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq(
    (0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0),
    (0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val sizeHint = new VectorSizeHint()
  .setInputCol("userFeatures")
  .setHandleInvalid("skip")
//   inputCol 中向量的大小。 
  .setSize(3)

val datasetWithSize = sizeHint.transform(dataset)
println("過濾掉“userFeatures”中大小不正確的行 ")
datasetWithSize.show(false)

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

// 該資料幀可以像以前一樣被下游轉換器使用 
val output = assembler.transform(datasetWithSize)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)

輸出:
過濾掉“userFeatures”中大小不正確的行 
+---+----+------+--------------+-------+
|id |hour|mobile|userFeatures  |clicked|
+---+----+------+--------------+-------+
|0  |18  |1.0   |[0.0,10.0,0.5]|1.0    |
+---+----+------+--------------+-------+

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+

三、QuantileDiscretizer

  QuantileDiscretizer 採用具有連續特徵的列,並輸出具有分箱分類特徵的列。可以使用 numBuckets 引數設定 bin 的數量。使用的桶數可能會小於此值,例如,如果輸入的不同值太少而無法建立足夠的不同分位數。從 2.3.0 開始,QuantileDiscretizer 可以通過設定 inputCols 引數一次對映多個列。如果同時設定了 inputCol 和 inputCols 引數,則會丟擲異常。要指定每列的桶數,可以設定 numBucketsArray 引數,或者如果跨列的桶數應該相同,可以設定 numBuckets 以方便。請注意,在多列的情況下,相對誤差適用於所有列。

  NaN 處理:在 QuantileDiscretizer 擬合期間,將忽略列中的 null 和 NaN 值。這將生成一個用於進行預測的 Bucketizer 模型。在轉換過程中,Bucketizer 在資料集中找到 NaN 值時會引發錯誤,但使用者也可以通過設定 handleInvalid 來選擇保留或刪除資料集中的 NaN 值。如果使用者選擇保留 NaN 值,會進行特殊處理,放入自己的桶中,例如,如果使用 4 個桶,則非 NaN 資料將放入桶[0-3],但 NaN 將計數在一個特殊的桶中[4]。

  演算法:使用近似演算法選擇 bin 範圍(有關詳細說明,請參閱 org.apache.spark.sql.DataFrameStatFunctions.approxQuantile 的文件)。近似的精度可以通過 relativeError 引數來控制。 bin 的下限和上限將是 -Infinity 和 +Infinity,涵蓋所有實數值。

%spark
// 特徵轉換 —— —— QuantileDiscretizer
// QuantileDiscretizer 採用具有連續特徵的列,並輸出具有分箱分類特徵的列。
// QuantileDiscretizer(分位數離散化)將一列連續型的資料列轉成分型別資料。通過取一個樣本的資料,並將其分為大致相等的部分,設定範圍。其下限為 -Infinity(負無窮大) ,上限為+Infinity(正無窮大)。
// 按分位數,對給出的資料列進行離散化分箱處理。
import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
//   將資料點分組到其中的儲存桶數(分位數或類別)。必須大於或等於 2。預設值:2
  .setNumBuckets(3)
//   支援引數:"skip","keep","error"
//   .setHandleInvalid("error")

val result = discretizer.fit(df).transform(df)
result.show(false)

輸出:
+---+----+------+
|id |hour|result|
+---+----+------+
|0  |18.0|2.0   |
|1  |19.0|2.0   |
|2  |8.0 |1.0   |
|3  |5.0 |1.0   |
|4  |2.2 |0.0   |
+---+----+------+

四、Imputer

Imputer 估計器使用缺失值所在列的平均值或中值來完成資料集中的缺失值。 輸入列應該是數字型別。 目前 Imputer 不支援分類特徵,並且可能為包含分類特徵的列建立不正確的值。 Imputer 可以通過 .setMissingValue(custom_value) 估算除“NaN”以外的自定義值。 例如,.setMissingValue(0) 將估算所有出現的 (0)。

注意輸入列中的所有空值都被視為缺失,因此也被估算。

使用缺失值所在列的平均值或中值來完成缺失值的插補估計器。輸入列應該是數字型別。目前 Imputer 不支援分類特徵 (SPARK-15041) 並且可能為分類特徵建立不正確的值。

請注意,當輸入列是整數時,估算值被強制轉換(截斷)為整數型別。例如,如果輸入列是 IntegerType (1, 2, 4, null),則平均插補後的輸出將是 IntegerType (1, 2, 4, 2)。

請注意,均值/中值是在過濾掉缺失值後計算的。輸入列中的所有 Null 值都被視為缺失,因此也被估算。對於計算中位數,使用 DataFrameStatFunctions.approxQuantile,相對誤差為 0.001。

%spark
// 特徵轉換 —— —— Imputer
// 缺失值處理類
// 在此示例中,Imputer 將使用從相應列中的其他值計算的平均值(預設插補策略)替換所有出現的 Double.NaN(缺失值的預設值)。 在此示例中,a 列和 b 列的代理值分別為 3.0 和 4.0。 轉換後,輸出列中的缺失值將替換為相關列的代理值。 
import org.apache.spark.ml.feature.Imputer

val df = spark.createDataFrame(Seq(
  (1.0, Double.NaN),
  (2.0, Double.NaN),
  (Double.NaN, 3.0),
  (4.0, 4.0),
  (5.0, 5.0)
)).toDF("a", "b")

val imputer = new Imputer()
  .setInputCols(Array("a", "b"))
  .setOutputCols(Array("out_a", "out_b"))
//   缺失值的佔位符。 所有出現的缺失值都將被估算。 請注意,空值始終被視為缺失。 預設值:Double.NaN 
// final valmissingValue: DoubleParam
  .setMissingValue(Double.NaN)
//   插補策略。目前僅支援“mean”和“median”。如果“mean”,則使用特徵的平均值替換缺失值。如果是“median”,則使用特徵的近似中值替換缺失值。預設值:mean 
  .setStrategy("mean")

val model = imputer.fit(df)
model.transform(df).show()

輸出:
+---+---+-----+-----+
|  a|  b|out_a|out_b|
+---+---+-----+-----+
|1.0|NaN|  1.0|  4.0|
|2.0|NaN|  2.0|  4.0|
|NaN|3.0|  3.0|  3.0|
|4.0|4.0|  4.0|  4.0|
|5.0|5.0|  5.0|  5.0|
+---+---+-----+-----+