機器學習:決策樹過擬合與剪枝,決策樹程式碼實現(三)
文章目錄
楔子
上次講到:至此node類的變數和方法基本實現完畢,為什麼說基本呢,因為真正的後剪枝還沒講,他還需要在node類裡新增一些方法。這一次來講一下後剪枝。
首先,後剪枝是對整個生成樹操作,我們給整個樹的操作定義一個基類,定義一個新類就涉及到:變數和方法
Tree 結構需要做到如下幾點:
定義好需要在各個 Node 上呼叫的“全域性變數” 做好資料預處理的工作、保證傳給 Node 的資料是合乎要求的 對各個 Node 進行合適的封裝,做到: 生成決策樹時能夠正確地呼叫它們的生成演算法 進行後剪枝時能夠正確地呼叫它們的區域性剪枝函式 定義預測函式和評估函式以供使用者呼叫
變數
首先,我們思考一下,我們整體考慮生成樹,並對樹進行操作,我們需要操作哪些物件:
1、我們需要剪枝,就需要對結點操作,在這裡我們不好每次都遍歷樹一遍,我們把所有的node存下來專門處理,self.nodes = []
2、每個node都有一個可選features的列表,但是選中某個feature之後,遍歷featureValue時,在node裡面沒有變數定義,在全域性變數裡面定義一個,所有features的featureValue的變數:self.feature_sets;同樣的道理各個特徵的維度是否連續也是如此:self.whether_continuous
3、剪枝屬於全域性的操作,變數也應該是全域性的:限制樹的深度:self.max_depth;CART種需要處理p顆生成樹:self.roots
4、還有一個最終要的變數,就是樹的根:self.root
from copy import deepcopy
from Node import *
import numpy as np
class CvBase:
def __init__(self,max_depth= None, node= None):
# self.nodes:記錄所有node的列表
self.nodes = []
# self.roots:主要用於CART屬性,儲存演算法過程中的各個決策樹
self.roots = []
# self.max_depth:用於記錄決策樹的最大深度
self.max_depth = max_depth
# self.root: 根節點
self.root = node
# self.feature_sets:用於記錄可選特徵維度的列表
self.feature_sets = []
# self.label_dic:類別的轉換字典
self.label_dic = {}
# self.prune_alpha,self.layers:ID3和C4.5剪枝的兩個屬性
self.prune_alpha = 1
# 前者是懲罰因子,後者是記錄每一層的node
# self.whether_continuous:記錄各維度特徵是否是連續的
self.whether_continuous = None
def __str__(self):
return "CvTree ({})".format(self.root.height)
__repr__ = __str__
方法
資料預處理
自動判斷哪些features為連續的;初始化樹的全域性變數
def feed_data(self, x, continuous_rate = 0.2):
# continuous_rate用於判斷該維度是否是連續的
# 利用set獲取各個維度的特徵可能取值
self.feature_sets = [set(dimension) for dimension in x.T]
data_len, data_dim = x.shape
# 判斷是否連續
self.whether_continuous = np.array(
[len(feat) >= continuous_rate*data_len for feat in self.feature_sets])
# 根節點可選的劃分特徵維度
self.root.feats = [i for i in range(x.shape[1])]
# 把
self.root.feed_tree(self)
最後一行我們對根節點呼叫了feed_tree方法,該方法會做以下三件事:
讓決策樹中所有的 Node 記錄一下它們所屬的 Tree 結構
將自己記錄在 Tree 中記錄所有 Node 的列表nodes裡
根據 Tree 的相應屬性更新記錄連續特徵的列表
# 栽樹,會做三件事
# 決策樹所有node記錄他們屬於哪一顆樹
# 把所有結點儲存到self.tree.nodes
# 更新每一個結點的特徵是否連續的列表
def feed_tree(self, tree):
self.tree = tree
self.tree.nodes.append(self)
self.wc = tree.whether_continuous
for child in self.children.values():
if child is not None:
child.feed_tree(tree)
剪枝
剪枝時,需要獲取所有的非葉子結點,為待剪集,從底層像高層一層一層的剪枝。
獲取待剪集:
# =============================================================================
# # 定義Prune
# 因為是後剪枝是針對全域性的考慮,要決定那些結點需要剪枝,然後再呼叫結點的剪枝
# =============================================================================
# 獲取每一層的結點self.layers:[depth,node_lst] = node
def _update_layers(self):
self.layers = [[] for _ in range(self.root.height)]
self.root.update_layers()
# Util
# 獲取以當前結點為根的樹的每一層結點列表
def update_layers(self):
self.tree.layers[self._depth].append(self)
for node in sorted(self.children):
node = self.children[node]
if node is not None:
node.update_layers()
針對ID3,C4.5的剪枝
損失函式的設計
# 新的損失函式,當未剪枝時損失,已剪枝或者葉子的損失
def cost(self, pruned=False):
if not pruned:
return sum([leaf["chaos"] * len(leaf["y"]) for leaf in self.leafs.values()])
return self.chaos * len(self._y)
# node.cost() + self.prune_alpha * len(node.leafs)
基於該損失函式的演算法描述
基於該損失函式的程式碼實現
# 離散資料的剪枝函式
def _prune(self):
# 獲取生成樹每一層的結點,每一層結點按照其劃分feature順序排列
self._update_layers()
# 用於儲存所有的非葉子結點,為待剪枝結點,儲存順序前面的靠近底部,後面的靠近根部
tmp_nodes = []
append = tmp_nodes.append
for node_lst in self.layers[::-1]:
for node in node_lst[::-1]:
if node.category is None:
append(node)
# 剪枝的新損失函式 = 各個葉子不確定度*葉子樣本數量加權和 + alpha*葉子個數
# old為剪枝前的損失函式,所有的待剪枝結點的剪枝前的損失函式
old = np.array([node.cost() + self.prune_alpha * len(node.leafs) for node in tmp_nodes])
# 假如進行剪枝後,當前結點變成葉子,損失函式 = 當前結點的不確定度*樣本個數 + alpha*1
new = np.array([node.cost(pruned=True) + self.prune_alpha for node in tmp_nodes])
# 根據這個得到待剪枝的結點mask
mask = old >= new
while True:
# 剪到根時退出
if self.root.height == 1:
break
# 獲取最深的待剪枝的結點,從下往上的剪枝,取的是第一個True,前面都是靠近底部的結點
p = np.argmax(mask) # type: int
# 判斷一下是否是可剪枝的,每次剪枝之後,會影響上層的結點,可能Ture變成了False,
# 最後一次時裡面,裡面可能全部都是False
if mask[p]:
# 對這個結點剪枝,該做的操作在結點裡面都操作了,裡面還有一項操作
# 就是剪枝該結點,會對那些結點有影響,就是他的祖宗們,已標記node.affecte
tmp_nodes[p].prune()
# 遍歷所有的待剪枝結點,挑出被當前結點影響的結點
for i, node in enumerate(tmp_nodes):
if node.affected:
# 更新那些結點的損失函式
old[i] = node.cost() + self.prune_alpha * len(node.leafs)
# 再次判斷是否需要被剪枝,new是不會變的他只和樣本有關
mask[i] = old[i] >= new[i]
# 重置一下,以免下次也更新他了
node.affected = False
# 把標記為已剪枝的結點從待剪枝結點列表刪除,當前結點也是標記為已剪枝的
# 他已經變成葉子結點,葉子結點是不在待剪枝列表的
for i in range(len(tmp_nodes) - 1, -1, -1):
if tmp_nodes[i].pruned:
tmp_nodes.pop(i)
old = np.delete(old, i)
new = np.delete(new, i)
mask = np.delete(mask, i)
# 假如待剪枝列表沒有可剪枝的也退出
else:
break
# 剪枝完畢之後,新的生成樹,更新一下,這棵樹的nodes列表,把前面刪除的葉子都刪除掉
# 前後的剪枝函式主要處理的是leafs,沒有處理nodes,所以最後處理一下。
self.reduce_nodes()
針對CART的剪枝
損失函式的設計
這個的設計思想是,隨著懲罰因子alpha從0到大不斷增加,結點被一個一個剪掉,每剪掉一顆都是一棵樹儲存起來,最後只剩下root,形成了p棵樹,求p棵樹裡面的最優樹。
每一個結點都有一個alpha的閾值,超過了這個閾值,該節點就可以被剪掉。
閾值的實現:
# 獲取該節點的閾值,就是懲罰因子有多大時,就輪到這個結點被剪掉了,
# 當然這個可能會隨著一些結點被剪掉而變化,
# 隨著懲罰因子的變大,結點會一個一個剪掉,知道只剩下根
def get_threshold(self):
return (self.cost(pruned=True) - self.cost()) / (len(self.leafs) - 1)
# 說初始化整顆樹的self.tree值,這棵樹的每個結點屬於哪棵樹
基於該損失函式的演算法描述
基於該損失函式的程式碼實現
獲得p顆生成樹
# CART的剪枝處理
def _cart_prune(self):
# 初始化整顆樹的self.tree值,這棵樹的每個結點屬於哪棵樹
self.root.cut_tree()
# 獲取待剪枝的結點列表,也就是非葉子結點
tmp_nodes = [node for node in self.nodes if node.category is None]
# 計算這些候選集的閾值
thresholds = np.array([node.get_threshold() for node in tmp_nodes])
while True:
# 理論上我們需要記錄p棵樹,然後在p顆樹裡找最好的那棵樹,
# 因此我們需要深度copy原始樹,在此基本上剪枝,每次形成不同的樹
root_copy = deepcopy(self.root)
# self.roots用於記錄產生的p棵樹,先把原始樹存進來
self.roots.append(root_copy)
# 出口,只剩根結點了,p棵樹產生完畢
if self.root.height == 1:
break
# 取閾值最低的結點,那個結點第一個被剪
p = np.argmin(thresholds) # type: int
# 下面的處理和離散處理一致
tmp_nodes[p].prune()
# 剪掉之後,看哪些結點受影響了,更新受影響的結點
for i, node in enumerate(tmp_nodes):
if node.affected:
# 對於受影響的結點,更新一下閾值
thresholds[i] = node.get_threshold()
node.affected = False
pop = tmp_nodes.pop
for i in range(len(tmp_nodes) - 1, -1, -1):
if tmp_nodes[i].pruned:
pop(i)
thresholds = np.delete(thresholds, i)
self.reduce_nodes()
選取最優生成樹
# 定義選擇那個樹最優的標準,使用加權正確率作為交叉驗證的標準
def acc(self, y, y_pred, weights):
if weights is not None:
return np.sum((np.array(y) == np.array(y_pred))*weights) /len(y)
return np.sum(np.array(y) == np.array(y_pred)) /len(y)
# 後剪枝是通過比較每棵樹在驗證集上的表現來找出最優樹
def prune(self, x_cv, y_cv, weights):
if self.root.is_cart:
if x_cv is not None and y_cv is not None:
self._cart_prune()
# 選出最優的子樹
arg = np.argmax([self.acc(y_cv, tree.predict(x_cv), weights) for tree in self.roots]) # type: int
tar_root = self.roots[arg]
self.nodes = []
# 更新一下樹的相關資訊,所屬tree,所有的nodes
tar_root.feed_tree(self)
# 把指標給root
self.root = tar_root
else:
self._prune()
整個流程處理fit():
方法都有了下面就開始整個操作流程:準備資料,資料預處理,生成樹,剪枝
# =============================================================================
# 引數alpha和剪枝有關;cv_rate用於控制交叉驗證集大小;train_only是否進行資料集切分
def fit(self,x,y,alpha= None, sample_weight= None, eps= 12-8, cv_rate= 0.2, train_only= False):
# 數值化類別向量
_dic = {c:i for i,c in enumerate(set(y))}
# 將y數值化
y = np.array([_dic[yy] for yy in y])
# 儲存ID-->class對映,這樣才可以反向找回去
self.label_dic = {value:key for key,value in _dic.items()}
# 如果x為非數值的,也需要數值化
x = np.array(x)
# 根據特徵個數給出alpha
self.prune_alpha = alpha if alpha is not None else x.shape[1]/2
# 劃分資料集
if not train_only and self.root.is_cart:
# 利用下標實現各種切分
_train_num = int(len(x)*(1-cv_rate))
# 相當於打亂了順序
_indices = np.random.permutation(np.arange(len(x)))
_train_indices = _indices[:_train_num]
_test_indices = _indices[_train_num:]
# 針對樣本權重的處理
if sample_weight is not None:
# 切分後的樣本權重需要做歸一化處理
_train_weight = sample_weight[_train_indices]
_test_weight = sample_weight[_test_indices]
# 歸一化
_train_weight /= np.sum(_train_weight)
_test_weight /= np.sum(_test_weight)
else:
_train_weight = _test_weight = None
x_train, y_train = x[_train_indices],y[_train_indices]
x_cv, y_cv = x[_test_indices],y[_test_indices]
else:
x_train, y_train, _train_weight = x, y, sample_weight
x_cv = y_cv = _test_weight = None
# 資料預處理
self.feed_data(x_train)
# 呼叫根節點的生成演算法
self.root.fit(x_train, y_train, _train_weight, eps)
# 呼叫對node的剪枝演算法的封裝
self.prune(x_cv, y_cv, _test_weight)
# 定義刪除結點方法,從後往前刪除,這樣就可以使用pop
def reduce_nodes(self):
for i in range(len(self.nodes)-1, -1, -1):
if self.nodes[i].pruned:
self.nodes.pop(i)