
Machine Learning with sklearn (31): Pipeline and FeatureUnion: Combining Estimators

Transformers are usually combined with classifiers, regressors or other estimators to build a composite estimator. The most common tool for this is a Pipeline. Pipeline is often used in combination with FeatureUnion, which concatenates the outputs of transformers into a composite feature space. TransformedTargetRegressor deals with transforming the target (e.g. log-transforming y). In contrast, Pipelines only transform the observed data (X).

1. Pipeline: chaining estimators

Pipeline can be used to chain multiple estimators into one. This is useful because there is often a fixed sequence of steps in processing data, for example feature selection, normalization and classification.

Pipeline serves multiple purposes here:

  • Convenience and encapsulation: you only have to call fit and predict once on your data to fit a whole sequence of estimators.
  • Joint parameter selection: you can grid search over the parameters of all estimators in the pipeline at once.
  • Safety: pipelines help avoid leaking statistics from your test data into the trained model during cross-validation, by ensuring that the same samples are used to train the transformers and predictors.

All estimators in a pipeline, except the last one, must be transformers (i.e. they must have a transform method). The last estimator may be of any type (transformer, classifier, etc.).

1.1. Usage

1.1.1. Construction

The Pipeline is built using a list of (key, value) pairs, where the key is a string containing the name you want to give this step and value is an estimator object:

>>> from sklearn.pipeline import Pipeline
>>> from sklearn.svm import SVC
>>> from sklearn.decomposition import PCA
>>> estimators = [('reduce_dim', PCA()), ('clf', SVC())]
>>> pipe = Pipeline(estimators)
>>> pipe
Pipeline(memory=None,
         steps=[('reduce_dim', PCA(copy=True,...)), ('clf', SVC(C=1.0,...))],
         verbose=False)

The utility function make_pipeline is a shorthand for constructing pipelines; it takes a variable number of estimators and returns a pipeline, filling in the names automatically:

>>> from sklearn.pipeline import make_pipeline
>>> from sklearn.naive_bayes import MultinomialNB
>>> from sklearn.preprocessing import Binarizer
>>> make_pipeline(Binarizer(), MultinomialNB())
Pipeline(memory=None,
         steps=[('binarizer', Binarizer(copy=True, threshold=0.0)),
                ('multinomialnb', MultinomialNB(alpha=1.0,
                                                class_prior=None,
                                                fit_prior=True))],
         verbose=False)

1.1.2. Accessing steps

The estimators of a pipeline are stored as a list in the steps attribute, but they can also be accessed by indexing the pipeline (with [idx]) by position or by name:

>>> pipe.steps[0]  
('reduce_dim', PCA(copy=True, iterated_power='auto', n_components=None,
                   random_state=None, svd_solver='auto', tol=0.0,
                   whiten=False))
>>> pipe[0]  
PCA(copy=True, iterated_power='auto', n_components=None, random_state=None,
    svd_solver='auto', tol=0.0, whiten=False)
>>> pipe['reduce_dim']  
PCA(copy=True, ...)

The pipeline's named_steps attribute allows accessing steps by name, with tab completion in interactive environments:

>>> pipe.named_steps.reduce_dim is pipe['reduce_dim']
True

A sub-pipeline can also be extracted using the slicing notation commonly used for Python sequences such as lists or strings (although only a step of 1 is permitted). This is convenient for performing only some of the transformations (or their inverse):

>>> pipe[:1]
Pipeline(memory=None, steps=[('reduce_dim', PCA(copy=True, ...))],...)
>>> pipe[-1:]
Pipeline(memory=None, steps=[('clf', SVC(C=1.0, ...))],...)
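
A minimal sketch (toy data of our own, not from the original docs) of what such a sub-pipeline is good for, applying only the dimensionality-reduction step:

>>> import numpy as np
>>> X_demo = np.random.RandomState(0).rand(10, 4)  # 10 samples, 4 features
>>> # pipe[:1] is a Pipeline containing only the 'reduce_dim' PCA step
>>> pipe[:1].fit(X_demo).transform(X_demo).shape
(10, 4)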

1.1.3. Nested parameters

Parameters of the estimators in the pipeline can be accessed using the <estimator>__<parameter> syntax:

>>> pipe.set_params(clf__C=10)
Pipeline(memory=None,
         steps=[('reduce_dim', PCA(copy=True, iterated_power='auto',...)),
                ('clf', SVC(C=10, cache_size=200, class_weight=None,...))],
         verbose=False)

This is particularly important for doing grid searches:

>>> from sklearn.model_selection import GridSearchCV
>>> param_grid = dict(reduce_dim__n_components=[2, 5, 10],
...                   clf__C=[0.1, 10, 100])
>>> grid_search = GridSearchCV(pipe, param_grid=param_grid)

Individual steps may also be replaced as parameters, and non-final steps may be skipped by setting them to 'passthrough':

>>> from sklearn.linear_model import LogisticRegression
>>> param_grid = dict(reduce_dim=['passthrough', PCA(5), PCA(10)],
...                   clf=[SVC(), LogisticRegression()],
...                   clf__C=[0.1, 10, 100])
>>> grid_search = GridSearchCV(pipe, param_grid=param_grid)
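
As a minimal sketch (our own toy run, not part of the original docs), the search can then be fitted like any estimator; digits is used here because reduce_dim includes PCA(10), which needs at least 10 input features:

>>> from sklearn.datasets import load_digits
>>> X_digits, y_digits = load_digits(return_X_y=True)
>>> grid_search.fit(X_digits, y_digits)
GridSearchCV(...)
>>> sorted(grid_search.best_params_)
['clf', 'clf__C', 'reduce_dim']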

The estimators of the pipeline can also be retrieved by index:

>>> pipe[0]  
PCA(copy=True, ...)


1.2. Notes

Calling fit on the pipeline is the same as calling fit on each estimator in turn, transforming the input and passing it on to the next step. The pipeline has all the methods that the last estimator in the pipeline has: if the last estimator is a classifier, the Pipeline can be used as a classifier; if the last estimator is a transformer, so is the pipeline.
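
For example (a small sketch of ours, not from the original text), since the last step of pipe above is an SVC, the whole pipeline can be fitted and used as a classifier:

>>> from sklearn.datasets import load_iris
>>> X, y = load_iris(return_X_y=True)
>>> pipe.fit(X, y)        # fits PCA on X, transforms X, then fits SVC
Pipeline(...)
>>> pipe.predict(X[:3])   # applies the fitted PCA, then SVC.predict
array([0, 0, 0])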

1.3. Caching transformers: avoiding repeated computation

Fitting transformers can be computationally expensive. With its memory parameter set, Pipeline caches each transformer after fit is called. This feature is used to avoid re-fitting transformers within a pipeline when the parameters and input data are identical. A typical example is grid search, where the transformers only need to be fitted once and can be reused for each configuration.

The memory parameter is what enables caching. It can be either a string containing the path of the directory where the transformers are cached, or a joblib.Memory object:

>>> from tempfile import mkdtemp
>>> from shutil import rmtree
>>> from sklearn.decomposition import PCA
>>> from sklearn.svm import SVC
>>> from sklearn.pipeline import Pipeline
>>> estimators = [('reduce_dim', PCA()), ('clf', SVC())]
>>> cachedir = mkdtemp()
>>> pipe = Pipeline(estimators, memory=cachedir)
>>> pipe
Pipeline(...,
         steps=[('reduce_dim', PCA(copy=True,...)),
                ('clf', SVC(C=1.0,...))], verbose=False)
>>> # Clear the cache directory when you don't need it anymore
>>> rmtree(cachedir)
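
Equivalently (a sketch of ours, assuming a recent joblib where Memory takes a location argument), a joblib.Memory object can be passed instead of a path:

>>> from joblib import Memory
>>> cachedir = mkdtemp()
>>> memory = Memory(location=cachedir, verbose=0)  # caches fitted transformers here
>>> pipe = Pipeline(estimators, memory=memory)
>>> rmtree(cachedir)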

Warning: side effect of caching transformers

When a Pipeline is used without caching enabled, the original transformer instances can be inspected directly, for example:

>>> from sklearn.datasets import load_digits
>>> digits = load_digits()
>>> pca1 = PCA()
>>> svm1 = SVC(gamma='scale')
>>> pipe = Pipeline([('reduce_dim', pca1), ('clf', svm1)])
>>> pipe.fit(digits.data, digits.target)
Pipeline(memory=None,
         steps=[('reduce_dim', PCA(...)), ('clf', SVC(...))],
         verbose=False)
>>> # The pca instance can be inspected directly
>>> print(pca1.components_)
[[-1.77484909e-19  ... 4.07058917e-18]]

Enabling caching triggers a clone of the transformers before fitting. Therefore, the transformer instances given to the pipeline cannot be inspected directly. In the following example, accessing the PCA instance pca2 would raise an AttributeError, because pca2 is an unfitted transformer. Use the named_steps attribute instead to inspect estimators within the pipeline:

>>> cachedir = mkdtemp()
>>> pca2 = PCA()
>>> svm2 = SVC(gamma='scale')
>>> cached_pipe = Pipeline([('reduce_dim', pca2), ('clf', svm2)],
...                        memory=cachedir)
>>> cached_pipe.fit(digits.data, digits.target)
Pipeline(memory=...,
         steps=[('reduce_dim', PCA(...)), ('clf', SVC(...))],
         verbose=False)
>>> print(cached_pipe.named_steps['reduce_dim'].components_)
[[-1.77484909e-19  ... 4.07058917e-18]]
>>> # Remove the cache directory
>>> rmtree(cachedir)


2. Transforming the target in regression

TransformedTargetRegressor transforms the target y before fitting a regression model. The predictions are mapped back to the original space via an inverse transform. It takes as arguments the regressor that will be used for prediction and the transformer that will be applied to the target variable:

>>> import numpy as np
>>> from sklearn.datasets import load_boston
>>> from sklearn.compose import TransformedTargetRegressor
>>> from sklearn.preprocessing import QuantileTransformer
>>> from sklearn.linear_model import LinearRegression
>>> from sklearn.model_selection import train_test_split
>>> boston = load_boston()
>>> X = boston.data
>>> y = boston.target
>>> transformer = QuantileTransformer(output_distribution='normal')
>>> regressor = LinearRegression()
>>> regr = TransformedTargetRegressor(regressor=regressor,
...                                   transformer=transformer)
>>> X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
>>> regr.fit(X_train, y_train)
TransformedTargetRegressor(...)
>>> print('R2 score: {0:.2f}'.format(regr.score(X_test, y_test)))
R2 score: 0.67
>>> raw_target_regr = LinearRegression().fit(X_train, y_train)
>>> print('R2 score: {0:.2f}'.format(raw_target_regr.score(X_test, y_test)))
R2 score: 0.64

For simple transformations, instead of a Transformer object, a pair of functions can be passed, defining the transformation and its inverse mapping:

>>> def func(x):
...     return np.log(x)
>>> def inverse_func(x):
...     return np.exp(x)

Subsequently, the object is created as:

>>> regr = TransformedTargetRegressor(regressor=regressor,
...                                   func=func,
...                                   inverse_func=inverse_func)
>>> regr.fit(X_train, y_train)
TransformedTargetRegressor(...)
>>> print('R2 score: {0:.2f}'.format(regr.score(X_test, y_test)))
R2 score: 0.65

By default, the provided functions are checked at each fit to be the inverse of each other. However, this check can be bypassed by setting check_inverse to False:

>>> def inverse_func(x):
...     return x
>>> regr = TransformedTargetRegressor(regressor=regressor,
...                                   func=func,
...                                   inverse_func=inverse_func,
...                                   check_inverse=False)
>>> regr.fit(X_train, y_train)
TransformedTargetRegressor(...)
>>> print('R2 score: {0:.2f}'.format(regr.score(X_test, y_test)))
R2 score: -4.50

Note that the transformation can be triggered by setting either transformer or the pair of functions func and inverse_func. Setting both options at once raises an error.
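
A sketch of this failure mode (ours, not from the original docs; the exact message may differ between versions):

>>> regr = TransformedTargetRegressor(regressor=regressor,
...                                   transformer=transformer,
...                                   func=func,
...                                   inverse_func=inverse_func)
>>> regr.fit(X_train, y_train)
Traceback (most recent call last):
    ...
ValueError: 'transformer' and functions 'func'/'inverse_func' cannot both be set.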


3. FeatureUnion: composite feature spaces

FeatureUnion combines several transformer objects into a new transformer that concatenates their outputs. A FeatureUnion takes a list of transformer objects. During fitting, each of these is fitted to the data independently. When transforming data, the transformers are applied in parallel, and the sample vectors they output are concatenated end-to-end into larger vectors.

FeatureUnion serves the same purposes as Pipeline: convenience, and joint parameter estimation and validation.

FeatureUnion and Pipeline can be combined to create complex models; see the sketch after the note below.

(A FeatureUnion has no way of checking whether two transformers might produce identical features. It only produces a union when the feature sets are disjoint; making sure they are is the caller's responsibility.)
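
A minimal sketch of such a combination (our own example, with hypothetical step names): a FeatureUnion builds the features and a classifier consumes them:

>>> from sklearn.pipeline import Pipeline, FeatureUnion
>>> from sklearn.decomposition import PCA, KernelPCA
>>> from sklearn.svm import SVC
>>> model = Pipeline([
...     ('features', FeatureUnion([('linear_pca', PCA(n_components=2)),
...                                ('kernel_pca', KernelPCA(n_components=2))])),
...     ('clf', SVC())
... ])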

3.1. Usage

A FeatureUnion is built using a list of (key, value) pairs, where key is the name you want to give a given transformation (an arbitrary string; it only serves as an identifier) and value is an estimator object:

>>> from sklearn.pipeline import FeatureUnion
>>> from sklearn.decomposition import PCA
>>> from sklearn.decomposition import KernelPCA
>>> estimators = [('linear_pca', PCA()), ('kernel_pca', KernelPCA())]
>>> combined = FeatureUnion(estimators)
>>> combined
FeatureUnion(n_jobs=None,
             transformer_list=[('linear_pca', PCA(copy=True,...)),
                               ('kernel_pca', KernelPCA(alpha=1.0,...))],
             transformer_weights=None, verbose=False)

Like pipelines, feature unions have a shorthand constructor, make_union, that does not require explicit naming of the components.
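
A short sketch of make_union (ours, not from the original text); the component names are derived from the lowercased class names:

>>> from sklearn.pipeline import make_union
>>> union = make_union(PCA(), KernelPCA())
>>> [name for name, _ in union.transformer_list]
['pca', 'kernelpca']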

As with Pipeline, individual steps may be replaced using set_params, and skipped by setting them to 'drop':

>>> combined.set_params(kernel_pca='drop')
...
FeatureUnion(n_jobs=None,
             transformer_list=[('linear_pca', PCA(copy=True,...)),
                               ('kernel_pca', 'drop')],
             transformer_weights=None, verbose=False)


4. ColumnTransformer for heterogeneous data

Warning: compose.ColumnTransformer is still experimental and its API may change.

Many datasets contain features of different types, say text, floats, and dates, where each type of feature requires separate preprocessing or feature extraction steps. Often it is easiest to preprocess the data before applying scikit-learn methods, for example with pandas. However, processing your data before passing it to scikit-learn can be problematic for the following reasons:

  • Incorporating statistics from the test data into the preprocessors makes cross-validation scores unreliable (this is known as data leakage), for example when scaling or imputing missing values.
  • You may want to include the parameters of the preprocessors in a parameter search.

compose.ColumnTransformer applies different transformations to different columns of the data, within a pipeline that is safe from data leakage and that can be parametrized. ColumnTransformer works on arrays, sparse matrices, and pandas DataFrames.

A different transformation can be applied to each column, such as preprocessing or a specific feature extraction method:

>>> import pandas as pd
>>> X = pd.DataFrame(
...     {'city': ['London', 'London', 'Paris', 'Sallisaw'],
...      'title': ["His Last Bow", "How Watson Learned the Trick",
...                "A Moveable Feast", "The Grapes of Wrath"],
...      'expert_rating': [5, 3, 4, 5],
...      'user_rating': [4, 5, 4, 3]})

For this data, we might want to encode the city column as a categorical variable using preprocessing.OneHotEncoder, but apply a feature_extraction.text.CountVectorizer to the title column. Since we might use multiple feature extraction methods on the same column, we give each transformer a unique name, say 'city_category' and 'title_bow'. By default, the remaining rating columns are ignored (remainder='drop'):

>>> from sklearn.compose import ColumnTransformer
>>> from sklearn.feature_extraction.text import CountVectorizer
>>> column_trans = ColumnTransformer(
...     [('city_category', CountVectorizer(analyzer=lambda x: [x]), 'city'),
...      ('title_bow', CountVectorizer(), 'title')],
...     remainder='drop')

>>> column_trans.fit(X)
ColumnTransformer(n_jobs=None, remainder='drop', sparse_threshold=0.3,
    transformer_weights=None,
    transformers=...)

>>> column_trans.get_feature_names()
...
['city_category__London', 'city_category__Paris', 'city_category__Sallisaw',
'title_bow__bow', 'title_bow__feast', 'title_bow__grapes', 'title_bow__his',
'title_bow__how', 'title_bow__last', 'title_bow__learned', 'title_bow__moveable',
'title_bow__of', 'title_bow__the', 'title_bow__trick', 'title_bow__watson',
'title_bow__wrath']

>>> column_trans.transform(X).toarray()
...
array([[1, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0],
       [1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1, 0],
       [0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
       [0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1]]...)

In the above example, CountVectorizer expects a 1D array as input and therefore the column was specified as a string ('title'). However, preprocessing.OneHotEncoder, like most other transformers, expects 2D data, so in that case you need to specify the column as a list of strings (['city']).

Apart from a scalar or a single-item list, the column selection can be specified as a list of multiple items, an integer array, a slice, or a boolean mask. Strings can reference columns if the input is a DataFrame; integers are always interpreted as positional columns.
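
A small sketch of ours (toy array, hypothetical names) selecting columns of a NumPy array by integer positions:

>>> import numpy as np
>>> from sklearn.preprocessing import StandardScaler
>>> X_num = np.array([[0., 1., 2.], [2., 1., 0.], [4., 1., 1.]])
>>> ct = ColumnTransformer([('scaled', StandardScaler(), [0, 2])],
...                        remainder='drop')
>>> ct.fit_transform(X_num).shape  # only columns 0 and 2 are kept
(3, 2)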

We can keep the remaining rating columns by setting remainder='passthrough'. Their values are appended to the end of the transformation:

>>> from sklearn.preprocessing import OneHotEncoder
>>> column_trans = ColumnTransformer(
...     [('city_category', OneHotEncoder(dtype='int'), ['city']),
...      ('title_bow', CountVectorizer(), 'title')],
...     remainder='passthrough')

>>> column_trans.fit_transform(X)
...
array([[1, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 5, 4],
       [1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1, 0, 3, 5],
       [0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 4, 4],
       [0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 5, 3]]...)

The remainder parameter can also be set to an estimator to transform the remaining rating columns. The transformed values are appended to the end of the transformation:

>>> from sklearn.preprocessing import MinMaxScaler
>>> column_trans = ColumnTransformer(
...     [('city_category', OneHotEncoder(), ['city']),
...      ('title_bow', CountVectorizer(), 'title')],
...     remainder=MinMaxScaler())

>>> column_trans.fit_transform(X)[:, -2:]
...
array([[1. , 0.5],
       [0. , 1. ],
       [0.5, 0.5],
       [1. , 0. ]])

The make_column_transformer function is available to create a ColumnTransformer object more easily. Specifically, the names are given automatically. The equivalent of the example above would be:

>>> from sklearn.compose import make_column_transformer
>>> column_trans = make_column_transformer(
...     (OneHotEncoder(), ['city']),
...     (CountVectorizer(), 'title'),
...     remainder=MinMaxScaler())
>>> column_trans
ColumnTransformer(n_jobs=None, remainder=MinMaxScaler(copy=True, ...),
         sparse_threshold=0.3,
         transformer_weights=None,
         transformers=[('onehotencoder', ...)
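
Since a ColumnTransformer is itself a transformer, it can be used as the first step of a Pipeline; a sketch (the final estimator here is our own arbitrary choice):

>>> from sklearn.pipeline import make_pipeline
>>> from sklearn.linear_model import Ridge
>>> model = make_pipeline(column_trans, Ridge())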


class sklearn.pipeline.Pipeline(steps, *, memory=None, verbose=False)

Pipeline of transforms with a final estimator.

Sequentially apply a list of transforms and a final estimator. Intermediate steps of the pipeline must be ‘transforms’, that is, they must implement fit and transform methods. The final estimator only needs to implement fit. The transformers in the pipeline can be cached using the memory argument.

The purpose of the pipeline is to assemble several steps that can be cross-validated together while setting different parameters. For this, it enables setting parameters of the various steps using their names and the parameter name separated by a ‘__’, as in the example below. A step’s estimator may be replaced entirely by setting the parameter with its name to another estimator, or a transformer removed by setting it to ‘passthrough’ or None.

Read more in the User Guide.

New in version 0.5.

Parameters

steps : list

List of (name, transform) tuples (implementing fit/transform) that are chained, in the order in which they are chained, with the last object an estimator.

memory : str or object with the joblib.Memory interface, default=None

Used to cache the fitted transformers of the pipeline. By default, no caching is performed. If a string is given, it is the path to the caching directory. Enabling caching triggers a clone of the transformers before fitting. Therefore, the transformer instance given to the pipeline cannot be inspected directly. Use the attribute named_steps or steps to inspect estimators within the pipeline. Caching the transformers is advantageous when fitting is time consuming.

verbose : bool, default=False

If True, the time elapsed while fitting each step will be printed as it is completed.

Attributes

named_steps : Bunch

Dictionary-like object. Read-only attribute to access any step by user-given name. Keys are step names and values are the step objects.

Examples

>>> from sklearn.svm import SVC
>>> from sklearn.preprocessing import StandardScaler
>>> from sklearn.datasets import make_classification
>>> from sklearn.model_selection import train_test_split
>>> from sklearn.pipeline import Pipeline
>>> X, y = make_classification(random_state=0)
>>> X_train, X_test, y_train, y_test = train_test_split(X, y,
...                                                     random_state=0)
>>> pipe = Pipeline([('scaler', StandardScaler()), ('svc', SVC())])
>>> # The pipeline can be used as any other estimator
>>> # and avoids leaking the test set into the train set
>>> pipe.fit(X_train, y_train)
Pipeline(steps=[('scaler', StandardScaler()), ('svc', SVC())])
>>> pipe.score(X_test, y_test)
0.88

Methods

decision_function(X)
    Apply transforms, and decision_function of the final estimator.

fit(X[, y])
    Fit the model.

fit_predict(X[, y])
    Applies fit_predict of the last step in the pipeline after transforms.

fit_transform(X[, y])
    Fit the model and transform with the final estimator.

get_params([deep])
    Get parameters for this estimator.

predict(X, **predict_params)
    Apply transforms to the data, and predict with the final estimator.

predict_log_proba(X)
    Apply transforms, and predict_log_proba of the final estimator.

predict_proba(X)
    Apply transforms, and predict_proba of the final estimator.

score(X[, y, sample_weight])
    Apply transforms, and score with the final estimator.

score_samples(X)
    Apply transforms, and score_samples of the final estimator.

set_params(**kwargs)
    Set the parameters of this estimator.

class sklearn.pipeline.FeatureUnion(transformer_list, *, n_jobs=None, transformer_weights=None, verbose=False)

Concatenates results of multiple transformer objects.

This estimator applies a list of transformer objects in parallel to the input data, then concatenates the results. This is useful to combine several feature extraction mechanisms into a single transformer.

Parameters of the transformers may be set using its name and the parameter name separated by a ‘__’. A transformer may be replaced entirely by setting the parameter with its name to another transformer, or removed by setting to ‘drop’.

Read more in the User Guide.

New in version 0.13.

Parameters

transformer_list : list of (string, transformer) tuples

List of transformer objects to be applied to the data. The first half of each tuple is the name of the transformer. The transformer can be ‘drop’ for it to be ignored.

Changed in version 0.22: Deprecated None as a transformer in favor of ‘drop’.

n_jobs : int, default=None

Number of jobs to run in parallel. None means 1 unless in a joblib.parallel_backend context. -1 means using all processors. See Glossary for more details.

Changed in version 0.20: n_jobs default changed from 1 to None.

transformer_weights : dict, default=None

Multiplicative weights for features per transformer. Keys are transformer names, values the weights. Raises ValueError if key not present in transformer_list.

verbose : bool, default=False

If True, the time elapsed while fitting each transformer will be printed as it is completed.

Attributes

n_features_in_ : int

Number of features seen during fit.

Examples

>>> from sklearn.pipeline import FeatureUnion
>>> from sklearn.decomposition import PCA, TruncatedSVD
>>> union = FeatureUnion([("pca", PCA(n_components=1)),
...                       ("svd", TruncatedSVD(n_components=2))])
>>> X = [[0., 1., 3], [2., 2., 5]]
>>> union.fit_transform(X)
array([[ 1.5       ,  3.0...,  0.8...],
       [-1.5       ,  5.7..., -0.4...]])

Methods

fit(X[, y])
    Fit all transformers using X.

fit_transform(X[, y])
    Fit all transformers, transform the data and concatenate results.

get_feature_names()
    Get feature names from all transformers.

get_params([deep])
    Get parameters for this estimator.

set_params(**kwargs)
    Set the parameters of this estimator.

transform(X)
    Transform X separately by each transformer, concatenate results.