1. 程式人生 > 其它 >利用pyspark pandas_udf 加速機器學習任務

利用pyspark pandas_udf 加速機器學習任務

實驗是最能定義資料科學家日常生活的詞。為了為給定的問題構建一個合適的機器學習模型,資料科學家需要訓練多個模型。此過程包括諸如尋找模型的最佳超引數、使用 K 折交叉驗證模型,有時甚至訓練具有多個輸出的模型等任務。前面提到的所有這些任務都很耗時,但對於模型開發的成功來說卻極為重要。在這篇博文中,我們將展示如何應用 PySpark Pandas UDF(一個用於在 Spark 叢集上分發 Python 函式的框架)來提高資料科學家的日常工作效率。

PySpark 如何實現 Pandas UDF(使用者定義函式)?

顧名思義,PySpark Pandas UDF 是一種使用 Pandas DataFrame 在 PySpark 中實現使用者定義函式 (UDF) 的方法。PySpark API 文件給出的定義如下:

“Pandas UDF 是使用者定義的函式,由 Spark 執行,使用 Arrow 傳輸資料,Pandas 執行資料,允許向量化操作。Pandas UDF 是使用pandas_udf
作為裝飾器或包裝函式來定義的,不需要額外的配置。Pandas UDF 通常表現為常規的 PySpark 函式 API。”

在這篇文章中,我們將探索PandasUDFType.GROUPED_MAP
,或者在 PySpark 的最新版本中,也稱為pyspark.sql.GroupedData.applyInPandas
. 主要思想很簡單,Pandas UDF 分組資料允許在資料集的每一組中進行操作。由於 spark 中的分組操作是跨叢集節點計算的,因此我們可以以允許在不同節點計算不同模型的方式操作我們的資料集。是的,我的兄弟們……永遠不要低估一個groupBy


.

配置

在進入應用 Pandas UDF 的細節之前,讓我們用一些模組、全域性變數和常用函式設定環境。

第一步是匯入將在這個小實驗中使用的所有模組。

importpandasaspd
fromcatboostimportCatBoostClassifier
fromitertoolsimportproduct
frompyspark.sqlimportDataFrame
frompyspark.sqlimportfunctionsassf
frompyspark.sql.functionsimportpandas_udf
frompyspark.sql.typesimport(
DoubleType,FloatType,IntegerType,StringType,StructField,StructType
)
fromsklearn.datasetsimportmake_multilabel_classification
fromsklearn.metricsimportaccuracy_score
fromsklearn.model_selectionimporttrain_test_split

並設定一些將要多次使用的全域性變數。

N_FEATURES=20
N_CLASSES=10

本文探索的每項任務的一個常見步驟是機器學習模型的訓練和評估。此步驟封裝在以下函式中,該函式根據 CatBoost 模型的準確度得分來訓練和評估該模型。

deftrain_and_evaluate_model(X_train,y_train,X_test,y_test,kwargs={}):

#splitdata
X_train,X_eval,y_train,y_eval=train_test_split(X_train,y_train,test_size=0.2,random_state=42)

#createmodel
model=CatBoostClassifier(
nan_mode='Min',
random_seed=42,
boosting_type='Plain',
bootstrap_type='Bernoulli',
rsm=0.1,
loss_function='Logloss',
use_best_model=True,
early_stopping_rounds=100,
**kwargs
)

#fitmodel
model.fit(X_train.values,y_train.values,eval_set=(X_eval,y_eval))

#evaluatemodel
accuracy=accuracy_score(model.predict(X_test),y_test)

returnaccuracy

為了訓練和測試我們的 CatBoost 模型,我們還需要一些資料。因此,讓我們使用 scikit-learn 的make_multilabel_classification
函式建立我們的資料集,並從中構建我們的 PySpark DataFrame。

X,y=make_multilabel_classification(
n_samples=10000,
n_features=N_FEATURES,
n_classes=N_CLASSES,
random_state=42
)
pdf=pd.DataFrame(X)
foriinrange(N_CLASSES):
pdf[f'y_{i}']=y[:,i]
df=spark.createDataFrame(pdf)
print(f'numberofrowsinthedataset:{df.count()}')
numberofrowsinthedataset:10000
df.limit(5).toPandas()
0123456789...y_0y_1y_2y_3y_4y_5y_6y_7y_8y_9
0 2.0 2.0 0.0 1.0 3.0 5.0 0.0 3.0 4.0 1.0 ... 0 1 1 0 0 0 0 1 0 0
1 4.0 3.0 2.0 2.0 0.0 4.0 1.0 2.0 0.0 3.0 ... 0 0 0 0 0 0 0 1 1 1
2 2.0 2.0 3.0 0.0 0.0 0.0 0.0 6.0 0.0 3.0 ... 0 0 0 0 0 0 0 0 1 0
3 0.0 1.0 4.0 4.0 2.0 0.0 2.0 1.0 3.0 2.0 ... 0 0 0 0 0 0 0 0 0 0
4 0.0 0.0 7.0 2.0 1.0 0.0 1.0 2.0 1.0 2.0 ... 0 0 0 0 0 0 0 0 0 1

