Spark 模型選擇和調參
阿新 • • 發佈:2020-09-28
# Spark - ML Tuning
官方文件:https://spark.apache.org/docs/2.2.0/ml-tuning.html
這一章節主要講述如何通過使用MLlib的工具來除錯模型演算法和pipeline,內建的交叉驗證和其他工具允許使用者優化模型和pipeline中的超引數;
目錄:
- 模型選擇,也就是調參;
- 交叉驗證;
- 訓練集、驗證集劃分;
### 模型選擇(調參)
機器學習的一個重要工作就是模型選擇,或者說根據給定任務使用資料來發現最優的模型和引數,也叫做除錯,既可以針對單個模型進行除錯,也可以針對整個pipeline的各個環節進行除錯,使用者可以一次對整個pipeline進行除錯而不是每次一個pipeline中的部分;
MLlib支援CrossValidator和TrainValidationSplit等模型選擇工具,這些工具需要下列引數:
- Estimator:待除錯的演算法或者Pipeline;
- 引數Map列表:用於搜尋的引數空間;
- Evaluator:衡量模型在集外測試集上表現的方法;
這些工具工作方式如下:
- 分割資料到訓練集和測試集;
- 對每一組訓練&測試資料,應用所有引數空間中的可選引數組合:
- 對每一組引數組合,使用其設定到演算法上,得到對應的model,並驗證該model的效能;
- 選擇得到最好效能的模型使用的引數組合;
Evaluator針對迴歸問題可以是RegressionEvaluator,針對二分資料可以是BinaryClassificationEvaluator,針對多分類問題的MulticlassClassificationEvaluator,預設的驗證方法可以通過setMetricName來修改;
### 交叉驗證
CrossValidator首先將資料分到一個個的fold中,使用這些fold集合作為訓練集和測試集,如果k=3,那麼CrossValidator將生成3個(訓練,測試)組合,也就是通過3個fold排列組合得到的,每一組使用2個fold作為訓練集,另一個fold作為測試集,為了驗證一個指定的引數組合,CrossValidator需要計算3個模型的平均效能,每個模型都是通過之前的一組訓練&測試集訓練得到;
確認了最佳引數後,CrossValidator最終會使用全部資料和最佳引數組合來重新訓練預測;
例子:通過交叉驗證進行模型選擇;
注意:交叉驗證在整個引數網格上是十分耗時的,下面的例子中,引數網格中numFeatures有3個可取值,regParam有2個可取值,CrossValidator使用2個fold,這將會訓練3\*2\*2個不同的模型,在實際工作中,通常會設定更多的引數、更多的引數取值以及更多的fold,換句話說,CrossValidator本身就是十分奢侈的,無論如何,與手工除錯相比,它依然是一種更加合理和自動化的調參手段;
```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Prepare training documents, which are labeled.
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
(8, "e spark program", 1.0),
(9, "a e c l", 0.0),
(10, "spark compile", 1.0),
(11, "hadoop software", 0.0)
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
# Prepare test documents, which are unlabeled.
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "mapreduce spark"),
(7, "apache hadoop")
], ["id", "text"])
# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
print(row)
```
### 劃分訓練、驗證集
對於超引數除錯,Spark還支援TrainValidationSplit,它一次只能驗證一組引數,這與CrossValidator一次進行k次截然不同,因此它更加快速,但是如果訓練集不夠大的化就無法得到一個真實的結果;
不像是CrossValidator,TrainValidationSplit建立一個訓練、測試組合,它根據trainRatio將資料分為兩部分,假設trainRatio=0.75,那麼資料集的75%作為訓練集,25%用於驗證;
與CrossValidator類似的是,TrainValidationSplit最終也會使用最佳引數和全部資料來訓練一個預測器;
```python
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
# Prepare training and test data.
data = spark.read.format("libsvm")\
.load("data/mllib/sample_linear_regression_data.txt")
train, test = data.randomSplit([0.9, 0.1], seed=12345)
lr = LinearRegression(maxIter=10)
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# TrainValidationSplit will try all combinations of values and determine best model using
# the evaluator.
paramGrid = ParamGridBuilder()\
.addGrid(lr.regParam, [0.1, 0.01]) \
.addGrid(lr.fitIntercept, [False, True])\
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
.build()
# In this case the estimator is simply the linear regression.
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=lr,
estimatorParamMaps=paramGrid,
evaluator=RegressionEvaluator(),
# 80% of the data will be used for training, 20% for validation.
trainRatio=0.8)
# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train)
# Make predictions on test data. model is the model with combination of parameters
# that performed best.
model.transform(test)\
.select("features", "label", "prediction")\
.sho