1. 程式人生 > 其它 >隨機森林:原理及python實現

隨機森林:原理及python實現

Table of Contents

隨機森林概述

  如圖所示,隨機森林是一種使用Bagging(Bootstrap Aggregating)方法的整合模型(Ensemble Model).其基本思路是,將基模型看作定義在相應模型空間裡的隨機變數,通過取得獨立同分布

的不同模型來投票決定最終的預測結果.隨機森林的基模型是決策樹,整個過程中,會訓練多個決策樹模型,然後融合模型預測的結果給出一個預測結果.

  整合模型有兩個關鍵部分:基模型和結合策略

個體學習器

  個體學習器可以分為同質和異質兩種:

  • 同質指的是不同個體學習器屬於同一種類,比如都是決策樹。
  • 異質指的是個體學習器屬於不同種類,比如既有決策樹又有SVM。

  目前來說,同質個體學習器的應用是最廣泛的,一般我們常說的整合學習的方法都是指的同質個體學習器,而同質個體學習器使用最多的模型是CART決策樹和神經元。

  同質學習器按照個體學習器之間是否存在依賴關係可分為兩類:

  • 存在強依賴關係:學習器序列生成,即新學習器是在上一個學習器基礎上生成的,代表就是boosting系列
  • 不存在強依賴關係:學習器並行生成,各個學習器之間沒有太大的依賴關係,代表就是bagging系列

整合策略

  • 平均值:針對迴歸問題,對多個個體學習器的結果計算平均值作為最終結果,可以使用算數平均值、加權平均值等。
  • 投票法:針對分類問題,對多個個體分類器的結果使用投票作為最終結果,可以使用少數服從多數、絕對多數、加權投票等。
  • 學習法:對個體學習器結果再使用一個學習器來處理,即stacking

隨機森林的一些相關問題

偏差(Bias)與方差(Variance)

  假設某樣本真實值為\(y\),\(f(x)\)總體迴歸方程(PRF,Population Regression Function)

,隨機擾動\(\epsilon\sim N(0,\sigma^2)\),且偏差與樣本特徵獨立\(\hat f(x)\)樣本回歸方程(SRF,Sample Regression Function),特徵向量\(x\),則有

\[y = f(x) + \epsilon \]

  使用MSE作為損失函式,有

\[\begin{split}l &= \frac 1n (y-\hat f(x))^2 \\& =E(y-\hat f(x))^2 \\&=Ey^2-2Ey\hat f(x)+E\hat f^2(x)\end{split} \]

首先看第一項

\[Ey^2 = E(f(x)+\epsilon)^2=Ef^2(x)+2E\epsilon f(x)+E\epsilon^2 \]

  由於\(f(x)\)是PRF,對於總體資料而言,模型函式形式確定意味著PRF確定,因此,

\[Ef^2(x)=f^2(x) \]

由於\(\epsilon\)與樣本特徵獨立,因此,

\[E\epsilon f(x) = E\epsilon Ef(x)=0 \]

又有

\[E\epsilon^2=D\epsilon+(E\epsilon)^2=\sigma^2 \]

\[Ey^2 = f^2(x) + \sigma^2 \]

然後第二項

  由於SRF受樣本影響,本質上是一個隨機變數,因此,

\[-2Ey\hat f(x)=-2E(f(x)+\epsilon)\hat f(x)=-2f(x)E\hat f(x) \]

最後第三項

\[E\hat f^2(x) = D\hat f(x) + (E\hat f(x))^2 \]

把以上代入原式得

\[\begin{split}l &= f^2(x)-2f(x)E\hat f(x)+(E\hat f(x))^2 + D\hat f(x)+\sigma^2\\&=[f(x)-E\hat f(x)]^2+D\hat f(x)+\sigma^2\\&=Bias^2+Variance+\sigma^2\end{split} \]

  也就是說,實際值與真實值的偏差由三部分組成:SRF的方差、SRF的偏差以及隨機擾動。在隨機森林中,SRF就是一個一個的個體分類器(決策樹)。

  Bias代表模型的準確度,模型越複雜,Bias越低。Variance代表模型的不同程度,模型越複雜,越容易過擬合,不同樣本訓練出的SRF差別越大,Variance也就越大,如下圖所示。因此,想要降低偏差平方和,可以從降低偏差、降低方差兩個方面著手。

RF通過降低方差提高預測準確性

  前面說到,Bagging通過訓練多顆獨立同分布的決策樹,然後對其取平均作為預測結果,以此獲得更準確的結果。為什麼要追求獨立同分布呢?首先,決策樹模型同分布自不必多說,每顆樹(SRF)都是由於不同的樣本獲取到的PRF的近似。

  對於同分布的\(n\)個隨機變數\(X_i,i=1,2,3,...,n\).其均值的方差