5 行 × 30 列

最後,為了更高效的 Spark 計算,我們將啟用基於 arrow 的列式資料傳輸。

spark.conf.set('spark.sql.execution.arrow.enabled','true')

分散式網格搜尋

在機器學習中,超引數是其值用於控制模型架構及其學習過程的引數。通常在訓練模型時,您需要優化這些超引數,但是,儘管 ML 能夠找到最佳內部引數和決策閾值,但超引數是手動設定的。

如果搜尋空間包含太多可能性,您將需要花費大量時間進行測試以找到超引數的最佳組合。加速此任務的一種方法是將搜尋過程分佈在 Spark 叢集的節點上。

這種方法產生的一個問題是:“好吧,但我使用的演算法尚未在 Spark 上實現,我如何在這些限制下分配這個過程?” 別擔心!這是我們在這裡要回答的問題!

首先,我們必須定義超引數搜尋空間。為此,我們將建立一個輔助 PySpark DataFrame,其中每一行都是一組唯一的超引數。

values_range=list(
product(
[200,210,220,230,240,250,260,270,280,290],
[3,4,5,6,7],
[0.02,0.07,0.1,0.15,0.2],
['MinEntropy','Uniform','UniformAndQuantiles','GreedyLogSum'],
[1.0,2.0,3.0,4.0,5.0,6.0],
[0.5,0.6,0.7,0.8],
)
)

schema=StructType(
[
StructField('iterations',IntegerType(),True),
StructField('depth',IntegerType(),True),
StructField('learning_rate',DoubleType(),True),
StructField('feature_border_type',StringType(),True),
StructField('l2_leaf_reg',FloatType(),True),
StructField('subsample',FloatType(),True)
]
)

df_grid=spark.createDataFrame(data=values_range,schema=schema)
df_grid=df_grid.withColumn('replication_id',sf.monotonically_increasing_id())
df_grid.limit(5).toPandas()
iterationsdepthlearning_ratefeature_border_typel2_leaf_regsubsamplereplication_ID
0 200 4 0.1 Uniform 2.0 0.5 171798691840
1 200 4 0.1 Uniform 2.0 0.6 171798691841
2 200 4 0.1 Uniform 2.0 0.7 171798691842
3 200 4 0.1 Uniform 2.0 0.8 171798691843
4 200 4 0.1 Uniform 3.0 0.5 171798691844
print(f'numberofdifferenthyperparametercombinations:{df_grid.count()}')
numberofdifferenthyperparametercombinations:24000

對於每個超引數行,我們想要複製我們的資料,以便我們以後可以單獨處理每個超引數集。

df_replicated=df.crossJoin(df_grid)
print(f'numberofrowsinthereplicateddataset:{df_replicated.count()}')
numberofrowsinthereplicateddataset:240000000

最後一步是指定每個 Spark 節點將如何處理資料。為此,我們定義了run_model
函式。它從輸入 Spark DataFrame 中提取超引數和資料,然後訓練和評估模型,返回其結果。

#declaretheschemafortheoutputofourfunction
schema=StructType(
[
StructField('replication_id',IntegerType(),True),
StructField('accuracy',FloatType(),True),
StructField("iterations",IntegerType(),True),
StructField("depth",IntegerType(),True),
StructField("learning_rate",DoubleType(),True),
StructField("feature_border_type",StringType(),True),
StructField("l2_leaf_reg",FloatType(),True),
StructField("subsample",FloatType(),True)
]
)

#decorateourfunctionwithpandas_udfdecorator
@pandas_udf(schema,sf.PandasUDFType.GROUPED_MAP)
defhyperparameter_search(pdf):

#gethyperparametervalues
kwargs={
'iterations':pdf.iterations.values[0],
'depth':pdf.depth.values[0],
'learning_rate':pdf.learning_rate.values[0],
'feature_border_type':pdf.feature_border_type.values[0],
'l2_leaf_reg':pdf.l2_leaf_reg.values[0],
'subsample':pdf.subsample.values[0]
}

#getdataandlabel
X=pdf[[str(i)foriinrange(N_FEATURES)]]
y=pdf['y_0']

#splitdata
X_train,X_test,y_train,y_test=train_test_split(X,y,test_size=0.2,random_state=42)

#getaccuracy
accuracy=train_and_evaluate_model(X_train,y_train,X_test,y_test,kwargs)

