機器學習 類條件隨機標籤噪聲情況下的二分類問題研究 復現NIPS論文learning with noisy label(logistic & C-SVM)
阿新 • • 發佈:2020-11-27
論文百度網盤連結:https://pan.baidu.com/s/1XWj1YInZCuKldRpYnZTttg
密碼:u4lb
UCI資料可以在論文中給的一個URL中找到,如找不到也可聯絡QQ1551904915獲取。
對類隨機標籤噪聲下的二分類問題可以通過構造修正損失函式以及設定正負偏置兩種方案解決。
流程:構造二維線性可分資料->UCI高維資料測試->構造二分線性不可分資料->UCI資料測試
import numpy as np import matplotlib.pyplot as plt import scipy.io as scio from sklearn.preprocessing importStandardScaler import math from libsvm.svmutil import * from sklearn.metrics import accuracy_score from sklearn.model_selection import train_test_split class LogisticRegression: def __init__(self, n_iter=200, eta=1e-3, tol=None,rho_positive = 0.2,rho_negative = 0.2): # 訓練迭代次數 self.n_iter = n_iter# 學習率 self.eta = eta # 誤差變化閾值 self.tol = tol # 模型引數w(訓練時初始化) self.w = None # 批量大小 self.batch_size = 0.1 self.rho_positive = rho_positive self.rho_negative = rho_negative def _z(self, X, w): return np.dot(X, w) def_sigmoid(self, z): return 1. / (1. + np.exp(-z)) def _predict_proba(self, X, w): z = self._z(X, w) return self._sigmoid(z) # 標籤噪聲不存在時的梯度 def _loss(self, y, y_proba): ret = 0 for yi, ti in zip(y, y_proba): ret += np.log(1 + np.exp(- yi * ti)) return ret + 1/2*np.dot(self.w,self.w) # 標籤噪聲不存在時的梯度 def _gradient(self, X, y): ret = np.zeros_like(self.w) for xi,yi in zip(X,y): ti = np.dot(self.w.T,xi) ret += -yi*np.exp(-ti*yi)/(1+np.exp(-ti*yi)) * xi return ret + self.w # 存在標籤噪聲情況下的SGD def _gradient_descent_under_noise(self, w, X, y): # 若使用者指定tol, 則啟用早期停止法. if self.tol is not None: loss_old = np.inf self.loss_list = [] self.w = w # 使用梯度下降至多迭代n_iter次, 更新w. for step_i in range(self.n_iter): # 預測所有點為1的概率 y_proba = self._predict_proba(X, self.w) # 計算損失 loss = self._loss_under_noise(y, y_proba) self.loss_list.append(loss) # print('%4i Loss: %s' % (step_i, loss)) # 早期停止法 if self.tol is not None: # 如果損失下降不足閾值, 則終止迭代. if loss_old - loss < self.tol: break loss_old = loss index = np.arange(X.shape[0]) np.random.shuffle(index) selected = index[:math.ceil(self.batch_size * X.shape[0])] # 計算梯度 grad = self._gradient_under_noise(X[selected], y[selected]) # 更新引數w self.w -= self.eta * grad def _gradient_descent(self, w, X, y): # 若使用者指定tol, 則啟用早期停止法. if self.tol is not None: loss_old = np.inf self.loss_list = [] self.w = w # 使用梯度下降至多迭代n_iter次, 更新w. for step_i in range(self.n_iter): # 預測所有點為1的概率 y_proba = self._predict_proba(X, self.w) # 計算損失 loss = self._loss(y, y_proba) self.loss_list.append(loss) print('%4i Loss: %s' % (step_i, loss)) if self.tol is not None: # 如果損失下降不足閾值, 則終止迭代. if loss_old - loss < self.tol: break loss_old = loss index = np.arange(X.shape[0]) np.random.shuffle(index) selected = index[:math.ceil(self.batch_size * X.shape[0])] # 計算梯度 grad = self._gradient(X[selected], y[selected]) # 更新引數w self.w -= self.eta * grad # 資料預處理 def _preprocess_data_X(self, X): m, n = X.shape X_ = np.empty((m, n + 1)) X_[:, 0] = 1 X_[:, 1:] = X return X_ def train(self, X_train, y_train): '''訓練''' # 預處理X_train(新增x0=1) X_train = self._preprocess_data_X(X_train) _, n = X_train.shape self.w = np.random.random(n) * 0.05 self._gradient_descent(self.w, X_train, y_train) def train_from_noise_data(self,X_train,y_train): X_train = self._preprocess_data_X(X_train) _,n = X_train.shape self.w = np.random.random(n) * 0.05 self._gradient_descent_under_noise(self.w,X_train,y_train) # 計算在標籤噪聲下的損失 def _loss_under_noise(self,y,y_proba): ret = 0 for yi, ti in zip(y, y_proba): ret += (1-np.where(yi == 1,self.rho_negative,self.rho_positive)) * np.log(1 + np.exp(- yi * ti)) + \ np.where(yi == 1,self.rho_positive,self.rho_negative) * np.log(1 + np.exp(yi * ti)) return (ret + 1 / 2 * np.dot(self.w, self.w)) / (1 - self.rho_negative - self.rho_positive) # return (ret) / (1 - rho_negative - rho_positive) # 在標籤噪聲存在情況下的梯度 def _gradient_under_noise(self,X,y): ret = np.zeros_like(self.w) for xi, yi in zip(X, y): ti = np.dot(self.w.T, xi) ret += (1-np.where(yi == 1,self.rho_negative,self.rho_positive)) * (-yi * np.exp(-ti * yi) / (1 + np.exp(-ti * yi)) * xi)\ + np.where(yi == 1,self.rho_positive,self.rho_negative) * (-yi * np.exp(ti * yi) / (1 + np.exp(ti * yi)) * xi) return (ret + self.w) / (1 - self.rho_negative - self.rho_positive) # return (ret) / (1 - rho_negative - rho_positive) def predict(self, X): X = self._preprocess_data_X(X) y_pred = self._predict_proba(X, self.w) return np.where(y_pred >= 0.5, 1, -1) # 生成三角形區域的隨機點集 def random_data(x1,y1,x2,y2,x3,y3): x1, y1 = x1, y1 x3, y3 = x3, y3 x2, y2 = x2, y2 sample_size = 1000 theta = np.arange(0, 1, 0.001) rnd1 = np.random.random(size=sample_size) rnd2 = np.random.random(size=sample_size) rnd2 = np.sqrt(rnd2) x = rnd2 * (rnd1 * x1 + (1 - rnd1) * x2) + (1 - rnd2) * x3 y = rnd2 * (rnd1 * y1 + (1 - rnd1) * y2) + (1 - rnd2) * y3 return x,y def method_1_on_synthetic_data(): x_positive, y_positive = random_data(-100, -100, -100, 100, 100, 100) x_negative, y_negative = random_data(-40, -100, 100, 20, 100, -100) x_data = np.concatenate([x_positive, x_negative]) y_data = np.concatenate([y_positive, y_negative]) label = np.concatenate([np.ones_like(x_positive), np.ones_like(x_negative) * (-1)]) # view data from true distribution plt.scatter(x_data[label == 1], y_data[label == 1], c='', edgecolors='b', facecolor='none', marker='o', s=25, linewidths=1) plt.scatter(x_data[label == -1], y_data[label == -1], c='r', marker='+') plt.axis([-100, 100, -100, 100]) plt.title('initial data') plt.show() print('=' * 10 + '在正確資料下的logistic預測' + '=' * 10) # view regression result from data without noise reg = LogisticRegression(n_iter=100, eta=0.0001, tol=1e-5) # 劃分訓練集以及測試集 X_train, X_test, y_train, y_test = train_test_split(np.column_stack([x_data, y_data]), label, test_size=0.5) reg.train(X_train, y_train) plt.plot(reg.loss_list) plt.title('train loss with SGD') plt.show() y_pred = reg.predict(X_test) accuracy_rate = accuracy_score(y_test, y_pred) print('測試集上的訓練準確度:', accuracy_rate) theta = reg.w print('theta引數為:', theta) print('分割函式:0={0}+({1})*x1+({2})*x2'.format(np.round(theta[0], 3), np.round(theta[1], 3), np.round(theta[2], 3))) rho_positive_list = [0.2,0.3,0.4] rho_negative_list = [0.2,0.1,0.4] print('=' * 10 + '在類標籤噪聲條件下的logistic預測' + '=' * 10) for index in range(3): mask_1 = np.random.uniform(0, 1, size=1000) < rho_positive_list[index] mask_2 = np.random.uniform(0, 1, size=1000) < rho_negative_list[index] label_dirty = np.concatenate([np.where(mask_1, -1, 1), np.where(mask_2, 1, -1)]) # view regression result from data with noise plt.scatter(x_data[label_dirty == 1], y_data[label_dirty == 1], c='', edgecolors='b', facecolor='none', marker='o', s=25, linewidths=1) plt.scatter(x_data[label_dirty == -1], y_data[label_dirty == -1], c='r', marker='+') plt.axis([-100, 100, -100, 100]) plt.title('dirty data with rho_positive = {0} and rho_negative = {1}'.format(rho_positive_list[index],rho_negative_list[index])) plt.show() reg = LogisticRegression(n_iter=1000, eta=0.00001, tol=1e-5, rho_positive=rho_positive_list[index], rho_negative=rho_negative_list[index]) # 劃分訓練集以及測試集 X_train = np.column_stack([x_data, y_data]) reg.train_from_noise_data(X_train, label_dirty) y_pred = reg.predict(X_train) plt.scatter(x_data[y_pred == 1], y_data[y_pred == 1], c='', edgecolors='b', facecolor='none', marker='o', s=25,linewidths=1) plt.scatter(x_data[y_pred == -1], y_data[y_pred == -1], c='r', marker='+') plt.axis([-100, 100, -100, 100]) plt.title('dirty data with rho_positive = {0} and rho_negative = {1}'.format(rho_positive_list[index], rho_negative_list[index])) plt.show() y_pred_under_noise = reg.predict(X_train) accuracy_rate = accuracy_score(label, y_pred_under_noise) print('rho_positive:{0} rho_negative:{1} accuracy:{2}'.format(rho_positive_list[index],rho_negative_list[index],accuracy_rate)) print('='*20) # 模擬模組 def train_mode(data,rho_negative,rho_positive,index): Xdata = data['x'][0][0] label = data['t'][0][0] sampleCapacity,_ = Xdata.shape dirty_label = np.zeros_like(label) # 構造噪聲資料 for i in range(sampleCapacity): if label[i] == 1: dirty_label[i] = np.where(np.random.uniform(0,1) < rho_positive,-1,1) else: dirty_label[i] = np.where(np.random.uniform(0,1) < rho_positive,1,-1) eta_list = [0.008,0.002,0.009] model = LogisticRegression(n_iter=1000, eta=eta_list[index], tol=1e-5, rho_positive=rho_positive, rho_negative=rho_negative) index = np.arange(sampleCapacity) np.random.shuffle(index) accuracy_rate = 0 # 3折交叉驗證 for i in range(1, 4): # 構造索引集,將其中五分之一大小的一段作為測試集 selected = np.array([False] * sampleCapacity) selected[int((i - 1) * sampleCapacity / 3):int(i * sampleCapacity / 3)] = True X_test = Xdata[index[selected]] X_train = Xdata[index[~selected]] # 對屬性資料進行標準化,消除量綱以及數量級造成的差距 ss = StandardScaler() ss.fit(X_train) X_train_std = ss.transform(X_train) X_test_std = ss.transform(X_test) y_train = dirty_label[index[~selected]] true_test_label = label[index[selected]] model.train_from_noise_data(X_train_std,y_train) pred = model.predict(X_test_std) accuracy_rate += accuracy_score(true_test_label,pred) print("rho_positive={0} rho_negative={1} accuracy:{2}".format(rho_positive,rho_negative,round(accuracy_rate / 3,4))) def train_mode_2(data,rho_negative,rho_positive,index): Xdata = data['x'][0][0] label = data['t'][0][0] label[label == -1] = 0 sampleCapacity, _ = Xdata.shape dirty_label = np.zeros_like(label) # 構造噪聲資料 for i in range(sampleCapacity): if label[i] == 1: dirty_label[i] = np.where(np.random.uniform(0, 1) < rho_positive, 0, 1) else: dirty_label[i] = np.where(np.random.uniform(0, 1) < rho_positive, 1, 0) index = np.arange(sampleCapacity) np.random.shuffle(index) # 3折交叉驗證 for i in range(1, 4): # 構造索引集,將其中五分之一大小的一段作為測試集 selected = np.array([False] * sampleCapacity) selected[int((i - 1) * sampleCapacity / 3):int(i * sampleCapacity / 3)] = True X_test = Xdata[index[selected]] X_train = Xdata[index[~selected]] # 對屬性資料進行標準化,消除量綱以及數量級造成的差距 ss = StandardScaler() ss.fit(X_train) X_train_std = ss.transform(X_train) X_test_std = ss.transform(X_test) y_train = dirty_label[index[~selected]] true_test_label = label[index[selected]] alpha = (1 - rho_positive + rho_negative) / 2 model = svm_train(y_train.reshape(y_train.shape[0],),X_train_std,'-s 0 -t 2 -c 0.01 -w0 {0} -w1 {1}'.format(alpha,1 - alpha)) svm_predict(true_test_label,X_test_std,model) # logistic test on UCI benchmark def method_1_on_UCI_benchmark(): rho_positive_list = [0.2,0.3,0.4] rho_negative_list = [0.2,0.1,0.4] # 載入資料集 benchmarks = scio.loadmat("benchmarks.mat") breastCancer = benchmarks['breast_cancer'] diabetis = benchmarks['diabetis'] thyroid = benchmarks['thyroid'] german = benchmarks['german'] heart = benchmarks['heart'] image = benchmarks['image'] print("test on breastCancer:") for i in range(3): train_mode(data=breastCancer,rho_negative=rho_negative_list[i],rho_positive=rho_positive_list[i],index = i) print("test on diabetis:") for i in range(3): train_mode(data=diabetis, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i) print("test on thyroid:") for i in range(3): train_mode(data=thyroid, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i) print("test on german:") for i in range(3): train_mode(data=german, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i) print("test on heart:") for i in range(3): train_mode(data=heart, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i) print("test on image:") for i in range(3): train_mode(data=image, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i) # C-SVM test on banana data def method_2_on_banana_data(): benchmarks = scio.loadmat("benchmarks.mat") banana = benchmarks['banana'] y_data = banana['t'][0][0].reshape(banana['t'][0][0].shape[0], ) y_data[y_data == -1] = 0 x_data = banana['x'][0][0] plt.scatter(x_data[y_data == 1, 0], x_data[y_data == 1, 1], c='', edgecolors='b', facecolor='none', marker='o', s=25, linewidths=1) plt.scatter(x_data[y_data == 0, 0], x_data[y_data == 0, 1], c='r', marker='+') plt.title('initial image') plt.plot() plt.show() rho_positive_list = [0.2, 0.3, 0.4] rho_negative_list = [0.2, 0.1, 0.4] print('=' * 10 + '在類標籤噪聲條件下的C-SVM對香蕉資料的預測' + '=' * 10) for index in range(3): label_dirty = np.zeros_like(y_data) for i in range(label_dirty.shape[0]): if y_data[i] == 1: label_dirty[i] = np.where(np.random.uniform(0,1) < rho_positive_list[index],0,1) else: label_dirty[i] = np.where(np.random.uniform(0, 1) < rho_negative_list[index], 1, 0) # view regression result from data with noise plt.scatter(x_data[label_dirty == 1,0], x_data[label_dirty == 1,1], c='', edgecolors='b', facecolor='none', marker='o', s=25, linewidths=1) plt.scatter(x_data[label_dirty == 0,0], x_data[label_dirty == 0,1], c='r', marker='+') plt.title('dirty data with rho_positive = {0} and rho_negative = {1}'.format(rho_positive_list[index], rho_negative_list[index])) plt.show() # 劃分訓練集以及測試集 X_train = np.column_stack([x_data, y_data]) alpha = (1 - rho_positive_list[index] + rho_negative_list[index]) / 2 model = svm_train(label_dirty,X_train,'-s 0 -t 2 -c 0.05 -w0 {0} -w1 {1}'.format(alpha,1 - alpha)) # 在真實資料上進心預測 y_pred = svm_predict(y_data,X_train,model)[0] y_pred = np.array(y_pred).astype('int64') # 繪製出預測之後的影象 plt.scatter(x_data[y_pred == 1,0], x_data[y_pred == 1,1], c='', edgecolors='b', facecolor='none', marker='o', s=25, linewidths=1) plt.scatter(x_data[y_pred == 0,0], x_data[y_pred == 0,1], c='r', marker='+') plt.title('dirty data with rho_positive = {0} and rho_negative = {1}'.format(rho_positive_list[index], rho_negative_list[index])) plt.show() print('=' * 20) def method_2_on_UCI_data(): rho_positive_list = [0.2, 0.3, 0.4] rho_negative_list = [0.2, 0.1, 0.4] benchmarks = scio.loadmat("benchmarks.mat") breastCancer = benchmarks['breast_cancer'] diabetis = benchmarks['diabetis'] thyroid = benchmarks['thyroid'] german = benchmarks['german'] heart = benchmarks['heart'] image = benchmarks['image'] print('='*20+"test on breastCancer:"+'='*20) for i in range(3): train_mode_2(data=breastCancer, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i) print('='*20+"test on diabetis:"+'='*20) for i in range(3): train_mode_2(data=diabetis, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i) print('='*20+"test on thyroid:"+'='*20) for i in range(3): train_mode_2(data=thyroid, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i) print('='*20+"test on german:"+'='*20) for i in range(3): train_mode_2(data=german, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i) print('='*20+"test on heart:"+'='*20) for i in range(3): train_mode_2(data=heart, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i) print('='*20+"test on image:"+'='*20) for i in range(3): train_mode_2(data=image, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i) if __name__ == '__main__': # 方法一在問題一的人造資料上進行測試 method_1_on_synthetic_data() # 方法一在UCI資料上進行測試 method_1_on_UCI_benchmark() # 方法二在香蕉資料上的視覺化展示 method_2_on_banana_data() # 方法二在UCI資料上進行測試 method_2_on_UCI_data()
二分線性可分:
原資料:
噪聲概率不同情況下的結果:
測試結果:
==========在正確資料下的logistic預測========== 0 Loss: 655.0571765768963 1 Loss: 523.6231639912271 2 Loss: 508.3195448331075 3 Loss: 506.8197572187573 4 Loss: 506.8141896877468 5 Loss: 506.68658996045286 6 Loss: 507.0420107165859 測試集上的訓練準確度: 0.993 theta引數為: [ 0.00242018 -0.16241579 0.15456798] 分割函式:0=0.002+(-0.162)*x1+(0.155)*x2 ==========在類標籤噪聲條件下的logistic預測========== rho_positive:0.2 rho_negative:0.2 accuracy:0.9955 rho_positive:0.3 rho_negative:0.1 accuracy:0.97 rho_positive:0.4 rho_negative:0.4 accuracy:1.0 ==================== test on breastCancer: rho_positive=0.2 rho_negative=0.2 accuracy:0.6314 rho_positive=0.3 rho_negative=0.1 accuracy:0.5319 rho_positive=0.4 rho_negative=0.4 accuracy:0.6239 test on diabetis: rho_positive=0.2 rho_negative=0.2 accuracy:0.7474 rho_positive=0.3 rho_negative=0.1 accuracy:0.6654 rho_positive=0.4 rho_negative=0.4 accuracy:0.6107 test on thyroid: rho_positive=0.2 rho_negative=0.2 accuracy:0.8184 rho_positive=0.3 rho_negative=0.1 accuracy:0.4797 rho_positive=0.4 rho_negative=0.4 accuracy:0.5445 test on german: rho_positive=0.2 rho_negative=0.2 accuracy:0.696 rho_positive=0.3 rho_negative=0.1 accuracy:0.5609 rho_positive=0.4 rho_negative=0.4 accuracy:0.507 test on heart: rho_positive=0.2 rho_negative=0.2 accuracy:0.8 rho_positive=0.3 rho_negative=0.1 accuracy:0.7667 rho_positive=0.4 rho_negative=0.4 accuracy:0.6704 test on image: rho_positive=0.2 rho_negative=0.2 accuracy:0.7171 rho_positive=0.3 rho_negative=0.1 accuracy:0.6961 rho_positive=0.4 rho_negative=0.4 accuracy:0.6424
二維線性不可分:
原圖:
不同噪聲情況下的分類結果:
高維的C-SVM結果比較長,不展示了。