\[D\frac {\sum_{i=1}^nX_i}{n} = \frac 1{n^2}D\sum X_i=\frac 1{n^2}(nDX_i+\sum\limits_{k=1}^n\sum\limits_{j=k}^nCOV(X_k,X_j))=\frac 1{n^2}(n\sigma^2+n(n-1)\rho\sigma^2)=\frac {\sigma^2}n+\frac{n-1}{n}\rho\sigma^2=\rho\sigma^2+\frac{(1-\rho)\sigma^2}n \]

  在隨機森林中,隨機變數數量\(n\)代表決策樹的數量,可以看出,隨著樹的數量\(n\)的增大,方差減小。在樹的棵樹一定時,相關係數\(\rho\)越大,方差越大,當\(\rho=0\)時,方差最小。因此,各顆樹越不相關,方差越小

注意:這裡的\(\sigma\)指的是不同的樹預測值的方差,即上文中的\(D\hat f(x)\),與上面的隨機擾動\(\epsilon\)的方差不同。

Bootstrap(自助取樣)

  顧名思義,隨機森林使用了Bagging(Bootstrap Aggregating)方法,自然也就使用了Booststrap,其流程如下:

  每顆樹訓練時,對於樣本容量為\(m\)的訓練集,有放回地從中抽取\(m\)個樣本,作為這棵樹的訓練集合。當樣本容量\(m\)足夠大時,大約有36.8%的樣本未被取到:

\[P = (1-\frac 1m)^m \]\[\lim\limits_{m \rightarrow+\infty}P =\frac 1e \]

  自助取樣過程還給 Bagging 帶來了另一個優點:由於每個基學習器只使用了初始訓練集中約63.2%的樣本,剩下約36.8%的樣本可用作驗證集來對泛化效能進行“包外估計 ”(out-of-bag estimate)。oob可以用來在決策樹生長的過程中來輔助剪枝。

特徵取樣

  隨機森林對Bagging方法的一個小小改進就是不僅對樣本進行自助取樣,同時對特徵進行了隨機取樣,通常情況下,取樣特徵數量\(k = log_2n\),\(n\)為特徵數量

隨機森林的實現

python實現

  基於cart樹的隨機森林,不知道哪裡有問題,有帶佬發現了麻煩指教一下(不管了,編碼一直是弱項ORZ,自己實現一方面是為了練習coding能力,另一方面主要還是為了加深對演算法的理解)。

from sklearn.metrics import r2_score
from multiprocessing import Pool

import numpy as np
import sklearn.ensemble
from sklearn.datasets import load_diabetes
from sklearn.metrics import mean_squared_error
import warnings
warnings.filterwarnings('ignore')


class DTNode:
    '''
    定義節點類用於儲存決策樹節點。
    '''

    def __init__(self, data):
        # 用於儲存節點資料
        if len(data.shape) == 1:
            self.data = data.reshape([1, -1])
        else:
            self.data = data
        # 最優劃分特徵index
        self.best_feature = None
        # 節點劃分特徵資料型別
        self.feature_type = None
        # 節點劃分條件
        self.judge_value = None
        # 左子節點
        self.left_node = None
        # 右子節點
        self.right_node = None
        # 節點劃分後SE
        self.se_split = np.inf
        # 節點劃分前SE
        self.se_node = np.sum((data[:, -1] - np.mean(data[:, -1])) ** 2)
        # 節點均值
        self.value = np.mean(self.data[:, -1])
        # 是否葉節點
        self.leaf = False
        # 隨機取樣到的列索引,只有根節點不是None
        self.chosen_columns = None


