機器學習實戰---決策樹CART迴歸樹實現
機器學習實戰---決策樹CART簡介及分類樹實現
一:對比分類樹
CART迴歸樹和CART分類樹的建立演算法大部分是類似的,所以這裡我們只討論CART迴歸樹和CART分類樹的建立演算法不同的地方。
首先,我們要明白,什麼是迴歸樹,什麼是分類樹。
兩者的區別在於樣本輸出:
如果樣本輸出是離散值,那麼這是一顆分類樹。
如果果樣本輸出是連續值,那麼那麼這是一顆迴歸樹。
除了概念的不同,CART迴歸樹和CART分類樹的建立和預測的區別主要有下面兩點:
1)連續值的處理方法不同
2)決策樹建立後做預測的方式不同。
對於連續值的處理,我們知道CART分類樹採用的是用基尼係數的大小來度量特徵的各個劃分點的優劣情況,這比較適合分類模型。
但是對於迴歸模型,我們使用了常見的和方差的度量方式。
CART迴歸樹的度量目標是,對於任意劃分特徵A,對應的任意劃分點s兩邊劃分成的資料集D1和D2,求出使D1和D2各自集合的均方差最小,同時D1和D2的均方差之和最小所對應的特徵和特徵值劃分點。
表示式為:
其中,c1為D1資料集的樣本輸出均值,c2為D2資料集的樣本輸出均值。
對於決策樹建立後做預測的方式,上面講到了CART分類樹採用葉子節點裡概率最大的類別作為當前節點的預測類別。而回歸樹輸出不是類別,它採用的是用最終葉子的均值或者中位數來預測輸出結果。
除了上面提到了以外,CART迴歸樹和CART分類樹的建立演算法和預測沒有什麼區別。
二:迴歸樹的實現
(一)實現葉子節點均值計算
def regLeaf(data_Y): #用於計算指定樣本中標籤均值表示迴歸y值 return np.mean(data_Y)
(二)實現計算資料集總方差
def regErr(data_Y): #使用均方誤差作為劃分依據 return np.var(data_Y)*data_Y.size #np.var是求解平均誤差,我們這裡需要總方差進行比較
(三)實現資料集切分
def binSplitDataSet(data_X,data_Y,fea_axis,fea_val): #進行資料集劃分 dataGtIdx= np.where(data_X[:,fea_axis]>fea_val) dataLgIdx = np.where(data_X[:,fea_axis]<=fea_val) return data_X[dataGtIdx],data_Y[dataGtIdx],data_X[dataLgIdx],data_Y[dataLgIdx]
(四)實現選取最優特徵及特徵值(含預剪枝處理)
def chooseBestSplit(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): """ 選取的最好切分方式,使用回撥方式呼叫葉節點計算和誤差計算,函式中含有預剪枝操作 :param data_X: 傳入資料集 :param data_Y: 傳入標籤值 :param leafType: 要呼叫計算的葉節點值 --- 雖然靈活,但是沒必要 :param errType: 要計算誤差的函式,這裡是均方誤差 --- 雖然靈活,但是沒必要 :param ops: 包含了兩個重要資訊, tolS tolN用於控制函式的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數 :return: """ m,n = data_X.shape tolS = ops[0] tolN = ops[1] #之前都是將判斷是否繼續劃分子樹放入createTree方法中,這裡可以提到chooseBestSplit中進行判別。 #當然可以放入createTree方法中處理 if np.unique(data_Y).size == 1: #1.如果標籤值全部相同,則返回特徵None表示不需要進行下一步劃分,返回葉節點 return None,leafType(data_Y) #遍歷獲取最優特徵和特徵值 TosErr = errType(data_Y) #獲取全部資料集的誤差,後面計算劃分後兩個子集的總誤差,如果誤差下降小於tolS,則不進行劃分,返回該葉子節點即可(預剪枝操作) bestErr = np.inf bestFeaIdx = 0 #注意:這裡兩個我們設定為0,而不是-1,因為我們必須保證可以取到一個特徵(後面迴圈可能一直continue),我們需要在後面進行額外處理 bestFeaVal = 0 for i in range(n): #遍歷所有特徵 for feaval in np.unique(data_X[:,i]): dataGt_X,dataGt_Y,dataLg_X,dataLg_Y = binSplitDataSet(data_X,data_Y,i,feaval) #資料集劃分 # print(dataGt_X.shape,dataLg_X.shape) if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: #不符合最小資料集,不進行計算 continue concErr = errType(dataLg_Y)+errType(dataGt_Y) # print(concErr) if concErr < bestErr: bestFeaIdx = i bestFeaVal = feaval bestErr = concErr #2.如果最後求解的誤差,小於我們要求的誤差距離,則不進行下一步劃分資料集(預剪枝) if (TosErr - bestErr) < tolS: return None,leafType(data_Y) #3.如果我們上面的資料集本身較小,則無論如何切分,資料集都<tolN,我們就需要在這裡再處理一遍,進行一下判斷 dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, bestFeaIdx, bestFeaVal) # 資料集劃分 if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: # 不符合最小資料集,不進行計算 return None,leafType(data_Y) return bestFeaIdx,bestFeaVal #正常情況下的返回結果
(五)實現決策樹建立
def createTree(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): #建立迴歸樹 feaIdx,feaVal = chooseBestSplit(data_X,data_Y,leafType,errType,ops) if feaIdx == None: #是葉子節點 return feaVal #遞迴建樹 myTree = {} myTree['feaIdx'] = feaIdx myTree['feaVal'] = feaVal dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, feaIdx, feaVal) # 資料集劃分 myTree['left'] = createTree(dataGt_X,dataGt_Y,leafType,errType,ops) myTree['right'] = createTree(dataLg_X,dataLg_Y,leafType,errType,ops) return myTree
(六)資料集載入及測試
import numpy as np def loadDataSet(filename): dataSet = np.loadtxt(filename) m,n = dataSet.shape data_X = dataSet[:,0:n-1] data_Y = dataSet[:,n-1] return data_X,data_Y
data_X,data_Y = loadDataSet("ex0.txt") print(createTree(data_X,data_Y))
結果顯示:
{'feaIdx': 1, 'feaVal': 0.39435, 'left': { 'feaIdx': 1, 'feaVal': 0.582002, 'left': { 'feaIdx': 1, 'feaVal': 0.797583, 'left': 3.9871632, 'right': 2.9836209534883724 }, 'right': 1.980035071428571 }, 'right': { 'feaIdx': 1, 'feaVal': 0.197834, 'left': 1.0289583666666666, 'right': -0.023838155555555553 } }
(七)全部程式碼
import numpy as np def loadDataSet(filename): dataSet = np.loadtxt(filename) m,n = dataSet.shape data_X = dataSet[:,0:n-1] data_Y = dataSet[:,n-1] return data_X,data_Y def regLeaf(data_Y): #用於計算指定樣本中標籤均值表示迴歸y值 return np.mean(data_Y) def regErr(data_Y): #使用均方誤差作為劃分依據 return np.var(data_Y)*data_Y.size def binSplitDataSet(data_X,data_Y,fea_axis,fea_val): #進行資料集劃分 dataGtIdx = np.where(data_X[:,fea_axis]>fea_val) dataLgIdx = np.where(data_X[:,fea_axis]<=fea_val) return data_X[dataGtIdx],data_Y[dataGtIdx],data_X[dataLgIdx],data_Y[dataLgIdx] def chooseBestSplit(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): """ 選取的最好切分方式,使用回撥方式呼叫葉節點計算和誤差計算,函式中含有預剪枝操作 :param data_X: 傳入資料集 :param data_Y: 傳入標籤值 :param leafType: 要呼叫計算的葉節點值 --- 雖然靈活,但是沒必要 :param errType: 要計算誤差的函式,這裡是均方誤差 --- 雖然靈活,但是沒必要 :param ops: 包含了兩個重要資訊, tolS tolN用於控制函式的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數 :return: """ m,n = data_X.shape tolS = ops[0] tolN = ops[1] #之前都是將判斷是否繼續劃分子樹放入createTree方法中,這裡可以提到chooseBestSplit中進行判別。 #當然可以放入createTree方法中處理 if np.unique(data_Y).size == 1: #1.如果標籤值全部相同,則返回特徵None表示不需要進行下一步劃分,返回葉節點 return None,leafType(data_Y) #遍歷獲取最優特徵和特徵值 TosErr = errType(data_Y) #獲取全部資料集的誤差,後面計算劃分後兩個子集的總誤差,如果誤差下降小於tolS,則不進行劃分,返回該葉子節點即可(預剪枝操作) bestErr = np.inf bestFeaIdx = 0 #注意:這裡兩個我們設定為0,而不是-1,因為我們必須保證可以取到一個特徵(後面迴圈可能一直continue),我們需要在後面進行額外處理 bestFeaVal = 0 for i in range(n): #遍歷所有特徵 for feaval in np.unique(data_X[:,i]): dataGt_X,dataGt_Y,dataLg_X,dataLg_Y = binSplitDataSet(data_X,data_Y,i,feaval) #資料集劃分 # print(dataGt_X.shape,dataLg_X.shape) if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: #不符合最小資料集,不進行計算 continue concErr = errType(dataLg_Y)+errType(dataGt_Y) # print(concErr) if concErr < bestErr: bestFeaIdx = i bestFeaVal = feaval bestErr = concErr #2.如果最後求解的誤差,小於我們要求的誤差距離,則不進行下一步劃分資料集(預剪枝) if (TosErr - bestErr) < tolS: return None,leafType(data_Y) #3.如果我們上面的資料集本身較小,則無論如何切分,資料集都<tolN,我們就需要在這裡再處理一遍,進行一下判斷 dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, bestFeaIdx, bestFeaVal) # 資料集劃分 if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: # 不符合最小資料集,不進行計算 return None,leafType(data_Y) return bestFeaIdx,bestFeaVal #正常情況下的返回結果 def createTree(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): #建立迴歸樹 feaIdx,feaVal = chooseBestSplit(data_X,data_Y,leafType,errType,ops) if feaIdx == None: #是葉子節點 return feaVal #遞迴建樹 myTree = {} myTree['feaIdx'] = feaIdx myTree['feaVal'] = feaVal dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, feaIdx, feaVal) # 資料集劃分 myTree['left'] = createTree(dataGt_X,dataGt_Y,leafType,errType,ops) myTree['right'] = createTree(dataLg_X,dataLg_Y,leafType,errType,ops) return myTree data_X,data_Y = loadDataSet("ex0.txt") print(createTree(data_X,data_Y))View Code
三:樹剪枝
一棵樹如果節點過多,表示該模型可能對資料進行了過擬合(使用測試集交叉驗證法即可),這時就需要我們進行剪枝處理,避免過擬合
(一)預剪枝
前面建立決策樹過程中,我們已經進行了預剪枝操作。即設定的ops引數,包含了兩個重要資訊, tolS tolN用於控制函式的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數。用於在建立決策樹過程中進行預剪枝操作。
下面例項中,檢視ops引數設定對剪枝的影響:
import numpy as np def loadDataSet(filename): dataSet = np.loadtxt(filename) m,n = dataSet.shape data_X = dataSet[:,0:n-1] data_Y = dataSet[:,n-1] return data_X,data_Y def regLeaf(data_Y): #用於計算指定樣本中標籤均值表示迴歸y值 return np.mean(data_Y) def regErr(data_Y): #使用均方誤差作為劃分依據 return np.var(data_Y)*data_Y.size def binSplitDataSet(data_X,data_Y,fea_axis,fea_val): #進行資料集劃分 dataGtIdx = np.where(data_X[:,fea_axis]>fea_val) dataLgIdx = np.where(data_X[:,fea_axis]<=fea_val) return data_X[dataGtIdx],data_Y[dataGtIdx],data_X[dataLgIdx],data_Y[dataLgIdx] def chooseBestSplit(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): """ 選取的最好切分方式,使用回撥方式呼叫葉節點計算和誤差計算,函式中含有預剪枝操作 :param data_X: 傳入資料集 :param data_Y: 傳入標籤值 :param leafType: 要呼叫計算的葉節點值 --- 雖然靈活,但是沒必要 :param errType: 要計算誤差的函式,這裡是均方誤差 --- 雖然靈活,但是沒必要 :param ops: 包含了兩個重要資訊, tolS tolN用於控制函式的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數 :return: """ m,n = data_X.shape tolS = ops[0] tolN = ops[1] #之前都是將判斷是否繼續劃分子樹放入createTree方法中,這裡可以提到chooseBestSplit中進行判別。 #當然可以放入createTree方法中處理 if np.unique(data_Y).size == 1: #1.如果標籤值全部相同,則返回特徵None表示不需要進行下一步劃分,返回葉節點 return None,leafType(data_Y) #遍歷獲取最優特徵和特徵值 TosErr = errType(data_Y) #獲取全部資料集的誤差,後面計算劃分後兩個子集的總誤差,如果誤差下降小於tolS,則不進行劃分,返回該葉子節點即可(預剪枝操作) bestErr = np.inf bestFeaIdx = 0 #注意:這裡兩個我們設定為0,而不是-1,因為我們必須保證可以取到一個特徵(後面迴圈可能一直continue),我們需要在後面進行額外處理 bestFeaVal = 0 for i in range(n): #遍歷所有特徵 for feaval in np.unique(data_X[:,i]): dataGt_X,dataGt_Y,dataLg_X,dataLg_Y = binSplitDataSet(data_X,data_Y,i,feaval) #資料集劃分 # print(dataGt_X.shape,dataLg_X.shape) if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: #不符合最小資料集,不進行計算 continue concErr = errType(dataLg_Y)+errType(dataGt_Y) # print(concErr) if concErr < bestErr: bestFeaIdx = i bestFeaVal = feaval bestErr = concErr #2.如果最後求解的誤差,小於我們要求的誤差距離,則不進行下一步劃分資料集(預剪枝) if (TosErr - bestErr) < tolS: return None,leafType(data_Y) #3.如果我們上面的資料集本身較小,則無論如何切分,資料集都<tolN,我們就需要在這裡再處理一遍,進行一下判斷 dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, bestFeaIdx, bestFeaVal) # 資料集劃分 if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: # 不符合最小資料集,不進行計算 return None,leafType(data_Y) return bestFeaIdx,bestFeaVal #正常情況下的返回結果 def createTree(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): #建立迴歸樹 feaIdx,feaVal = chooseBestSplit(data_X,data_Y,leafType,errType,ops) if feaIdx == None: #是葉子節點 return feaVal #遞迴建樹 myTree = {} myTree['feaIdx'] = feaIdx myTree['feaVal'] = feaVal dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, feaIdx, feaVal) # 資料集劃分 myTree['left'] = createTree(dataGt_X,dataGt_Y,leafType,errType,ops) myTree['right'] = createTree(dataLg_X,dataLg_Y,leafType,errType,ops) return myTree決策樹建立函式
1.預設引數ops(1,4)---表示誤差大於1,樣本數大於4的劃分結果
data_X,data_Y = loadDataSet("ex2.txt") print(createTree(data_X,data_Y,ops=(1,4)))
出現大量樹分叉,過擬合
3.設定引數ops(1000,4)---表示誤差大於1000,樣本數大於4的劃分結果
data_X,data_Y = loadDataSet("ex2.txt") print(createTree(data_X,data_Y,ops=(1000,4)))
擬合狀態還不錯。
3.設定引數ops(10000,4)---表示誤差大於10000,樣本數大於4的劃分結果
data_X,data_Y = loadDataSet("ex2.txt") print(createTree(data_X,data_Y,ops=(10000,4)))
有點欠擬合。
(二)後剪枝
後剪枝通常比預剪枝保留更多的分支,欠擬合風險小。但是後剪枝是在決策樹構造完成後進行的,其訓練時間的開銷會大於預剪枝。
後剪枝是基於已經建立好的樹,進行的葉子節點合併操作。
使用後剪枝方法需要將資料集分為測試集和訓練集。通過訓練集和引數ops使用預剪枝方法構建決策樹。然後使用構建的決策樹和測試集資料進行後剪枝處理
後剪枝演算法實現:
#開啟後剪枝處理 def isTree(tree): return type(tree) == dict #是樹的話返回字典,否則是資料 def getMean(tree): #獲取當前樹的合併均值REP---塌陷處理: 我們對一棵樹進行塌陷處理,就是遞迴將這棵樹進行合併返回這棵樹的平均值。 if isTree(tree['right']): tree['right'] = getMean(tree['right']) if isTree(tree['left']): tree['left'] = getMean(tree['left']) return (tree['left'] + tree['right'])/2 #返回均值 def prune(tree,testData_X,testData_Y): #根據決策樹和測試集資料進行後剪枝處理,不能按照訓練集進行後剪枝,因為建立決策樹時預剪枝操作中已經要求子樹誤差值小於根節點 #1.若是當測試集資料為空,則不需要後面的子樹了,直接進行塌陷處理 if testData_X.shape[0] == 0: return getMean(tree) #2.如果當前測試集不為空,而且決策樹含有左/右子樹,則需要進入子樹中進行剪枝操作---這裡我們先將測試集資料劃分 if isTree(tree['left']) or isTree(tree['right']): TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #3.根據子樹進行下一步剪枝 if isTree(tree['left']): tree['left'] = prune(tree['left'],TestDataGT_X,TestDataGT_Y) #注意:這裡是賦值操作,對樹進行剪枝 if isTree(tree['right']): tree['right'] = prune(tree['right'],TestDataLG_X,TestDataLG_Y) #注意:這裡是賦值操作,對樹進行剪枝 #4.如果兩個是葉子節點,我們開始計算誤差,進行合併 if not isTree(tree['left']) and not isTree(tree['right']): #先劃分測試集資料 TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #進行誤差比較 #4-1.先獲取沒有合併的誤差 errorNoMerge = np.sum(np.power(TestDataGT_Y-tree['left'],2)) + np.sum(np.power(TestDataLG_Y-tree['right'],2)) #4-2.再獲取合併後的誤差 treemean = (tree['left'] + tree['right'])/2 #因為是葉子節點,可以直接計算 errorMerge = np.sum(np.power(testData_Y- treemean,2)) #4-3.進行判斷 if errorMerge < errorNoMerge: #可以剪枝 print("merging") #列印提示資訊 return treemean #返回合併後的塌陷值 else: return tree #不進行合併,返回原樹 return tree #返回樹(但是該樹的子樹中可能存在剪枝合併情況由3可以知道
後剪枝演算法測試:
data_X,data_Y = loadDataSet("ex2.txt") myTree = createTree(data_X,data_Y,ops=(0,1)) #設定0,1表示不進行預剪枝,我們只對比後剪枝 print(myTree) Testdata_X,Testdata_Y = loadDataSet("ex2test.txt") #獲取測試集,開始進行後剪枝 myTree2 = prune(myTree,Testdata_X,Testdata_Y) print(myTree2)
可以看到進行了大量的剪枝操作!
import numpy as np def loadDataSet(filename): dataSet = np.loadtxt(filename) m,n = dataSet.shape data_X = dataSet[:,0:n-1] data_Y = dataSet[:,n-1] return data_X,data_Y def regLeaf(data_Y): #用於計算指定樣本中標籤均值表示迴歸y值 return np.mean(data_Y) def regErr(data_Y): #使用均方誤差作為劃分依據 return np.var(data_Y)*data_Y.size def binSplitDataSet(data_X,data_Y,fea_axis,fea_val): #進行資料集劃分 dataGtIdx = np.where(data_X[:,fea_axis]>fea_val) dataLgIdx = np.where(data_X[:,fea_axis]<=fea_val) return data_X[dataGtIdx],data_Y[dataGtIdx],data_X[dataLgIdx],data_Y[dataLgIdx] def chooseBestSplit(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): """ 選取的最好切分方式,使用回撥方式呼叫葉節點計算和誤差計算,函式中含有預剪枝操作 :param data_X: 傳入資料集 :param data_Y: 傳入標籤值 :param leafType: 要呼叫計算的葉節點值 --- 雖然靈活,但是沒必要 :param errType: 要計算誤差的函式,這裡是均方誤差 --- 雖然靈活,但是沒必要 :param ops: 包含了兩個重要資訊, tolS tolN用於控制函式的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數 :return: """ m,n = data_X.shape tolS = ops[0] tolN = ops[1] #之前都是將判斷是否繼續劃分子樹放入createTree方法中,這裡可以提到chooseBestSplit中進行判別。 #當然可以放入createTree方法中處理 if np.unique(data_Y).size == 1: #1.如果標籤值全部相同,則返回特徵None表示不需要進行下一步劃分,返回葉節點 return None,leafType(data_Y) #遍歷獲取最優特徵和特徵值 TosErr = errType(data_Y) #獲取全部資料集的誤差,後面計算劃分後兩個子集的總誤差,如果誤差下降小於tolS,則不進行劃分,返回該葉子節點即可(預剪枝操作) bestErr = np.inf bestFeaIdx = 0 #注意:這裡兩個我們設定為0,而不是-1,因為我們必須保證可以取到一個特徵(後面迴圈可能一直continue),我們需要在後面進行額外處理 bestFeaVal = 0 for i in range(n): #遍歷所有特徵 for feaval in np.unique(data_X[:,i]): dataGt_X,dataGt_Y,dataLg_X,dataLg_Y = binSplitDataSet(data_X,data_Y,i,feaval) #資料集劃分 # print(dataGt_X.shape,dataLg_X.shape) if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: #不符合最小資料集,不進行計算 continue concErr = errType(dataLg_Y)+errType(dataGt_Y) # print(concErr) if concErr < bestErr: bestFeaIdx = i bestFeaVal = feaval bestErr = concErr #2.如果最後求解的誤差,小於我們要求的誤差距離,則不進行下一步劃分資料集(預剪枝) if (TosErr - bestErr) < tolS: return None,leafType(data_Y) #3.如果我們上面的資料集本身較小,則無論如何切分,資料集都<tolN,我們就需要在這裡再處理一遍,進行一下判斷 dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, bestFeaIdx, bestFeaVal) # 資料集劃分 if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: # 不符合最小資料集,不進行計算 return None,leafType(data_Y) return bestFeaIdx,bestFeaVal #正常情況下的返回結果 def createTree(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): #建立迴歸樹 feaIdx,feaVal = chooseBestSplit(data_X,data_Y,leafType,errType,ops) if feaIdx == None: #是葉子節點 return feaVal #遞迴建樹 myTree = {} myTree['feaIdx'] = feaIdx myTree['feaVal'] = feaVal dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, feaIdx, feaVal) # 資料集劃分 myTree['left'] = createTree(dataGt_X,dataGt_Y,leafType,errType,ops) myTree['right'] = createTree(dataLg_X,dataLg_Y,leafType,errType,ops) return myTree #開啟後剪枝處理 def isTree(tree): return type(tree) == dict #是樹的話返回字典,否則是資料 def getMean(tree): #獲取當前樹的合併均值REP---塌陷處理: 我們對一棵樹進行塌陷處理,就是遞迴將這棵樹進行合併返回這棵樹的平均值。 if isTree(tree['right']): tree['right'] = getMean(tree['right']) if isTree(tree['left']): tree['left'] = getMean(tree['left']) return (tree['left'] + tree['right'])/2 #返回均值 def prune(tree,testData_X,testData_Y): #根據決策樹和測試集資料進行後剪枝處理,不能按照訓練集進行後剪枝,因為建立決策樹時預剪枝操作中已經要求子樹誤差值小於根節點 #1.若是當測試集資料為空,則不需要後面的子樹了,直接進行塌陷處理 if testData_X.shape[0] == 0: return getMean(tree) #2.如果當前測試集不為空,而且決策樹含有左/右子樹,則需要進入子樹中進行剪枝操作---這裡我們先將測試集資料劃分 if isTree(tree['left']) or isTree(tree['right']): TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #3.根據子樹進行下一步剪枝 if isTree(tree['left']): tree['left'] = prune(tree['left'],TestDataGT_X,TestDataGT_Y) #注意:這裡是賦值操作,對樹進行剪枝 if isTree(tree['right']): tree['right'] = prune(tree['right'],TestDataLG_X,TestDataLG_Y) #注意:這裡是賦值操作,對樹進行剪枝 #4.如果兩個是葉子節點,我們開始計算誤差,進行合併 if not isTree(tree['left']) and not isTree(tree['right']): #先劃分測試集資料 TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #進行誤差比較 #4-1.先獲取沒有合併的誤差 errorNoMerge = np.sum(np.power(TestDataGT_Y-tree['left'],2)) + np.sum(np.power(TestDataLG_Y-tree['right'],2)) #4-2.再獲取合併後的誤差 treemean = (tree['left'] + tree['right'])/2 #因為是葉子節點,可以直接計算 errorMerge = np.sum(np.power(testData_Y- treemean,2)) #4-3.進行判斷 if errorMerge < errorNoMerge: #可以剪枝 print("merging") #列印提示資訊 return treemean #返回合併後的塌陷值 else: return tree #不進行合併,返回原樹 return tree #返回樹(但是該樹的子樹中可能存在剪枝合併情況由3可以知道 data_X,data_Y = loadDataSet("ex2.txt") myTree = createTree(data_X,data_Y,ops=(0,1)) #設定0,1表示不進行預剪枝,我們只對比後剪枝 print(myTree) Testdata_X,Testdata_Y = loadDataSet("ex2test.txt") #獲取測試集,開始進行後剪枝 myTree2 = prune(myTree,Testdata_X,Testdata_Y) print(myTree2)全部程式碼
四:模型樹實現
(一)實現模型樹葉節點生成函式和誤差計算函式
import numpy as np import matplotlib.pyplot as plt def linearSolve(data_X,data_Y): X = np.c_[np.ones(data_X.shape[0]), data_X] XTX = X.T @ X if np.linalg.det(XTX) == 0: raise NameError("this matrix can`t inverse") W = np.linalg.inv(XTX) @ (X.T @ data_Y) return W,X,data_Y def modelLeaf(data_X,data_Y): W,X,Y = linearSolve(data_X,data_Y) return W def modelErr(data_X,data_Y): W,X,Y = linearSolve(data_X,data_Y) yPred = X@W return sum(np.power(yPred-data_Y,2))
(二)修改原有函式
def binSplitDataSet(data_X,data_Y,fea_axis,fea_val): #進行資料集劃分 dataGtIdx = np.where(data_X[:,fea_axis]>fea_val) dataLgIdx = np.where(data_X[:,fea_axis]<=fea_val) return data_X[dataGtIdx],data_Y[dataGtIdx],data_X[dataLgIdx],data_Y[dataLgIdx] def chooseBestSplit(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): """ 選取的最好切分方式,使用回撥方式呼叫葉節點計算和誤差計算,函式中含有預剪枝操作 :param data_X: 傳入資料集 :param data_Y: 傳入標籤值 :param leafType: 要呼叫計算的葉節點值 --- 雖然靈活,但是沒必要 :param errType: 要計算誤差的函式,這裡是均方誤差 --- 雖然靈活,但是沒必要 :param ops: 包含了兩個重要資訊, tolS tolN用於控制函式的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數 :return: """ m,n = data_X.shape tolS = ops[0] tolN = ops[1] #之前都是將判斷是否繼續劃分子樹放入createTree方法中,這裡可以提到chooseBestSplit中進行判別。 #當然可以放入createTree方法中處理 if np.unique(data_Y).size == 1: #1.如果標籤值全部相同,則返回特徵None表示不需要進行下一步劃分,返回葉節點 return None,leafType(data_X,data_Y) #遍歷獲取最優特徵和特徵值 TosErr = errType(data_X,data_Y) #獲取全部資料集的誤差,後面計算劃分後兩個子集的總誤差,如果誤差下降小於tolS,則不進行劃分,返回該葉子節點即可(預剪枝操作) bestErr = np.inf bestFeaIdx = 0 #注意:這裡兩個我們設定為0,而不是-1,因為我們必須保證可以取到一個特徵(後面迴圈可能一直continue),我們需要在後面進行額外處理 bestFeaVal = 0 for i in range(n): #遍歷所有特徵 for feaval in np.unique(data_X[:,i]): dataGt_X,dataGt_Y,dataLg_X,dataLg_Y = binSplitDataSet(data_X,data_Y,i,feaval) #資料集劃分 # print(dataGt_X.shape,dataLg_X.shape) if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: #不符合最小資料集,不進行計算 continue concErr = errType(dataLg_X,dataLg_Y)+errType(dataGt_X,dataGt_Y) # print(concErr) if concErr < bestErr: bestFeaIdx = i bestFeaVal = feaval bestErr = concErr #2.如果最後求解的誤差,小於我們要求的誤差距離,則不進行下一步劃分資料集(預剪枝) if (TosErr - bestErr) < tolS: return None,leafType(data_X,data_Y) #3.如果我們上面的資料集本身較小,則無論如何切分,資料集都<tolN,我們就需要在這裡再處理一遍,進行一下判斷 dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, bestFeaIdx, bestFeaVal) # 資料集劃分 if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: # 不符合最小資料集,不進行計算 return None,leafType(data_X,data_Y) return bestFeaIdx,bestFeaVal #正常情況下的返回結果 def createTree(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): #建立迴歸樹 feaIdx,feaVal = chooseBestSplit(data_X,data_Y,leafType,errType,ops) if feaIdx == None: #是葉子節點 return feaVal #遞迴建樹 myTree = {} myTree['feaIdx'] = feaIdx myTree['feaVal'] = feaVal dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, feaIdx, feaVal) # 資料集劃分 myTree['left'] = createTree(dataGt_X,dataGt_Y,leafType,errType,ops) myTree['right'] = createTree(dataLg_X,dataLg_Y,leafType,errType,ops) return myTree #開啟後剪枝處理 def isTree(tree): return type(tree) == dict #是樹的話返回字典,否則是資料 def getMean(tree): #獲取當前樹的合併均值REP---塌陷處理: 我們對一棵樹進行塌陷處理,就是遞迴將這棵樹進行合併返回這棵樹的平均值。 if isTree(tree['right']): tree['right'] = getMean(tree['right']) if isTree(tree['left']): tree['left'] = getMean(tree['left']) return (tree['left'] + tree['right'])/2 #返回均值 def prune(tree,testData_X,testData_Y): #根據決策樹和測試集資料進行後剪枝處理,不能按照訓練集進行後剪枝,因為建立決策樹時預剪枝操作中已經要求子樹誤差值小於根節點 #1.若是當測試集資料為空,則不需要後面的子樹了,直接進行塌陷處理 if testData_X.shape[0] == 0: return getMean(tree) #2.如果當前測試集不為空,而且決策樹含有左/右子樹,則需要進入子樹中進行剪枝操作---這裡我們先將測試集資料劃分 if isTree(tree['left']) or isTree(tree['right']): TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #3.根據子樹進行下一步剪枝 if isTree(tree['left']): tree['left'] = prune(tree['left'],TestDataGT_X,TestDataGT_Y) #注意:這裡是賦值操作,對樹進行剪枝 if isTree(tree['right']): tree['right'] = prune(tree['right'],TestDataLG_X,TestDataLG_Y) #注意:這裡是賦值操作,對樹進行剪枝 #4.如果兩個是葉子節點,我們開始計算誤差,進行合併 if not isTree(tree['left']) and not isTree(tree['right']): #先劃分測試集資料 TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #進行誤差比較 #4-1.先獲取沒有合併的誤差 errorNoMerge = np.sum(np.power(TestDataGT_Y-tree['left'],2)) + np.sum(np.power(TestDataLG_Y-tree['right'],2)) #4-2.再獲取合併後的誤差 treemean = (tree['left'] + tree['right'])/2 #因為是葉子節點,可以直接計算 errorMerge = np.sum(np.power(testData_Y- treemean,2)) #4-3.進行判斷 if errorMerge < errorNoMerge: #可以剪枝 print("merging") #列印提示資訊 return treemean #返回合併後的塌陷值 else: return tree #不進行合併,返回原樹 return tree #返回樹(但是該樹的子樹中可能存在剪枝合併情況由3可以知道
(三)測試函式
data_X,data_Y = loadDataSet("exp2.txt") myTree = createTree(data_X,data_Y,modelLeaf,modelErr,ops=(1,10)) #設定0,1表示不進行預剪枝,我們只對比後剪枝 print(myTree) plt.figure() plt.scatter(data_X.flatten(),data_Y.flatten()) plt.show()
五:實現迴歸樹預測,對比決策樹和線性迴歸
由於我上面沒有很好的處理迴歸樹和模型樹的引數保持一致性,所以這裡我對每一個預測使用不同程式碼(就是同上面一樣,各自改變了引數,也可以該一下即可)
(一)實現決策樹--迴歸樹和模型樹預測函式
#實現預測迴歸樹 def regTreeEval(model,data_X): #對於迴歸樹,直接返回model(預測值),對於模型樹,通過model和我們傳遞的測試集資料進行預測 return model #實現預測模型樹 def modelTreeEval(model,data_X): #為了使得迴歸樹和模型樹保持一致,所以我們上面為regTreeEval加了data_X X = np.c_[np.ones(data_X.shape[0]),data_X] return X@model #開始遞迴預測 def treeForeCast(tree,TestData,modelEval=regTreeEval): if not isTree(tree): return modelEval(tree,TestData) #如果是葉子節點,直接返回預測值 if TestData[tree['feaIdx']] > tree['feaVal']: #如果測試集指定特徵上的值大於決策樹特徵值,則進入左子樹 if isTree(tree['left']): return treeForeCast(tree['left'],TestData,modelEval) else: #如果左子樹是葉子節點,直接返回預測值 return modelEval(tree['left'],TestData) else: #進入右子樹 if isTree(tree['right']): return treeForeCast(tree['right'],TestData,modelEval) else: #如果左子樹是葉子節點,直接返回預測值 return modelEval(tree['right'],TestData) def createForecast(tree,testData_X,modelEval = regTreeEval): #進行測試集資料預測 m,n = testData_X.shape yPred = np.zeros((m,1)) for i in range(m): #開始預測 yPred[i] = treeForeCast(tree,testData_X[i],modelEval) return yPred
(三)測試迴歸樹預測結果和測試集標籤相關性(R2越接近1越好)
import numpy as np import matplotlib.pyplot as plt def loadDataSet(filename): dataSet = np.loadtxt(filename) m,n = dataSet.shape data_X = dataSet[:,0:n-1] data_Y = dataSet[:,n-1] return data_X,data_Y def regLeaf(data_Y): #用於計算指定樣本中標籤均值表示迴歸y值 return np.mean(data_Y) def regErr(data_Y): #使用均方誤差作為劃分依據 return np.var(data_Y)*data_Y.size def binSplitDataSet(data_X,data_Y,fea_axis,fea_val): #進行資料集劃分 dataGtIdx = np.where(data_X[:,fea_axis]>fea_val) dataLgIdx = np.where(data_X[:,fea_axis]<=fea_val) return data_X[dataGtIdx],data_Y[dataGtIdx],data_X[dataLgIdx],data_Y[dataLgIdx] def chooseBestSplit(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): """ 選取的最好切分方式,使用回撥方式呼叫葉節點計算和誤差計算,函式中含有預剪枝操作 :param data_X: 傳入資料集 :param data_Y: 傳入標籤值 :param leafType: 要呼叫計算的葉節點值 --- 雖然靈活,但是沒必要 :param errType: 要計算誤差的函式,這裡是均方誤差 --- 雖然靈活,但是沒必要 :param ops: 包含了兩個重要資訊, tolS tolN用於控制函式的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數 :return: """ m,n = data_X.shape tolS = ops[0] tolN = ops[1] #之前都是將判斷是否繼續劃分子樹放入createTree方法中,這裡可以提到chooseBestSplit中進行判別。 #當然可以放入createTree方法中處理 if np.unique(data_Y).size == 1: #1.如果標籤值全部相同,則返回特徵None表示不需要進行下一步劃分,返回葉節點 return None,leafType(data_Y) #遍歷獲取最優特徵和特徵值 TosErr = errType(data_Y) #獲取全部資料集的誤差,後面計算劃分後兩個子集的總誤差,如果誤差下降小於tolS,則不進行劃分,返回該葉子節點即可(預剪枝操作) bestErr = np.inf bestFeaIdx = 0 #注意:這裡兩個我們設定為0,而不是-1,因為我們必須保證可以取到一個特徵(後面迴圈可能一直continue),我們需要在後面進行額外處理 bestFeaVal = 0 for i in range(n): #遍歷所有特徵 for feaval in np.unique(data_X[:,i]): dataGt_X,dataGt_Y,dataLg_X,dataLg_Y = binSplitDataSet(data_X,data_Y,i,feaval) #資料集劃分 # print(dataGt_X.shape,dataLg_X.shape) if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: #不符合最小資料集,不進行計算 continue concErr = errType(dataLg_Y)+errType(dataGt_Y) # print(concErr) if concErr < bestErr: bestFeaIdx = i bestFeaVal = feaval bestErr = concErr #2.如果最後求解的誤差,小於我們要求的誤差距離,則不進行下一步劃分資料集(預剪枝) if (TosErr - bestErr) < tolS: return None,leafType(data_Y) #3.如果我們上面的資料集本身較小,則無論如何切分,資料集都<tolN,我們就需要在這裡再處理一遍,進行一下判斷 dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, bestFeaIdx, bestFeaVal) # 資料集劃分 if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: # 不符合最小資料集,不進行計算 return None,leafType(data_Y) return bestFeaIdx,bestFeaVal #正常情況下的返回結果 def createTree(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): #建立迴歸樹 feaIdx,feaVal = chooseBestSplit(data_X,data_Y,leafType,errType,ops) if feaIdx == None: #是葉子節點 return feaVal #遞迴建樹 myTree = {} myTree['feaIdx'] = feaIdx myTree['feaVal'] = feaVal dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, feaIdx, feaVal) # 資料集劃分 myTree['left'] = createTree(dataGt_X,dataGt_Y,leafType,errType,ops) myTree['right'] = createTree(dataLg_X,dataLg_Y,leafType,errType,ops) return myTree #開啟後剪枝處理 def isTree(tree): return type(tree) == dict #是樹的話返回字典,否則是資料 def getMean(tree): #獲取當前樹的合併均值REP---塌陷處理: 我們對一棵樹進行塌陷處理,就是遞迴將這棵樹進行合併返回這棵樹的平均值。 if isTree(tree['right']): tree['right'] = getMean(tree['right']) if isTree(tree['left']): tree['left'] = getMean(tree['left']) return (tree['left'] + tree['right'])/2 #返回均值 def prune(tree,testData_X,testData_Y): #根據決策樹和測試集資料進行後剪枝處理,不能按照訓練集進行後剪枝,因為建立決策樹時預剪枝操作中已經要求子樹誤差值小於根節點 #1.若是當測試集資料為空,則不需要後面的子樹了,直接進行塌陷處理 if testData_X.shape[0] == 0: return getMean(tree) #2.如果當前測試集不為空,而且決策樹含有左/右子樹,則需要進入子樹中進行剪枝操作---這裡我們先將測試集資料劃分 if isTree(tree['left']) or isTree(tree['right']): TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #3.根據子樹進行下一步剪枝 if isTree(tree['left']): tree['left'] = prune(tree['left'],TestDataGT_X,TestDataGT_Y) #注意:這裡是賦值操作,對樹進行剪枝 if isTree(tree['right']): tree['right'] = prune(tree['right'],TestDataLG_X,TestDataLG_Y) #注意:這裡是賦值操作,對樹進行剪枝 #4.如果兩個是葉子節點,我們開始計算誤差,進行合併 if not isTree(tree['left']) and not isTree(tree['right']): #先劃分測試集資料 TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #進行誤差比較 #4-1.先獲取沒有合併的誤差 errorNoMerge = np.sum(np.power(TestDataGT_Y-tree['left'],2)) + np.sum(np.power(TestDataLG_Y-tree['right'],2)) #4-2.再獲取合併後的誤差 treemean = (tree['left'] + tree['right'])/2 #因為是葉子節點,可以直接計算 errorMerge = np.sum(np.power(testData_Y- treemean,2)) #4-3.進行判斷 if errorMerge < errorNoMerge: #可以剪枝 print("merging") #列印提示資訊 return treemean #返回合併後的塌陷值 else: return tree #不進行合併,返回原樹 return tree #返回樹(但是該樹的子樹中可能存在剪枝合併情況由3可以知道 #實現預測迴歸樹 def regTreeEval(model,data_X): #對於迴歸樹,直接返回model(預測值),對於模型樹,通過model和我們傳遞的測試集資料進行預測 return model #實現預測模型樹 def modelTreeEval(model,data_X): #為了使得迴歸樹和模型樹保持一致,所以我們上面為regTreeEval加了data_X X = np.c_[np.ones(data_X.shape[0]),data_X] return X@model #開始遞迴預測 def treeForeCast(tree,TestData,modelEval=regTreeEval): if not isTree(tree): return modelEval(tree,TestData) #如果是葉子節點,直接返回預測值 if TestData[tree['feaIdx']] > tree['feaVal']: #如果測試集指定特徵上的值大於決策樹特徵值,則進入左子樹 if isTree(tree['left']): return treeForeCast(tree['left'],TestData,modelEval) else: #如果左子樹是葉子節點,直接返回預測值 return modelEval(tree['left'],TestData) else: #進入右子樹 if isTree(tree['right']): return treeForeCast(tree['right'],TestData,modelEval) else: #如果左子樹是葉子節點,直接返回預測值 return modelEval(tree['right'],TestData) def createForecast(tree,testData_X,modelEval = regTreeEval): #進行測試集資料預測 m,n = testData_X.shape yPred = np.zeros((m,1)) for i in range(m): #開始預測 yPred[i] = treeForeCast(tree,testData_X[i],modelEval) return yPred data_X,data_Y = loadDataSet("bikeSpeedVsIq_train.txt") #訓練集資料 myTree = createTree(data_X,data_Y,ops=(1,20)) #訓練集資料建決策模型樹 print(myTree) testData_X,testData_Y = loadDataSet('bikeSpeedVsIq_test.txt') #測試集資料 yPred = createForecast(myTree,testData_X) #使用模型樹預測 print(np.corrcoef(yPred,testData_Y,rowvar=0)[0,1]) plt.figure() plt.scatter(data_X.flatten(),data_Y.flatten()) plt.show()全部程式碼
data_X,data_Y = loadDataSet("bikeSpeedVsIq_train.txt") #訓練集資料 myTree = createTree(data_X,data_Y,ops=(1,20)) #訓練集資料建決策模型樹 print(myTree) testData_X,testData_Y = loadDataSet('bikeSpeedVsIq_test.txt') #測試集資料 yPred = createForecast(myTree,testData_X) #使用模型樹預測 print(np.corrcoef(yPred,testData_Y,rowvar=0)[0,1]) plt.figure() plt.scatter(data_X.flatten(),data_Y.flatten()) plt.show()
(四)測試模型樹預測結果和測試集標籤相關性(R2越接近1越好)
import numpy as np import matplotlib.pyplot as plt def loadDataSet(filename): dataSet = np.loadtxt(filename) m,n = dataSet.shape data_X = dataSet[:,0:n-1] data_Y = dataSet[:,n-1] return data_X,data_Y def regLeaf(data_Y): #用於計算指定樣本中標籤均值表示迴歸y值 return np.mean(data_Y) def regErr(data_Y): #使用均方誤差作為劃分依據 return np.var(data_Y)*data_Y.size def linearSolve(data_X,data_Y): X = np.c_[np.ones(data_X.shape[0]), data_X] XTX = X.T @ X if np.linalg.det(XTX) == 0: raise NameError("this matrix can`t inverse") W = np.linalg.inv(XTX) @ (X.T @ data_Y) return W,X,data_Y def modelLeaf(data_X,data_Y): W,X,Y = linearSolve(data_X,data_Y) return W def modelErr(data_X,data_Y): W,X,Y = linearSolve(data_X,data_Y) yPred = X@W return sum(np.power(yPred-data_Y,2)) def binSplitDataSet(data_X,data_Y,fea_axis,fea_val): #進行資料集劃分 dataGtIdx = np.where(data_X[:,fea_axis]>fea_val) dataLgIdx = np.where(data_X[:,fea_axis]<=fea_val) return data_X[dataGtIdx],data_Y[dataGtIdx],data_X[dataLgIdx],data_Y[dataLgIdx] def chooseBestSplit(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): """ 選取的最好切分方式,使用回撥方式呼叫葉節點計算和誤差計算,函式中含有預剪枝操作 :param data_X: 傳入資料集 :param data_Y: 傳入標籤值 :param leafType: 要呼叫計算的葉節點值 --- 雖然靈活,但是沒必要 :param errType: 要計算誤差的函式,這裡是均方誤差 --- 雖然靈活,但是沒必要 :param ops: 包含了兩個重要資訊, tolS tolN用於控制函式的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數 :return: """ m,n = data_X.shape tolS = ops[0] tolN = ops[1] #之前都是將判斷是否繼續劃分子樹放入createTree方法中,這裡可以提到chooseBestSplit中進行判別。 #當然可以放入createTree方法中處理 if np.unique(data_Y).size == 1: #1.如果標籤值全部相同,則返回特徵None表示不需要進行下一步劃分,返回葉節點 return None,leafType(data_X,data_Y) #遍歷獲取最優特徵和特徵值 TosErr = errType(data_X,data_Y) #獲取全部資料集的誤差,後面計算劃分後兩個子集的總誤差,如果誤差下降小於tolS,則不進行劃分,返回該葉子節點即可(預剪枝操作) bestErr = np.inf bestFeaIdx = 0 #注意:這裡兩個我們設定為0,而不是-1,因為我們必須保證可以取到一個特徵(後面迴圈可能一直continue),我們需要在後面進行額外處理 bestFeaVal = 0 for i in range(n): #遍歷所有特徵 for feaval in np.unique(data_X[:,i]): dataGt_X,dataGt_Y,dataLg_X,dataLg_Y = binSplitDataSet(data_X,data_Y,i,feaval) #資料集劃分 # print(dataGt_X.shape,dataLg_X.shape) if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: #不符合最小資料集,不進行計算 continue concErr = errType(dataLg_X,dataLg_Y)+errType(dataGt_X,dataGt_Y) # print(concErr) if concErr < bestErr: bestFeaIdx = i bestFeaVal = feaval bestErr = concErr #2.如果最後求解的誤差,小於我們要求的誤差距離,則不進行下一步劃分資料集(預剪枝) if (TosErr - bestErr) < tolS: return None,leafType(data_X,data_Y) #3.如果我們上面的資料集本身較小,則無論如何切分,資料集都<tolN,我們就需要在這裡再處理一遍,進行一下判斷 dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, bestFeaIdx, bestFeaVal) # 資料集劃分 if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: # 不符合最小資料集,不進行計算 return None,leafType(data_X,data_Y) return bestFeaIdx,bestFeaVal #正常情況下的返回結果 def createTree(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): #建立迴歸樹 feaIdx,feaVal = chooseBestSplit(data_X,data_Y,leafType,errType,ops) if feaIdx == None: #是葉子節點 return feaVal #遞迴建樹 myTree = {} myTree['feaIdx'] = feaIdx myTree['feaVal'] = feaVal dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, feaIdx, feaVal) # 資料集劃分 myTree['left'] = createTree(dataGt_X,dataGt_Y,leafType,errType,ops) myTree['right'] = createTree(dataLg_X,dataLg_Y,leafType,errType,ops) return myTree #開啟後剪枝處理 def isTree(tree): return type(tree) == dict #是樹的話返回字典,否則是資料 def getMean(tree): #獲取當前樹的合併均值REP---塌陷處理: 我們對一棵樹進行塌陷處理,就是遞迴將這棵樹進行合併返回這棵樹的平均值。 if isTree(tree['right']): tree['right'] = getMean(tree['right']) if isTree(tree['left']): tree['left'] = getMean(tree['left']) return (tree['left'] + tree['right'])/2 #返回均值 def prune(tree,testData_X,testData_Y): #根據決策樹和測試集資料進行後剪枝處理,不能按照訓練集進行後剪枝,因為建立決策樹時預剪枝操作中已經要求子樹誤差值小於根節點 #1.若是當測試集資料為空,則不需要後面的子樹了,直接進行塌陷處理 if testData_X.shape[0] == 0: return getMean(tree) #2.如果當前測試集不為空,而且決策樹含有左/右子樹,則需要進入子樹中進行剪枝操作---這裡我們先將測試集資料劃分 if isTree(tree['left']) or isTree(tree['right']): TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #3.根據子樹進行下一步剪枝 if isTree(tree['left']): tree['left'] = prune(tree['left'],TestDataGT_X,TestDataGT_Y) #注意:這裡是賦值操作,對樹進行剪枝 if isTree(tree['right']): tree['right'] = prune(tree['right'],TestDataLG_X,TestDataLG_Y) #注意:這裡是賦值操作,對樹進行剪枝 #4.如果兩個是葉子節點,我們開始計算誤差,進行合併 if not isTree(tree['left']) and not isTree(tree['right']): #先劃分測試集資料 TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #進行誤差比較 #4-1.先獲取沒有合併的誤差 errorNoMerge = np.sum(np.power(TestDataGT_Y-tree['left'],2)) + np.sum(np.power(TestDataLG_Y-tree['right'],2)) #4-2.再獲取合併後的誤差 treemean = (tree['left'] + tree['right'])/2 #因為是葉子節點,可以直接計算 errorMerge = np.sum(np.power(testData_Y- treemean,2)) #4-3.進行判斷 if errorMerge < errorNoMerge: #可以剪枝 print("merging") #列印提示資訊 return treemean #返回合併後的塌陷值 else: return tree #不進行合併,返回原樹 return tree #返回樹(但是該樹的子樹中可能存在剪枝合併情況由3可以知道 #實現預測迴歸樹 def regTreeEval(model,data_X): #對於迴歸樹,直接返回model(預測值),對於模型樹,通過model和我們傳遞的測試集資料進行預測 return model #實現預測模型樹 def modelTreeEval(model,data_X): #為了使得迴歸樹和模型樹保持一致,所以我們上面為regTreeEval加了data_X X = np.c_[np.ones(data_X.shape[0]),data_X] return X@model #開始遞迴預測 def treeForeCast(tree,TestData,modelEval=regTreeEval): if not isTree(tree): return modelEval(tree,TestData) #如果是葉子節點,直接返回預測值 if TestData[tree['feaIdx']] > tree['feaVal']: #如果測試集指定特徵上的值大於決策樹特徵值,則進入左子樹 if isTree(tree['left']): return treeForeCast(tree['left'],TestData,modelEval) else: #如果左子樹是葉子節點,直接返回預測值 return modelEval(tree['left'],TestData) else: #進入右子樹 if isTree(tree['right']): return treeForeCast(tree['right'],TestData,modelEval) else: #如果左子樹是葉子節點,直接返回預測值 return modelEval(tree['right'],TestData) def createForecast(tree,testData_X,modelEval = regTreeEval): #進行測試集資料預測 m,n = testData_X.shape yPred = np.zeros((m,1)) for i in range(m): #開始預測 yPred[i] = treeForeCast(tree,testData_X[i],modelEval) return yPred data_X,data_Y = loadDataSet("bikeSpeedVsIq_train.txt") #訓練集資料 myTree = createTree(data_X,data_Y,modelLeaf,modelErr,ops=(1,20)) #訓練集資料建決策模型樹 print(myTree) testData_X,testData_Y = loadDataSet('bikeSpeedVsIq_test.txt') #測試集資料 yPred = createForecast(myTree,testData_X,modelTreeEval) #使用模型樹預測 print(np.corrcoef(yPred,testData_Y,rowvar=0)[0,1]) plt.figure() plt.scatter(data_X.flatten(),data_Y.flatten()) plt.show()全部程式碼
data_X,data_Y = loadDataSet("bikeSpeedVsIq_train.txt") #訓練集資料 myTree = createTree(data_X,data_Y,modelLeaf,modelErr,ops=(1,20)) #訓練集資料建決策模型樹 print(myTree) testData_X,testData_Y = loadDataSet('bikeSpeedVsIq_test.txt') #測試集資料 yPred = createForecast(myTree,testData_X,modelTreeEval) #使用模型樹預測 print(np.corrcoef(yPred,testData_Y,rowvar=0)[0,1]) plt.figure() plt.scatter(data_X.flatten(),data_Y.flatten()) plt.show()
可以看到模型樹優於迴歸樹
(五)一般線性迴歸
利用我們上面實現的linearSolve方法,獲取訓練集的引數向量權重即可!!
data_X,data_Y = loadDataSet("bikeSpeedVsIq_train.txt") #訓練集資料 testData_X,testData_Y = loadDataSet('bikeSpeedVsIq_test.txt') #測試集資料 W,X,Y = linearSolve(data_X,data_Y) yPred2 = np.zeros((testData_X.shape[0],1)) testDX = np.c_[np.ones(testData_X.shape[0]),testData_X] for i in range(testData_X.shape[0]): yPred2[i] = testDX[i]@W print(np.corrcoef(yPred2,testData_Y,rowvar=0)[0,1])
所以,樹迴歸方法在預測複雜資料時,會比簡單的線性模型更加有效