最優分箱--卡方分箱Chi-Merge
阿新 • • 發佈:2019-02-19
卡方分箱是依賴於卡方檢驗的分箱方法,在統計指標上選擇卡方統計量(chi-Square)進行判別,分箱的基本思想是判斷相鄰的兩個區間是否有分佈差異,基於卡方統計量的結果進行自下而上的合併,直到滿足分箱的限制條件為止。
卡方分箱的實現步驟:
卡方統計量衡量了區間內樣本的頻數分佈與整體樣本的頻數分佈的差異性,在做分箱處理時可以使用兩種限制條件:
(1)分箱個數:限制最終的分箱個數結果,每次將樣本中具有最小卡方值的區間與相鄰的最小卡方區間進行合併,直到分箱個數達到限制條件為止。
(2)卡方閾值:根據自由度和顯著性水平得到對應的卡方閾值,如果分箱的各區間最小卡方值小於卡方閾值,則繼續合併,直到最小卡方值超過設定閾值為止。
在鋪上程式碼前再補充兩點,
1、由於卡方分箱是思想是相鄰區間合併,在初始化時對變數屬性需先進行排序,要注意名義變數的排序順序
2、卡方閾值的自由度為 分箱數-1,顯著性水平可以取10%,5%或1%
python程式碼實現:舉例:卡方對二分類問題進行變數分箱
import pandas as pd
import numpy as np
from scipy.stats import chi2
def calc_chiSquare(sampleSet): ''' 計算卡方統計量 ''' # 計算樣本期望頻率 target_cnt = sampleSet['target'].sum() sample_cnt = sampleSet['target'].count() expected_ratio = target_cnt * 1.0/sample_cnt # 對變數按屬性值從大到小排序 df = sampleSet[['var', 'target']] col_value = list(set(df['var'])) col_value.sort() # 對變數區間進行遍歷,計算每一個區間對應的卡方統計量 chi_list = []; target_list = []; expected_target_list = [] for value in col_value: df_target_cnt = df.loc[df['var'] == value,'target'].sum() df_cnt = df.loc[df['var'] == value,'target'].count() expected_target_cnt = df_cnt * expected_ratio chi_square = (df_target_cnt - expected_target_cnt)**2 / expected_target_cnt chi_list.append(chi_square) target_list.append(df_target_cnt) expected_target_list.append(expected_target_cnt) # 匯出結果到dataframe chi_result = pd.DataFrame({'var':col_value, 'chi_square':chi_list, 'target_cnt':target_list, 'expected_target_cnt':expected_target_list}) return chi_result
var 表示需要分箱的變數,函式返回卡方統計結果,包括樣本例項區間,卡方統計量,響應頻率和期望響應頻率。
def chiMerge_maxInterval(chi_result, maxInterval=5): ''' 卡方分箱合併--最大區間限制法 ''' group_cnt = len(chi_result) # 如果變數區間超過最大分箱限制,則根據合併原則進行合併 while(group_cnt > maxInterval): min_index = chi_result[chi_result['chi_square']==chi_result['chi_square'].min()].index.tolist()[0] # 如果分箱區間在最前,則向下合併 if min_index == 0: chi_result = merge_chiSquare(chi_result, min_index+1, min_index) # 如果分箱區間在最後,則向上合併 elif min_index == group_cnt-1: chi_result = merge_chiSquare(chi_result, min_index-1, min_index) # 如果分箱區間在中間,則判斷與其相鄰的最小卡方的區間,然後進行合併 else: if chi_result.loc[min_index-1, 'chi_square'] > chi_result.loc[min_index+1, 'chi_square']: chi_result = merge_chiSquare(chi_result, min_index, min_index+1) else: chi_result = merge_chiSquare(chi_result, min_index-1, min_index) group_cnt = len(chi_result) return chi_result def chiMerge_minChiSquare(chi_result, maxInterval=5): ''' 卡方分箱合併--卡方閾值法 ''' threshold = get_chiSquare_distribution(4, 0.1) min_chiSquare = chi_result['chi_square'].min() group_cnt = len(chi_result) # 如果變數區間的最小卡方值小於閾值,則繼續合併直到最小值大於等於閾值 while(min_chiSquare < threshold and group_cnt > 6): print(chi_result) min_index = chi_result[chi_result['chi_square']==chi_result['chi_square'].min()].index.tolist()[0] # 如果分箱區間在最前,則向下合併 if min_index == 0: chi_result = merge_chiSquare(chi_result, min_index+1, min_index) # 如果分箱區間在最後,則向上合併 elif min_index == group_cnt-1: chi_result = merge_chiSquare(chi_result, min_index-1, min_index) # 如果分箱區間在中間,則判斷與其相鄰的最小卡方的區間,然後進行合併 else: if chi_result.loc[min_index-1, 'chi_square'] > chi_result.loc[min_index+1, 'chi_square']: chi_result = merge_chiSquare(chi_result, min_index, min_index+1) else: chi_result = merge_chiSquare(chi_result, min_index-1, min_index) min_chiSquare = chi_result['chi_square'].min() group_cnt = len(chi_result) return chi_result
分箱主體部分包括兩種分箱方法的主體函式,其中merge_chiSquare()是對區間進行合併,get_chiSquare_distribution()是根據自由度和置信度得到卡方閾值。我在這裡設定的是自由度為4,置信度為10%。兩個自定義函式如下
def get_chiSquare_distuibution(dfree=4, cf=0.1):
'''
根據自由度和置信度得到卡方分佈和閾值
dfree:自由度,分類類別-1,預設為4
cf:顯著性水平,預設10%
'''
percents = [ 0.95, 0.90, 0.5,0.1, 0.05, 0.025, 0.01, 0.005]
df = pd.DataFrame(np.array([chi2.isf(percents, df=i) for i in range(1, 30)]))
df.columns = percents
df.index = df.index+1
# 顯示小數點後面數字
pd.set_option('precision', 3)
return df.loc[dfree, cf]
def merge_chiSquare(chi_result, index, mergeIndex, a = 'expected_target_cnt',
b = 'target_cnt', c = 'chi_square'):
'''
按index進行合併,並計算合併後的卡方值
index: 合併後的序列號
mergeIndex: 需合併的區間序號
'''
chi_result.loc[mergeIndex, a] = chi_result.loc[mergeIndex, a] + chi_result.loc[index, a]
chi_result.loc[mergeIndex, b] = chi_result.loc[mergeIndex, b] + chi_result.loc[index, b]
chi_result.loc[mergeIndex, c] = (chi_result.loc[mergeIndex, b] - chi_result.loc[mergeIndex, a])**2 /chi_result.loc[mergeIndex, a]
chi_result = chi_result.drop([index])
chi_result = chi_result.reset_index(drop=True)
return chi_result