class RandomForestRegressor:
    def __init__(self, X, y, n_estimators=20, min_mse=1,
                 oob=False, feature_size=None):
        self.features = np.array(X).reshape(len(X), -1)
        self.labels = np.array(y).reshape(len(y), -1)
        self.data = np.hstack([self.features, self.labels])
        # 決策樹數量
        self.n_estimators = n_estimators
        # 最小MSE,MSE低於這個值不再進行節點劃分
        self.min_mse = min_mse
        self.oob = oob
        # 特徵取樣數量
        if feature_size is None:
            self.feature_size = np.int32(np.log2(self.features.shape[1]))
        else:
            self.feature_size = feature_size
        # 用於儲存隨機森林,訓練完成後,決策樹被存在長為n_estimator的列表裡
        self.forests = None

    def bootstrap(self, data, oob=None):
        '''
        自助取樣
        :param data: 待取樣資料,最後一列為標籤的增廣矩陣
        :param oob: 是否保留袋外值,用於剪枝,暫時未使用
        :return: 自助取樣後的資料集
        '''
        X = np.array(data).reshape(len(data), -1)
        if oob is None:
            oob = self.oob
        chosen_index = np.random.randint(0, len(data), len(data))
        if oob is True:
            oob_index = list(set(range(len(data))) - set(chosen_index))
            return data[chosen_index, :], data[oob_index, :]
        else:
            return data[chosen_index, :]

    def choose_feature(self, data):
        '''
        特徵選擇
        :param data: 帶有標籤的增廣矩陣
        :return: 取樣後增廣矩陣,取樣特徵所在列索引(一維陣列,包括y)
        '''
        data = np.array(data).reshape(len(data), -1)
        if data.shape[1] == 2:
            return data
        else:
            size = self.feature_size
            chosen_col = np.random.randint(0, data.shape[1]-1, size)
            chosen_col = np.append(chosen_col, data.shape[1]-1)
            return data[:, chosen_col], chosen_col

    def isCategorical(self, vec):
        '''
        判斷是否為分類變數
        :param vec: 待判斷資料列
        :return: True為分類變數,False為連續變數
        '''
        vec = np.array(vec)
        if all(np.int32(vec) == vec):
            return True
        else:
            return False

    def calSE(self, array):
        '''
        計算輸入陣列的偏差平方和
        :param array:陣列
        :return: 輸入陣列與均值的偏差平方和
        '''
        return np.sum((array - np.mean(array)) ** 2)

    def bestSplitCat(self, X, y, index):
        '''

        節點使用分類變數特徵最優劃分方法
        :param X: 特徵矩陣
        :param y: 標籤向量
        :param index:劃分特徵所在列索引
        :return: node類例項,使用最優劃分後的節點
        '''

        data = np.hstack([X, y])
        node = DTNode(data)
        node.best_feature = index
        node.feature_type = 'Categorical'
        diff_val = np.unique(data[:, index])

        # 如果分類變數只有一個值,那麼將節點標記為葉節點並返回
        if len(diff_val) == 1:
            node.leaf = True
            return node
        # 如果標籤資料只有一個值,也就是偏差平方和為0,標記為葉節點並返回
        if len(np.unique(y)) == 1:
            node.leaf = True
            return node
        # 遍歷所有可能的二劃分方法,如果劃分後的偏差平方和小於當前節點偏差平方和,則更新節點資訊
        for i in range(1, 2 ** len(diff_val)-1):
            bin_i = bin(i)[2:]
            len_i = len(bin_i)
            num_zero = len(diff_val) - len_i
            if num_zero < 0:
                num_zero = 0
            final_i = '0' * num_zero + bin_i
            chosen_index = np.array([int(j) for j in final_i])
            chosen_value = diff_val[chosen_index.astype(bool)]
            filter_ = np.in1d(X[:, index], chosen_value)
            left_node = data[filter_]
            right_node = data[~filter_]
            SE = self.calSE(left_node[:, -1]) + self.calSE(right_node[:, -1])
            if SE < node.se_split:
                node.left_node = DTNode(left_node)
                node.right_node = DTNode(right_node)
                # 對於分類變數,judge_value第一個值是左節點中的值,第二個是右節點中的值
                node.judge_value = [chosen_value,
                                    np.setdiff1d(diff_val, chosen_value)]
                node.se_split = SE
        # 如果劃分後偏差平方和仍然大於等於節點偏差平方和,那麼標記為葉節點
        if node.se_split >= node.se_node:
            node.leaf = True
        return node

    def bestSplitCon(self, X, y, index):
        '''
        連續變數特徵的最優劃分方法
        :param X: 特徵矩陣
        :param y: 標籤向量
        :param index: 待劃分特徵所在列索引
        :return:node類例項,使用劃分後的最優節點
        '''
        data = np.hstack([X, y])
        node = DTNode(data)
        node.best_feature = index
        node.feature_type = 'Continuous'
        feature_data = X[:, index]
        sorted_feature = sorted(feature_data)
        thresholds = [np.mean([sorted_feature[j], sorted_feature[j+1]])
                      for j in range(len(sorted_feature)-1)]
        if len(np.unique(feature_data)) == 1:
            node.leaf = True
            return node
        for thresh in thresholds:
            filter_ = feature_data > thresh
            left_node = data[filter_]
            right_node = data[~filter_]
            SE = self.calSE(left_node[:, -1]) + self.calSE(right_node[:, -1])
            if SE < node.se_split:
                node.left_node = DTNode(left_node)
                node.right_node = DTNode(right_node)
                node.se_split = SE
                node.judge_value = thresh
        return node

    def fit_regression_tree(self, node_data, chosen_columns=None):
        '''
        訓練一顆決策樹
        :param node_data: 輸入的特徵取樣後的資料
        :param chosen_columns: 特徵取樣的列索引,預測時使用
        :return:node類物件例項,決策樹根節點
        '''
        X = np.array(node_data[:, :-1]).reshape(len(node_data), -1)
        y = np.array(node_data[:, -1]).reshape(len(node_data), -1)
        data = np.hstack([X, y])
        node = DTNode(data)
        node.chosen_columns = chosen_columns
        if len(node_data) == 1:
            node.leaf = True
            return node
        if node.se_node < node.data.shape[0] * self.min_mse:
            node.leaf = True
            return node
        if len(np.unique(y)) == 1:
            node.leaf = True
            return node
        if len(np.unique(X, axis=0).reshape(-1, X.shape[1])) == 1:
            node.leaf = True
            return node
        for i in range(X.shape[1]):
            if self.isCategorical(X[:, i]):
                n = self.bestSplitCat(X, y, i)
                if n.se_split < node.se_split:
                    node = n
                    node.chosen_columns = chosen_columns
            else:
                n = self.bestSplitCon(X, y, i)
                if n.se_split < node.se_split:
                    node = n
                    node.chosen_columns = chosen_columns
        # if node.se_split == np.inf:

        if node.leaf is True:
            return node
        node.left_node = self.fit_regression_tree(node.left_node.data)
        node.right_node = self.fit_regression_tree(node.right_node.data)
        return node

    def fit_forest(self, ind):
        data = self.bootstrap(self.data)
        data, columns = self.choose_feature(data)
        return self.fit_regression_tree(data, columns)

    def fit(self):
        with Pool() as p:
            forests = p.map(self.fit_forest, list(range(self.n_estimators)))
        self.forests = forests

    def tree_predict(self, vec, node):
        if node.leaf is True:
            return node.value
        else:
            if node.feature_type == 'Categorical':
                if vec[node.best_feature] in node.judge_value[0]:
                    next_node = node.left_node
                else:
                    next_node = node.right_node
            elif vec[node.best_feature] > node.judge_value:
                next_node = node.left_node
            else:
                next_node = node.right_node
            return self.tree_predict(vec, next_node)

    def predict_vec(self, vec):
        vec = np.array(vec)
        l = []
        for node in self.forests:
            vec_ = vec[node.chosen_columns[:-1]]
            l.append(self.tree_predict(vec_, node))
        return np.mean(l)

    def predict(self, data):
        if len(data.shape) == 1:
            return self.predict_vec(data)
        else:
            r = []
            for i in data:
                r.append(self.predict_vec(i))
            return r