#returnresultsaspandasDF
kwargs.update({
'replication_id':pdf.replication_id.values[0],
'accuracy':accuracy
})
results=pd.DataFrame([kwargs])

returnresults

我們現在可以按 對 Spark 資料幀進行分組replication_id
並應用該run_model
函式。這樣,每個超引數組合都將用於在分散式系統中訓練不同的模型。

results=df_replicated.groupby('replication_id').apply(hyperparameter_search)
%%time

results.sort('accuracy',ascending=False).limit(5).toPandas()
CPUtimes:user11.6s,sys:13.5s,total:25.1s
Walltime:29min10s
replication_idaccuracyiterationsdepthlearning_ratefeature_border_typel2_leaf_regsubsample
0 24 0.9145 210 7 0.20 Uniform 6.0 0.6
1 22 0.9125 250 3 0.20 Uniform 2.0 0.5
2 13 0.9125 230 6 0.15 MinEntropy 3.0 0.7
3 11 0.9125 290 3 0.20 Uniform 5.0 0.7
4 7 0.9125 220 3 0.10 MinEntropy 6.0 0.5

通過這種分散式方法,我們能夠在 29 分鐘內執行 24000 個超引數組合。

分散式 K Fold 交叉驗證

有了最優的超引數集,另一個重要的任務是對模型進行 K-Fold 交叉驗證,以防止(或最小化)過擬合的不良影響。在這個實驗中新增的摺疊越多,你的模型就越健壯。然而,你將不得不花更多的時間來訓練每個摺疊的模型。同樣,避免時間陷阱的一種方法是使用 Spark 並在 Spark 叢集的單個節點上計算每個摺疊。

我們以與分佈網格搜尋的方式非常相似的方式執行此操作,不同之處在於我們根據摺疊數複製我們的資料集。所以如果我們的交叉驗證使用 8 折,我們的資料集將被複制 8 次。

在這裡,我們的第一步是定義我們想要交叉驗證模型的摺疊次數。

N_FOLDS=8

在此之後,我們定義了一些程式碼來根據上面定義的摺疊數隨機拆分我們的資料集。

proportion=1/N_FOLDS
splits=df.randomSplit([proportion]*N_FOLDS,42)
df_folds=splits[0].withColumn('fold',sf.lit(0))
foriinrange(1,N_FOLDS):
df_folds=df_folds.union(
splits[i].withColumn('fold',sf.lit(i))
)

拆分後,我們將資料集複製 K 次。

df_numbers=spark.createDataFrame(
pd.DataFrame(list(range(N_FOLDS)),columns=['replication_id'])
)
df_numbers.toPandas()
replication_id
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
df_replicated=df_folds.crossJoin(df_numbers)
print(f'numberofrowsinthereplicateddataset:{df_replicated.count()}')
numberofrowsinthereplicateddataset:80000

與網格搜尋方法相比,我們還有另一個不同之處。在下面的函式中,我們根據replication_id
fold_id
定義訓練和測試資料集。如果replication_id
等於fold_id
,我們將該摺疊設定為測試摺疊,而其餘摺疊用作訓練集。

#declaretheschemafortheoutputofourfunction
schema=StructType(
[
StructField('replication_id',IntegerType(),True),
StructField('accuracy',FloatType(),True)
]
)

#decorateourfunctionwithpandas_udfdecorator
@pandas_udf(schema,sf.PandasUDFType.GROUPED_MAP)
defcross_validation(pdf):

#getrepliactionid
replication_id=pdf.replication_id.values[0]

#getdataandlabel
columns=[str(i)foriinrange(N_FEATURES)]
X_train=pdf[pdf.fold!=replication_id][columns]
X_test=pdf[pdf.fold==replication_id][columns]
y_train=pdf[pdf.fold!=replication_id]['y_0']
y_test=pdf[pdf.fold==replication_id]['y_0']

#getaccuracy
accuracy=train_and_evaluate_model(X_train,y_train,X_test,y_test)

#returnresultsaspandasDF
results=pd.DataFrame([{
'replication_id':replication_id,
'accuracy':accuracy
}])

#savethemodel(ifyouwanttoretrieveitlater)

returnresults

使用此方法可能需要考慮的一件事是如何儲存每個經過訓練的模型,因為每個模型都在不同的節點中進行訓練。為此,根據您的雲提供商,您可以使用一些開發的 Python 庫將檔案從叢集節點直接傳輸到雲端儲存桶(如 Google Cloud Storage 或 Amazon S3)。但是,如果您只對交叉驗證模型的效能感興趣,那麼上面的函式就足夠了。

results=df_replicated.groupby('replication_id').apply(cross_validation)
%%time

