1. 程式人生 > 程式設計 >python 決策樹演算法的實現

python 決策樹演算法的實現

'''
資料集:Mnist
訓練集數量:60000
測試集數量:10000
------------------------------
執行結果:ID3(未剪枝)
  正確率:85.9%
  執行時長:356s
'''

import time
import numpy as np


def loadData(fileName):
  '''
  載入檔案
  :param fileName:要載入的檔案路徑
  :return: 資料集和標籤集
  '''
  # 存放資料及標記
  dataArr = [];
  labelArr = []
  # 讀取檔案
  fr = open(fileName)
  # 遍歷檔案中的每一行
  for line in fr.readlines():
    # 獲取當前行,並按“,”切割成欄位放入列表中
    # strip:去掉每行字串首尾指定的字元(預設空格或換行符)
    # split:按照指定的字元將字串切割成每個欄位,返回列表形式
    curLine = line.strip().split(',')
    # 將每行中除標記外的資料放入資料集中(curLine[0]為標記資訊)
    # 在放入的同時將原先字串形式的資料轉換為整型
    # 此外將資料進行了二值化處理,大於128的轉換成1,小於的轉換成0,方便後續計算
    dataArr.append([int(int(num) > 128) for num in curLine[1:]])
    # 將標記資訊放入標記集中
    # 放入的同時將標記轉換為整型
    labelArr.append(int(curLine[0]))
  # 返回資料集和標記
  return dataArr,labelArr


def majorClass(labelArr):
  '''
  找到當前標籤集中佔數目最大的標籤
  :param labelArr: 標籤集
  :return: 最大的標籤
  '''
  # 建立字典,用於不同類別的標籤技術
  classDict = {}
  # 遍歷所有標籤
  for i in range(len(labelArr)):
    # 當第一次遇到A標籤時,字典內還沒有A標籤,這時候直接幅值加1是錯誤的,
    # 所以需要判斷字典中是否有該鍵,沒有則建立,有就直接自增
    if labelArr[i] in classDict.keys():
      # 若在字典中存在該標籤,則直接加1
      classDict[labelArr[i]] += 1
    else:
      # 若無該標籤,設初值為1,表示出現了1次了
      classDict[labelArr[i]] = 1
  # 對字典依據值進行降序排序
  classSort = sorted(classDict.items(),key=lambda x: x[1],reverse=True)
  # 返回最大一項的標籤,即佔數目最多的標籤
  return classSort[0][0]


def calc_H_D(trainLabelArr):
  '''
  計算資料集D的經驗熵,參考公式5.7 經驗熵的計算
  :param trainLabelArr:當前資料集的標籤集
  :return: 經驗熵
  '''
  # 初始化為0
  H_D = 0
  # 將當前所有標籤放入集合中,這樣只要有的標籤都會在集合中出現,且出現一次。
  # 遍歷該集合就可以遍歷所有出現過的標記並計算其Ck
  # 這麼做有一個很重要的原因:首先假設一個背景,當前標籤集中有一些標記已經沒有了,比如說標籤集中
  # 沒有0(這是很正常的,說明當前分支不存在這個標籤)。 式5.7中有一項Ck,那按照式中的針對不同標籤k
  # 計算Cl和D並求和時,由於沒有0,那麼C0=0,此時C0/D0=0,log2(C0/D0) = log2(0),事實上0並不在log的
  # 定義區間內,出現了問題
  # 所以使用集合的方式先知道當前標籤中都出現了那些標籤,隨後對每個標籤進行計算,如果沒出現的標籤那一項就
  # 不在經驗熵中出現(未參與,對經驗熵無影響),保證log的計算能一直有定義
  trainLabelSet = set([label for label in trainLabelArr])
  # 遍歷每一個出現過的標籤
  for i in trainLabelSet:
    # 計算|Ck|/|D|
    # trainLabelArr == i:當前標籤集中為該標籤的的位置
    # 例如a = [1,1],c = (a == 1): c == [True,false,True]
    # trainLabelArr[trainLabelArr == i]:獲得為指定標籤的樣本
    # trainLabelArr[trainLabelArr == i].size:獲得為指定標籤的樣本的大小,即標籤為i的樣本
    # 數量,就是|Ck|
    # trainLabelArr.size:整個標籤集的數量(也就是樣本集的數量),即|D|
    p = trainLabelArr[trainLabelArr == i].size / trainLabelArr.size
    # 對經驗熵的每一項累加求和
    H_D += -1 * p * np.log2(p)

  # 返回經驗熵
  return H_D


def calcH_D_A(trainDataArr_DevFeature,trainLabelArr):
  '''
  計算經驗條件熵
  :param trainDataArr_DevFeature:切割後只有feature那列資料的陣列
  :param trainLabelArr: 標籤集陣列
  :return: 經驗條件熵
  '''
  # 初始為0
  H_D_A = 0
  # 在featue那列放入集合中,是為了根據集合中的數目知道該feature目前可取值數目是多少
  trainDataSet = set([label for label in trainDataArr_DevFeature])

  # 對於每一個特徵取值遍歷計算條件經驗熵的每一項
  for i in trainDataSet:
    # 計算H(D|A)
    # trainDataArr_DevFeature[trainDataArr_DevFeature == i].size / trainDataArr_DevFeature.size:|Di| / |D|
    # calc_H_D(trainLabelArr[trainDataArr_DevFeature == i]):H(Di)
    H_D_A += trainDataArr_DevFeature[trainDataArr_DevFeature == i].size / trainDataArr_DevFeature.size \
         * calc_H_D(trainLabelArr[trainDataArr_DevFeature == i])
  # 返回得出的條件經驗熵
  return H_D_A