if __name__ == '__main__':
    diabetes = load_diabetes()
    X = diabetes.data
    y = diabetes.target
    rf = sklearn.ensemble.RandomForestRegressor().fit(X, y)
    pre = rf.predict(X)
    score = mean_squared_error(pre, y)
    print(f"sklearn的MSE:{score}")
    print(f"sklearn的可決係數:{r2_score(pre,y)}")
    print()
    rfm = RandomForestRegressor(X, y, 20)
    rfm.fit()
    p = rfm.predict(X)
    print(f"自寫的MSE:{mean_squared_error(p, y)}")
    print(f"自寫的可決係數:{r2_score(p,y)}")
sklearn的MSE:483.6802142533937
sklearn的可決係數:0.8779661780599455

自寫的MSE:730.1600067207331
自寫的可決係數:0.7926145660075448

Sklearn實現

隨機森林分類的一個例子

  下面的例子是一個使用隨機森林進行分類的示例:

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import warnings
warnings.filterwarnings('ignore')
#在two_moon資料集上構造5棵樹組成的隨機森林
import seaborn as sns
import mglearn

from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_moons
from sklearn.model_selection import train_test_split

X,y = make_moons(n_samples=100,noise=0.25,random_state=3)
X_train,X_test,y_train,y_test = train_test_split(X,y,stratify=y,random_state=42)

RFC = RandomForestClassifier(n_estimators=100,random_state=2).fit(X_train,y_train)

# 將每棵樹及總預測視覺化
fig,axes = plt.subplots(2,3,figsize=(20,10))
for i,(ax,tree) in enumerate(zip(axes.ravel(),RFC.estimators_)):
    ax.set_title('Tree{}'.format(i))
    mglearn.plots.plot_tree_partition(X_train,y_train,tree,ax=ax)