results.sort('accuracy',ascending=False).toPandas()
CPUtimes:user1.03s,sys:1.24s,total:2.27s
Walltime:35.9s
replication_idaccuracy
0 4 0.900715
1 5 0.895292
2 3 0.893720
3 2 0.893601
4 1 0.891801
5 7 0.890048
6 0 0.883293
7 6 0.882946

在這個實驗中,我們僅在 35 秒內評估了 8 個摺疊(叢集的每個節點中一個)。最佳摺疊(編號 4)的準確度得分為 0.900。

分散式多輸出模型

遵循相同的理念,我們可以利用 PySpark Pandas UDF 來分發多輸出模型的訓練。對於這個任務,我們有一組特徵和一組標籤,我們必須用相同的訓練資料為每個標籤訓練一個模型。

一些軟體包scikit-learn
已經實現了這種隨機森林演算法的方法。CatBoost
還可以選擇多輸出訓練。然而,與單輸出 API 相比,這些實現具有有限的超引數和損失函式選項。考慮到這一點,Pandas UDF 是一次自動訓練多個模型的替代方案,它使用任何其他機器學習庫通常為單輸出模型訓練提供的所有選項。

由於我們的資料集有多個標籤列,這次的方法是以一種可以複製每個特定標籤的資料的方式來旋轉我們的資料。因此,我們建立一列來對映每個標籤,並將所有標籤附加到一個標籤列中,如下所示:

features=[f'{i}'foriinrange(N_FEATURES)]
targets=[f'y_{i}'foriinrange(N_CLASSES)]

df_multipe_output=df.select(
*features,
sf.lit(targets[0]).alias('y_group'),
sf.col(targets[0]).alias('Y')
)
fortargetintargets[1:]:
df_multipe_output=df_multipe_output.union(
df.select(
*features,
sf.lit(target).alias('y_group'),
sf.col(target).alias('Y')
)
)
print(f'numberofrowsinthedataset:{df_multipe_output.count()}')
numberofrowsinthedataset:100000
df_multipe_output.limit(5).toPandas()
0123456789...1213141516171819y_groupy
0 1.0 3.0 9.0 1.0 6.0 0.0 5.0 0.0 4.0 1.0 ... 2.0 1.0 3.0 1.0 1.0 1.0 2.0 3.0 y_0 0
1 1.0 4.0 2.0 1.0 4.0 2.0 1.0 2.0 0.0 1.0 ... 3.0 2.0 5.0 2.0 2.0 3.0 3.0 3.0 y_0 1
2 2.0 6.0 3.0 6.0 0.0 5.0 4.0 3.0 2.0 4.0 ... 2.0 1.0 3.0 0.0 5.0 4.0 3.0 1.0 y_0 0
3 3.0 2.0 0.0 1.0 5.0 3.0 0.0 3.0 2.0 3.0 ... 3.0 1.0 0.0 6.0 1.0 0.0 3.0 1.0 y_0 1
4 6.0 3.0 0.0 0.0 3.0 6.0 0.0 2.0 3.0 0.0 ... 4.0 3.0 6.0 7.0 0.0 5.0 6.0 3.0 y_0 0

5 行 × 22 列

定義了我們的 spark 多輸出資料集後,我們準備定義執行模型訓練的函式。

#declaretheschemafortheoutputofourfunction
schema=StructType(
[
StructField('y_group',StringType(),True),
StructField('accuracy',FloatType(),True)
]
)

#decorateourfunctionwithpandas_udfdecorator
@pandas_udf(schema,sf.PandasUDFType.GROUPED_MAP)
defmulti_models(pdf):

#getgroup
y_group=pdf.y_group.values[0]

#getdataandlabel
X=pdf.drop(['Y','y_group'],axis=1)
y=pdf['Y']

#splitdata
X_train,X_test,y_train,y_test=train_test_split(X,y,test_size=0.2,random_state=42)

#getaccuracy
accuracy=train_and_evaluate_model(X_train,y_train,X_test,y_test)

#returnresultsaspandasDF
results=pd.DataFrame([{
'y_group':y_group,
'accuracy':accuracy
}])

returnresults

一切設定好後,就可以在y_group
列上呼叫 groupBy 方法來分發每個模型的訓練。

results=df_multipe_output.groupby('y_group').apply(multi_models).orderBy('accuracy')
%%time

results.sort('accuracy',ascending=False).limit(5).toPandas()
CPUtimes:user193ms,sys:195ms,total:388ms
Walltime:9.24s
y_groupaccuracy
0 y_6 0.9740
1 y_4 0.9330
2 y_5 0.9325
3 y_8 0.8990
4 y_0 0.8910

結論

在這篇文章中,我們展示了一些示例,說明如何使用 PySpark Pandas UDF 來分發涉及機器學習模型訓練的流程。展示的一些方法可用於節省時間或進行更大規模的實驗,否則會佔用過多記憶體或成本過高。