def calcBestFeature(trainDataList,trainLabelList):
  '''
  計算資訊增益最大的特徵
  :param trainDataList: 當前資料集
  :param trainLabelList: 當前標籤集
  :return: 資訊增益最大的特徵及最大資訊增益值
  '''
  # 將資料集和標籤集轉換為陣列形式
  # trainLabelArr轉換後需要轉置,這樣在取數時方便
  # 例如a = np.array([1,2,3]); b = np.array([1,3]).T
  # 若不轉置,a[0] = [1,3],轉置後b[0] = 1,b[1] = 2
  # 對於標籤集來說,能夠很方便地取到每一位是很重要的
  trainDataArr = np.array(trainDataList)
  trainLabelArr = np.array(trainLabelList).T

  # 獲取當前特徵數目,也就是資料集的橫軸大小
  featureNum = trainDataArr.shape[1]

  # 初始化最大資訊增益
  maxG_D_A = -1
  # 初始化最大資訊增益的特徵
  maxFeature = -1
  # 對每一個特徵進行遍歷計算
  for feature in range(featureNum):
    # “5.2.2 資訊增益”中“演算法5.1(資訊增益的演算法)”第一步:
    # 1.計算資料集D的經驗熵H(D)
    H_D = calc_H_D(trainLabelArr)
    # 2.計算條件經驗熵H(D|A)
    # 由於條件經驗熵的計算過程中只涉及到標籤以及當前特徵,為了提高運算速度(全部樣本
    # 做成的矩陣運算速度太慢,需要剔除不需要的部分),將資料集矩陣進行切割
    # 資料集在初始時刻是一個Arr = 60000*784的矩陣,針對當前要計算的feature,在訓練集中切割下
    # Arr[:,feature]這麼一條來,因為後續計算中資料集中只用到這個(沒明白的跟著算一遍例5.2)
    # trainDataArr[:,feature]:在資料集中切割下這麼一條
    # trainDataArr[:,feature].flat:將這麼一條轉換成豎著的列表
    # np.array(trainDataArr[:,feature].flat):再轉換成一條豎著的矩陣,大小為60000*1(只是初始是
    # 這麼大,執行過程中是依據當前資料集大小動態變的)
    trainDataArr_DevideByFeature = np.array(trainDataArr[:,feature].flat)
    # 3.計算資訊增益G(D|A)  G(D|A) = H(D) - H(D | A)
    G_D_A = H_D - calcH_D_A(trainDataArr_DevideByFeature,trainLabelArr)
    # 不斷更新最大的資訊增益以及對應的feature
    if G_D_A > maxG_D_A:
      maxG_D_A = G_D_A
      maxFeature = feature
  return maxFeature,maxG_D_A


def getSubDataArr(trainDataArr,trainLabelArr,A,a):
  '''
  更新資料集和標籤集
  :param trainDataArr:要更新的資料集
  :param trainLabelArr: 要更新的標籤集
  :param A: 要去除的特徵索引
  :param a: 當data[A]== a時,說明該行樣本時要保留的
  :return: 新的資料集和標籤集
  '''
  # 返回的資料集
  retDataArr = []
  # 返回的標籤集
  retLabelArr = []
  # 對當前資料的每一個樣本進行遍歷
  for i in range(len(trainDataArr)):
    # 如果當前樣本的特徵為指定特徵值a
    if trainDataArr[i][A] == a:
      # 那麼將該樣本的第A個特徵切割掉,放入返回的資料集中
      retDataArr.append(trainDataArr[i][0:A] + trainDataArr[i][A + 1:])
      # 將該樣本的標籤放入返回標籤集中
      retLabelArr.append(trainLabelArr[i])
  # 返回新的資料集和標籤集
  return retDataArr,retLabelArr