mglearn.plots.plot_2d_separator(RFC,X_train,fill=True,ax=axes[-1,-1],alpha=.4)
axes[-1,-1].set_title('Random Forest')
mglearn.discrete_scatter(X_train[:,0],X_train[:,1],y_train)
plt.show()


  可以看出,隨機森林也是實現了曲線邊界

使用oob

# 在乳腺癌資料上構建隨機森林
from sklearn.datasets import load_breast_cancer

cancer = load_breast_cancer()
X_train,X_test,y_train,y_test=train_test_split(cancer.data,cancer.target,stratify=cancer.target,random_state=0)

RFC = RandomForestClassifier(n_estimators=100,max_features=5,oob_score=True,random_state=88).fit(X_train,y_train)
print('訓練精度:{}'.format(RFC.score(X_train,y_train)))
print('測試精度:{}'.format(RFC.score(X_test,y_test)))
print('袋外分數:{}'.format(RFC.oob_score_))
訓練精度:1.0
測試精度:0.958041958041958
袋外分數:0.9624413145539906

特徵重要性

  隨機森林的特徵重要性,sklean使用的是跟之前提到的決策樹的特徵重要性相同,只是對不同樹的特徵重要性取了平均值。

fig = plt.figure(dpi=100)
plt.barh(range(len(RFC.feature_importances_)),RFC.feature_importances_)
plt.yticks(range(len(RFC.feature_importances_)),cancer.feature_names)
plt.xlabel("Feature Importance")
plt.show()


隨機森林的推廣

extra tree

  Extra Tree與Random Forest十分類似,區別就是Extra Tree不使用boostrap對樣本進行隨機取樣,但是,在選擇特徵的時候,隨機在所有特徵中選擇一個進行特徵劃分。這進一步增強了樹之間的獨立性,但是會導致偏差增大。

from sklearn.ensemble import ExtraTreesClassifier
ETC = ExtraTreesClassifier(n_estimators=100,random_state=88).fit(X_train,y_train)
print('訓練精度:{}'.format(ETC.score(X_train,y_train)))
print('測試精度:{}'.format(ETC.score(X_test,y_test)))
訓練精度:1.0
測試精度:0.9370629370629371

TRTE(Totally Random Trees Embedding,資料轉換方法)

  TRTE是一種非監督的資料轉換方法。其原理是,比如,訓練10棵樹,每棵樹都有5個葉節點,對於一條樣本,在某棵樹中被分配到第3個葉節點中,那這個樣本點在這棵樹的表示就是(0,0,1,0,0),使用One-Hot表示,然後,將10棵樹的樣本向量拼接起來,即得到轉換後的樣本向量。與FB經典的gbdt+lr演算法十分類似。

  如下所示,選用了10棵樹構建的隨機森林,由於沒有限制最大葉子節點數,因此,不同樹的葉節點數量不同,X_train原始為\(426\times30\)的矩陣,轉換後變成了\(426*176\)的矩陣。sklearn中RandomTreesEmbedding類在fit時沒有y,它的y是使用np.uniform生成的在0-1上的服從均勻分佈的向量,長度跟X相同。也就是說,在構建樹時,隨機挑選特徵構建。

from sklearn.ensemble import RandomTreesEmbedding

display(X_train.shape)

RTE = RandomTreesEmbedding(n_estimators=10,random_state=50).fit(X_train)
r = RTE.transform(X_train)
r
(426, 30)


<426x176 sparse matrix of type '<class 'numpy.float64'>'
	with 4260 stored elements in Compressed Sparse Row format>

Isolation Forest(iForest,異常檢測演算法)

  iFroest基於這樣一種思想,如下圖所示,對於非異常點\(x_i\),周圍的資料點比較多,使用決策樹對其進行劃分,當它進入葉節點時,通常需要經過更多的層數。而異常點,如下\(x_o\),在比較淺層的時候就被分入了葉節點。因此,所在葉節點越“淺”,越有可能是異常點。

  對於某樣本點,其為異常值的概率可以表示為:

\[s(x,m) = 2^{-\frac{h(x)}{c(m)}} \]

其中:

\[c(m) = 2H(m-1)-\frac{2(m-1)}n \]

H(i)可近似估算:

\[H(i) = ln(i)+\xi ;\xi為尤拉常數 \]

\(h(x)\)——樣本x的葉節點所在樹深均值

\(c(m)\)——樣本量為m的資料集,樣本點所在葉節點深度的均值

from sklearn.ensemble import IsolationForest
X = [[-1.1], [0.3], [0.5], [100]]
clf = IsolationForest(random_state=0).fit(X)
clf.predict([[0.1], [0], [90]])
array([ 1,  1, -1])