機器學習入門-邏輯迴歸演算法
梯度下降: 對theta1, theta2, theta3 分別求最快梯度下降的方向,然後根據給定的學習率,進行theta1, theta2, theta3的引數跟新
假定目標函式 J(theta) = 1/2m * np.sum(h(theta) - y)^2 / len(X)
梯度下降的策略分為3種,
批量梯度下降: 每次迭代輸入全部的資料, 效果好,但耗時
隨機梯度下降: 每次輸入一個樣本,時間快,迭代效果差
小批量梯度下降:每次輸入部分資料,效果好,時間適中,一般都是16, 32, 64
邏輯迴歸: 是一種典型的二分類,也可以是多分類,主要在於cost的定義
邏輯迴歸的概率似然函式: h(theta)**y * (1-h(theta)) ** (1-y)
邏輯迴歸的對數似然函式 l(theta) = 1/ m * np.sum(y*logh(theta) - (1-y)*log(1-h(theta))) # 及損失函式
依據theta對損失函式進行求導,求出梯度下降的方向,用於跟新引數
grad = 1/m np.sum(h(theta) - y) * xj xj表示的是一列特徵
theta = theta - grad
接下來進行程式碼分析
需要完成的函式
主要函式
sigmoid #將數值對映為概率
model # 構造h(theta) 即 sigmoid(np.dot(X, theta.T))
cost # 計算損失值及對數似然函式 1/ m * np.sum(-y*logh(theta) - (1-y)*log(1-h(theta)))
gradient # 用於計算梯度 grad = 1/m np.sum(h(theta) - y) * xj
descent # 用於進行引數更新
runExpe # 進行畫圖操作
predict # 進行結果預測
次要函式
shuffledata # 用於進行資料清洗
StopCriter # 停止情況判斷
程式碼:
import numpy as np import pandas as pd import time import matplotlib.pyplot as plt pdData = pd.read_csv('data/LogiReg_data.txt', header=None, names=['exam1', 'exam2', 'admitted']) # 插入一列全1的資料, 為了和theta0進行匹配 pdData.insert(0, 'ones', 1) # 將資料轉換的為numpy形式 orig_data = pdData.as_matrix() # 獲得列的維度 cols = orig_data.shape[1] # 分出樣本 X = orig_data[:, :cols-1] # 分出標籤 y = orig_data[:, cols-1:] # 初始化theta theta = np.zeros([1, 3]) # 定義sigmoid函式 def sigmoid(z): return (1 / (1 + np.exp(-z))) # 定義H(theta) def model(X, theta): return sigmoid(np.dot(X, theta.T)) # 定義損失函式即對數似然函式 1/ m * np.sum(-y*logh(theta) - (1-y)*log(1-h(theta))) def cost(X, y, theta): left = np.multiply(-y, np.log(model(X, theta))) right = np.multiply((1-y), np.log(1-model(X, theta))) return np.sum(left - right) / len(X) # 定義資料洗牌的函式 def shuffle_data(data): # 進行資料洗牌 np.random.shuffle(data) # 分離出X和y cols = data.shape[1] X = data[:, :cols-1] y = data[:, cols-1:] return X, y # 定義停止條件的函式 Stop_iter = 0 Stop_cost = 1 Stop_grad = 2 def StopCriter(Stop_name, value, threshold): # 如果迭代條件是迭代次數,返回迭代比較的結果,真或者假 if Stop_name == Stop_iter: return value > threshold # 如果迭代條件是損失值,返回最後兩個損失值之差,如果低於閾值,返回為真 elif Stop_name == Stop_cost: return value[-2] - value[-1] < threshold # 如果迭代條件是梯度下降的方向向量,返回的是梯度下降方向向量的模,如果低於閾值,則返回為真 elif Stop_name == Stop_grad: return np.linalg.norm(value) < threshold # 用於計算梯度下降方向的向量 grad = 1/m np.sum(h(theta) - y) * xj def gradient(X, y, theta): # 初始化梯度值 grad = np.zeros_like(theta) # 計算誤差 ravel()函式將(100, 1)轉換為(100, ) error = (model(X, theta) - y).ravel() # 計算每一個方向上的梯度方向 for j in range(X.shape[1]): term = np.multiply(error, X[:, j]) grad[0, j] = np.sum(term) / len(X) return grad # 在梯度方向上進行theta的引數更新 def descent(data, theta, batchsize, Stop_name, threshold, alpha): # 資料進行洗牌 X, y = shuffle_data(data) k = 0 # 獲得損失值函式 costs = [cost(X, y, theta)] # 迭代次數 i = 0 # 初始時間 init_time = time.time() # 迴圈 while True: # 獲得batchsize的樣本 batch_x, batch_y = X[k:k+batchsize], y[k:k+batchsize] # 更新k k = k + batchsize # 如果k大於樣本數,置0,重新獲得洗牌後的X和y if k >= X.shape[0]: k = 0 X, y = shuffle_data(data) # 計算梯度方向 grad = gradient(batch_x, batch_y, theta) # 更新引數 theta = theta - alpha * grad # 重新計算損失值 costs.append(cost(X, y, theta)) i = i + 1 # 根據迭代的條件獲得當前的value值 if Stop_name == Stop_iter:value = i elif Stop_name == Stop_cost: value=costs elif Stop_name == Stop_grad: value=grad # 將value值輸入,與閾值進行條件比較,滿足即跳出迴圈 if StopCriter(Stop_name, value, threshold): break # 返回 return data, theta, i, batchsize, Stop_name, threshold, alpha, time.time() - init_time, costs # 進行畫圖操作 def runExpe(data, theta, batchsize, Stop_name, threshold, alpha): data, theta, i, batchsize, Stop_name, threshold, alpha, dur, costs = descent(data, theta, batchsize, Stop_name, threshold, alpha) name = "Original" if (data[:, 1] > 2).sum() > 1 else "Scaled" name += " data - learning rate: {} - ".format(alpha) if batchsize == n: strDescType = "Gradient" elif batchsize == 1: strDescType = "Stochastic" else: strDescType = "Mini-batch ({})".format(batchsize) name += strDescType + " descent - Stop: " if Stop_name == Stop_iter: strStop = "{} iterations".format(threshold) elif Stop_name == Stop_cost: strStop = "costs change < {}".format(threshold) else: strStop = "gradient norm < {}".format(threshold) name += strStop print("***{}\nTheta: {} - Iter: {} - Last cost: {:03.2f} - Duration: {:03.2f}s".format( name, theta, iter, costs[-1], dur)) fig, ax = plt.subplots(figsize=(12, 4)) ax.plot(np.arange(len(costs)), costs, 'r') ax.set_xlabel('Iterations') ax.set_ylabel('Cost') ax.set_title(name.upper() + ' - Error vs. Iteration') return theta # 預測函式 def predict(X, theta): # 代入h(theta) 即model中進行樣本預測 pre_y = model(X, theta) # 概率大於0.5的,輸出為1, 小於0.5的輸出為0 pre_y[pre_y >= 0.5] = 1 pre_y[pre_y < 0.5] = 0 # 返回預測結果的向量 return pre_y # 表示樣本的總個數 n = 100 # 獲得迭代好以後的theta theta = runExpe(orig_data, theta, 100, Stop_grad, 0.05, alpha=0.001) # 進行資料歸一化操作 import sklearn.preprocessing as pp scale_data = orig_data.copy() # 對第二列和第三列資料進行歸一化操作 scale_data[:, 1:3] = pp.scale(scale_data[:, 1:3]) # 獲得預測結果的向量 pre_y = predict(X, theta) # 將預測結果與真實結果進行比較,返回0和1的陣列,正確是1,錯誤是0 correct_array = np.array(pre_y == y, dtype=int) # 準確率就是計算正確和錯誤的平均值 accurracy = correct_array.mean() print(accurracy)
迭代次數與損失值cost的作圖