def createTree(*dataSet):
  '''
  遞迴建立決策樹
  :param dataSet:(trainDataList, trainLabelList) <<-- 元祖形式
  :return:新的子節點或該葉子節點的值
  '''
  # 設定Epsilon,“5.3.1 ID3演算法”第4步提到需要將資訊增益與閾值Epsilon比較,若小於則直接處理後返回T
  Epsilon = 0.1
  # 從引數中獲取trainDataList和trainLabelList
  trainDataList = dataSet[0][0]
  trainLabelList = dataSet[0][1]
  # 列印資訊:開始一個子節點建立,列印當前特徵向量數目及當前剩餘樣本數目
  print('start a node',len(trainDataList[0]),len(trainLabelList))

  # 將標籤放入一個字典中,當前樣本有多少類,在字典中就會有多少項
  # 也相當於去重,多次出現的標籤就留一次。舉個例子,假如處理結束後字典的長度為1,那說明所有的樣本
  # 都是同一個標籤,那就可以直接返回該標籤了,不需要再生成子節點了。
  classDict = {i for i in trainLabelList}
  # 如果D中所有例項屬於同一類Ck,則置T為單節點數,並將Ck作為該節點的類,返回T
  # 即若所有樣本的標籤一致,也就不需要再分化,返回標記作為該節點的值,返回後這就是一個葉子節點
  if len(classDict) == 1:
    # 因為所有樣本都是一致的,在標籤集中隨便拿一個標籤返回都行,這裡用的第0個(因為你並不知道
    # 當前標籤集的長度是多少,但執行中所有標籤只要有長度都會有第0位。
    return trainLabelList[0]

  # 如果A為空集,則置T為單節點數,並將D中例項數最大的類Ck作為該節點的類,返回T
  # 即如果已經沒有特徵可以用來再分化了,就返回佔大多數的類別
  if len(trainDataList[0]) == 0:
    # 返回當前標籤集中佔數目最大的標籤
    return majorClass(trainLabelList)

  # 否則,按式5.10計算A中個特徵值的資訊增益,選擇資訊增益最大的特徵Ag
  Ag,EpsilonGet = calcBestFeature(trainDataList,trainLabelList)

  # 如果Ag的資訊增益比小於閾值Epsilon,則置T為單節點樹,並將D中例項數最大的類Ck
  # 作為該節點的類,返回T
  if EpsilonGet < Epsilon:
    return majorClass(trainLabelList)

  # 否則,對Ag的每一可能值ai,依Ag=ai將D分割為若干非空子集Di,將Di中例項數最大的
  # 類作為標記,構建子節點,由節點及其子節點構成樹T,返回T
  treeDict = {Ag: {}}
  # 特徵值為0時,進入0分支
  # getSubDataArr(trainDataList,trainLabelList,Ag,0):在當前資料集中切割當前feature,返回新的資料集和標籤集
  treeDict[Ag][0] = createTree(getSubDataArr(trainDataList,0))
  treeDict[Ag][1] = createTree(getSubDataArr(trainDataList,1))

  return treeDict


def predict(testDataList,tree):
  '''
  預測標籤
  :param testDataList:樣本
  :param tree: 決策樹
  :return: 預測結果
  '''
  # treeDict = copy.deepcopy(tree)

  # 死迴圈,直到找到一個有效地分類
  while True:
    # 因為有時候當前字典只有一個節點
    # 例如{73: {0: {74:6}}}看起來節點很多,但是對於字典的最頂層來說,只有73一個key,其餘都是value
    # 若還是採用for來讀取的話不太合適,所以使用下行這種方式讀取key和value
    (key,value),= tree.items()
    # 如果當前的value是字典,說明還需要遍歷下去
    if type(tree[key]).__name__ == 'dict':
      # 獲取目前所在節點的feature值,需要在樣本中刪除該feature
      # 因為在建立樹的過程中,feature的索引值永遠是對於當時剩餘的feature來設定的
      # 所以需要不斷地刪除已經用掉的特徵,保證索引相對位置的一致性
      dataVal = testDataList[key]
      del testDataList[key]
      # 將tree更新為其子節點的字典
      tree = value[dataVal]
      # 如果當前節點的子節點的值是int,就直接返回該int值
      # 例如{403: {0: 7,1: {297:7}},dataVal=0
      # 此時上一行tree = value[dataVal],將tree定位到了7,而7不再是一個字典了,
      # 這裡就可以直接返回7了,如果tree = value[1],那就是一個新的子節點,需要繼續遍歷下去
      if type(tree).__name__ == 'int':
        # 返回該節點值,也就是分類值
        return tree
    else:
      # 如果當前value不是字典,那就返回分類值
      return value


def accuracy(testDataList,testLabelList,tree):
  '''
  測試準確率
  :param testDataList:待測試資料集
  :param testLabelList: 待測試標籤集
  :param tree: 訓練集生成的樹
  :return: 準確率
  '''
  # 錯誤次數計數
  errorCnt = 0
  # 遍歷測試集中每一個測試樣本
  for i in range(len(testDataList)):
    # 判斷預測與標籤中結果是否一致
    if testLabelList[i] != predict(testDataList[i],tree):
      errorCnt += 1
  # 返回準確率
  return 1 - errorCnt / len(testDataList)


if __name__ == '__main__':
  # 開始時間
  start = time.time()

  # 獲取訓練集
  trainDataList,trainLabelList = loadData('../Mnist/mnist_train.csv')
  # 獲取測試集
  testDataList,testLabelList = loadData('../Mnist/mnist_test.csv')

  # 建立決策樹
  print('start create tree')
  tree = createTree((trainDataList,trainLabelList))
  print('tree is:',tree)

  # 測試準確率
  print('start test')
  accur = accuracy(testDataList,tree)
  print('the accur is:',accur)

  # 結束時間
  end = time.time()
  print('time span:',end - start)

以上就是python 決策樹演算法的實現的詳細內容,更多關於python 決策樹演算法的資料請關注我們其它相關文章!