機器學習之 Logistic 迴歸(邏輯迴歸)
目錄
Logistic迴歸
部落格園地址:https://www.cnblogs.com/chenyoude/
git 地址:https://github.com/nickcyd/machine_learning
微信:a1171958281
程式碼中涉及的數學公式可以自己下載 Typora 這款軟體後,把內容複製到.md檔案內通過 Typora 開啟
Logistic 迴歸
本章內容
- Sigmoid 函式和 Logistic 迴歸分類器
- 最優化理論初步
- 梯度下降最優化演算法
資料中的缺失項處理
迴歸演算法
迴歸演算法:假設現在有一些資料點,我們用一條直線對這些點進行擬合(該線稱為最佳擬合直線),這個擬合過程就稱作迴歸。與分類演算法一樣同屬於監督學習。
Logistic 迴歸的一般過程
- 收集資料:採用任意方法收集資料。
- 準備資料:由於需要進行距離計算,因此要求資料型別為數值型。
- 分析資料:採用任意方法對資料進行分析。
- 訓練演算法:大部分時間講用於訓練,訓練的目的是為了找到最佳的分類迴歸係數。
- 測試演算法:一旦訓練步驟完成,分類將會很快。
使用演算法:基於訓練好的迴歸係數對這些數值進行簡單的迴歸計算,判定他們屬於哪個類別,在此基礎上做一些其他分析工作。
Logistic的優缺點
- 優點:計算代價不高,易於理解和實現。
- 缺點:容易欠擬合,分類精度可能不高。
適用資料型別:數值型和標稱型。
基於 Logistic 迴歸和 Sigmoid 函式的分類
Sigmoid 函式
- 海維賽德階躍函式(單位階躍函式):輸出只有0或1的函式,並且0到1的過程屬於跳躍過程,即非0即1。
- Sigmoid 函式:x=0時,sigmoid 值為0.5;隨著 x 的增大,對應值將逼近1;隨著 x 的減小,對應值將逼近0。
Sigmoid 函式公式:$\sigma(z)={\frac{1}{1+e^{-z}}}$。
Logistic 迴歸分類器
Logistic 迴歸分類器:我們在每個特徵上都乘以一個迴歸係數 之後詳細介紹,然後把所有的結果值相加,將這個總和代入 sigmoid 函式,進而得到一個範圍在0~1之間的數值。大於0.5的資料被分入1類,小於0.5即被歸入0類。
圖5-1 兩種座標尺度下的 Sigmoid 函式圖
通過圖5-1 下面一張圖可以看出,如果橫座標的尺度足夠大,在 x=0出 sigmoid 函式看起來很像階躍函式。
基於最優化方法的最佳迴歸係數確定
- Sigmoid函式的輸入記為 z,可由該公式得出:$z=w_0x_0+w_1x_1+w_2x_2+\cdots+w_nx_n$。
上述公式向量寫法:$z=w^Tx$ 向量 x 是分類器的輸入資料,向量 w 是我們需要找的最佳引數(係數)。
梯度上升法
- 梯度上升法:沿著函式的梯度方向探尋某函式的最大值。即求函式的最大值。
- 如果梯度記為\nebla,則函式$f(x,y)$的梯度公式:$\nabla f(x,y)=\begin{pmatrix} {\frac{\part f(x,y)}{\part x}} \ {\frac{\part f(x,y)}{\part y}} \ \end{pmatrix}$。
- ${\frac{\part f(x,y)}{\part x}}$:沿 x 的方向移動${\frac{\part f(x,y)}{\part x}}$,函式$f(x,y)$必須要在待計算的點上有定義並且可微。
${\frac{\part f(x,y)}{\part y}}$:沿 x 的方向移動${\frac{\part f(x,y)}{\part y}}$,函式$f(x,y)$必須要在待計算的點上有定義並且可微。
圖5-2 梯度上升圖
- 通過圖5-2 可以看出梯度上升演算法到達每個點後都會重新估計移動的方向。
- 梯度上升演算法的迭代公式:$w:=w+\alpha \nabla_wf(w)$,該公式將一直被迭代執行,直至達到某個停止條件為止。
$\alpha$:移動量的大小,稱為步長。
梯度下降演算法
- 梯度下降演算法:沿著函式的梯度方向探尋某函式的最小值。即求函式的最小值。
梯度下降演算法的迭代公式:$w:=w-\alpha \nabla_wf(w)$
訓練演算法:使用梯度上升找到最佳引數
圖5-3 資料集圖
圖5-3中有100個樣本點,每個點包含兩個數值型特徵 X1和X2。
梯度上升演算法的虛擬碼
每個迴歸係數初始化為1
重複 R 次:
計算整個資料集的梯度
使用 alpha*gradient 更新迴歸係數的向量
返回迴歸係數
程式5-1 Logistic 迴歸梯度上升優化演算法
import os
import numpy as np
import matplotlib.pyplot as plt
from path_settings import machine_learning_PATH
data_set_path = os.path.join(machine_learning_PATH, '第五章/data-set')
testSet_path = os.path.join(data_set_path, 'testSet.txt')
horseColicTraining_path = os.path.join(data_set_path, 'horseColicTraining.txt')
horseColicTest_path = os.path.join(data_set_path, 'horseColicTest.txt')
def load_data_set():
"""匯入資料集"""
data_mat = []
label_mat = []
# 迴圈匯入.txt文字資料構造成列表
fr = open(testSet_path)
for line in fr.readlines():
line_arr = line.strip().split()
data_mat.append([1, float(line_arr[0]), float(line_arr[1])])
label_mat.append(int(line_arr[2]))
return data_mat, label_mat
def sigmoid(in_x):
return 1 / (1 + np.exp(-in_x))
def grad_ascent(data_mat_in, class_labels):
# 生成特徵矩陣
data_matrix = np.mat(data_mat_in)
# 生成標記矩陣並反置
label_mat = np.mat(class_labels).transpose()
# 計算data_matrix的行列
m, n = np.shape(data_matrix)
# 設定移動的步長為0.001
alpha = 0.001
# 設定最大遞迴次數500次
max_cycles = 500
# 初始化係數為1*3的元素全為1的矩陣
weights = np.ones((n, 1))
# 迴圈迭代梯度上升演算法
for k in range(max_cycles):
# 計算真實類別與預測類別的差值
h = sigmoid(data_matrix * weights)
error = (label_mat - h)
# 調整迴歸係數
weights = weights + alpha * data_matrix.transpose() * error
return weights
def test_grad_ascent():
data_mat, label_mat = load_data_set()
weights = grad_ascent(data_mat, label_mat)
print(weights)
"""
[[ 4.12414349]
[ 0.48007329]
[-0.6168482 ]]
"""
if __name__ == '__main__':
test_grad_ascent()
分析資料:畫出決策邊界
該節將通過程式碼畫出決策邊界
程式5-2 畫出資料集和 Logistic 迴歸最佳擬合直線的函式
def plot_best_fit(wei):
# getA==np.asarrayz(self)
# 使用__class__.__name__為了判斷是梯度上升和隨機梯度上升
if wei.__class__.__name__ == 'matrix':
weights = wei.getA()
elif wei.__class__.__name__ == 'ndarray':
weights = wei
else:
weights = wei
data_mat, label_mat = load_data_set()
# 把特徵集轉換成陣列
data_arr = np.array(data_mat)
n = np.shape(data_arr)[0]
# 迴圈資料集分類
xcord1 = []
ycord1 = []
xcord2 = []
ycord2 = []
for i in range(n):
if int(label_mat[i]) == 1:
xcord1.append(data_arr[i, 1])
ycord1.append(data_arr[i, 2])
else:
xcord2.append(data_arr[i, 1])
ycord2.append(data_arr[i, 2])
fig = plt.figure()
ax = fig.add_subplot(111)
ax.scatter(xcord1, ycord1, s=30, c='red', marker='s')
ax.scatter(xcord2, ycord2, s=30, c='green')
# 0.1是步長
x = np.arange(-3, 3, 0.1)
# 假設 sigmoid 函式為0,並且這裡的 x,y 相當於上述的 x1和x2即可得出 y 的公式
y = (-weights[0] - weights[1] * x) / weights[2]
ax.plot(x, y)
plt.xlabel('X1')
plt.ylabel('X2')
plt.show()
def test_plot_best_fit():
data_mat, label_mat = load_data_set()
weights = grad_ascent(data_mat, label_mat)
plot_best_fit(weights)
if __name__ == '__main__':
# test_grad_ascent()
test_plot_best_fit()
圖5-4 梯度上升演算法500次迭代後的結果
通過圖5-4 可以看出我們只分錯了2-4個點。
訓練演算法:隨機梯度上升
- 梯度上升法每次更新迴歸係數時都需要遍歷整個資料集,如果樣本或者特徵數過多就應該考慮使用隨機梯度上升演算法。
隨機梯度上升:一次僅用一個樣本點來更新迴歸係數,不需要重新讀取整個資料集。
隨機梯度上升演算法虛擬碼
所有迴歸係數初始化為1
對資料集中每個樣本
計算該樣本的梯度
使用 alpha*gradient 更新迴歸係數值
返回迴歸係數值
程式5-3 隨機梯度上升演算法
def stoc_grad_ascent0(data_matrix, class_labels):
"""隨機梯度上升演算法"""
m, n = np.shape(data_matrix)
alpha = 0.01
weights = np.ones(n)
for i in range(m):
# 使用 sum 函式得出一個值,只用計算一次
h = sigmoid(sum(data_matrix[i] * weights))
error = class_labels[i] - h
weights = weights + alpha * error * data_matrix[i]
return weights
def test_stoc_grad_ascent0():
data_arr, label_mat = load_data_set()
weights = stoc_grad_ascent0(np.array(data_arr), label_mat)
plot_best_fit(weights)
if __name__ == '__main__':
# test_grad_ascent()
# test_plot_best_fit()
test_stoc_grad_ascent0()
- 梯度上升和隨機梯度上升:從程式碼中我們可以看到前者變數 h 和誤差 error 都是向量,而後者全是數值;前者是矩陣轉換,後者則是 numpy 陣列。
圖5-5 隨機梯度上升演算法圖
圖5-5可以看出隨機梯度上升演算法的最佳擬合直線並非最佳分類線
程式5-4 改進的隨機梯度上升演算法
def stoc_grad_ascent1(data_matrix, class_labels, num_iter=150):
"""改進隨機梯度上升演算法,預設迭代150次"""
m, n = np.shape(data_matrix)
weights = np.ones(n)
for j in range(num_iter):
data_index = list(range(m))
for i in range(m):
# 每次迭代減小 alpha 的值,但最小為0.01,確保新資料依然有影響。緩解係數波動的情況
alpha = 4 / (1 + j + i) + 0.01
# 隨機選取值進行更新
rand_index = int(np.random.uniform(0, len(data_index)))
h = sigmoid(sum(data_matrix[rand_index] * weights))
error = class_labels[rand_index] - h
weights = weights + alpha * error * data_matrix[rand_index]
# 刪除更新後的值
del (data_index[rand_index])
return weights
def test_stoc_grad_ascent1():
data_arr, label_mat = load_data_set()
weights = stoc_grad_ascent1(np.array(data_arr), label_mat)
plot_best_fit(weights)
if __name__ == '__main__':
# test_grad_ascent()
# test_plot_best_fit()
# test_stoc_grad_ascent0()
test_stoc_grad_ascent1()
圖5-6 改進隨機梯度上升演算法圖
圖5-6可以看出150次的跌打就能得到一條很好的分類線,而梯度上升演算法需要迭代500次。
示例:從疝氣病預測病馬的死亡率
- 疝氣病:描述馬胃腸痛的術語
資料集中包含368個樣本和28個特徵,並且有30%的值是缺失的
示例:使用 Logistic 迴歸估計馬疝病的死亡率
- 收集資料:給定資料檔案
- 準備資料:用 Python 解析文字檔案並填充缺失值
- 分析資料:視覺化並觀察資料
- 訓練演算法:使用優化演算法,找到最佳的係數
- 測試演算法:觀察錯誤率,根據錯誤率決定是否會退到訓練階段;改變迭代的次數和步長等引數來得到更好的迴歸係數
使用演算法:實現一個簡單的程式來手機馬的症狀並輸出預測結果
準備資料:處理資料中的缺失值
- 資料的獲取是相當昂貴的,扔掉和重新獲取都是不可取的
- 以下幾種方法可以解決資料的缺失的問題
- 使用可用特徵的均值來填補缺失值
- 使用特殊值來填補缺失值
- 忽略有缺失值的樣本
- 使用相似樣本的均值填補缺失值
- 使用另外的機器學習演算法預測缺失值
- 預處理第一件事:用0替代所有的缺失值,因為缺失值為0時迴歸係數的更新公式不會更新並且 sigmoid(0)=0.5,他對結果的預測不具有任何傾向性
- 預處理第二件事:對於資料標記缺失的資料捨棄,因為標記很難確定採用某個合適的值來替換。
預處理後的檔案:對於原始資料檔案可以去 http://archive.ics.uci.edu/ml/datasets/Horse+Colic 獲取,此處只提供預處理之後的檔案
測試演算法:用 Logistic 迴歸進行分類
def classify_vector(in_x, weights):
prob = sigmoid(sum(in_x * weights))
if prob > 0.5:
return 1
else:
return 0
def colic_test():
"""馬疝病造成馬死亡概率預測"""
fr_train = open(horseColicTraining_path)
fr_test = open(horseColicTest_path)
training_set = []
training_labels = []
for line in fr_train.readlines():
# 切分所有特徵並把特徵加入 line_arr 列表中
curr_line = line.strip().split('\t') # type:list
line_arr = []
for i in range(21):
line_arr.append(float(curr_line[i]))
# 分開處理特徵和標記
training_set.append(line_arr)
training_labels.append(float(curr_line[21]))
train_weights = stoc_grad_ascent1(np.array(training_set), training_labels, 500)
print(train_weights)
error_count = 0
num_test_vec = 0
for line in fr_test.readlines():
num_test_vec += 1
curr_line = line.strip().split('\t') # type:list
line_arr = []
for i in range(21):
line_arr.append(float(curr_line[i]))
# 通過比較樣本標記與輸入係數與特徵相乘值 sigmoid 函式得到的標記判斷是否預測失誤
if int(classify_vector(np.array(line_arr), train_weights)) != int(curr_line[21]):
error_count += 1
error_rate = (float(error_count) / num_test_vec)
print('測試集的錯誤率: {}'.format(error_rate))
# 測試集的錯誤率: 0.373134328358209
return error_rate
def multi_test():
num_tests = 10
error_sum = 0
for k in range(num_tests):
error_sum += colic_test()
print('迭代 {} 次後平均錯誤率為: {}'.format(num_tests, error_sum / float(num_tests)))
# 迭代 10 次後平均錯誤率為: 0.3656716417910448
if __name__ == '__main__':
# test_grad_ascent()
# test_plot_best_fit()
# test_stoc_grad_ascent0()
# test_stoc_grad_ascent1()
multi_test()
完整程式碼logRegres.py
import os
import numpy as np
import matplotlib.pyplot as plt
from path_settings import machine_learning_PATH
data_set_path = os.path.join(machine_learning_PATH, '第五章/data-set')
testSet_path = os.path.join(data_set_path, 'testSet.txt')
horseColicTraining_path = os.path.join(data_set_path, 'horseColicTraining.txt')
horseColicTest_path = os.path.join(data_set_path, 'horseColicTest.txt')
def load_data_set():
"""匯入資料集"""
data_mat = []
label_mat = []
# 迴圈匯入.txt文字資料構造成列表
fr = open(testSet_path)
for line in fr.readlines():
line_arr = line.strip().split()
data_mat.append([1, float(line_arr[0]), float(line_arr[1])])
label_mat.append(int(line_arr[2]))
return data_mat, label_mat
def sigmoid(in_x):
"""構造 sigmoid 函式"""
return 1 / (1 + np.exp(-in_x))
def grad_ascent(data_mat_in, class_labels):
"""梯度上升演算法"""
# 生成特徵矩陣
data_matrix = np.mat(data_mat_in)
# 生成標記矩陣並反置
label_mat = np.mat(class_labels).transpose()
# 計算data_matrix的行列
m, n = np.shape(data_matrix)
# 設定移動的步長為0.001
alpha = 0.001
# 設定最大遞迴次數500次
max_cycles = 500
# 初始化係數為1*3的元素全為1的矩陣
weights = np.ones((n, 1))
# 迴圈迭代梯度上升演算法
for k in range(max_cycles):
# 計算真實類別與預測類別的差值
h = sigmoid(data_matrix * weights)
error = (label_mat - h)
# 調整迴歸係數
weights = weights + alpha * data_matrix.transpose() * error
return weights
def test_grad_ascent():
data_mat, label_mat = load_data_set()
weights = grad_ascent(data_mat, label_mat)
print(weights)
"""
[[ 4.12414349]
[ 0.48007329]
[-0.6168482 ]]
"""
def plot_best_fit(wei):
"""畫出被分割的資料集"""
# getA==np.asarrayz(self)
# 使用__class__.__name__為了判斷是梯度上升和隨機梯度上升
if wei.__class__.__name__ == 'matrix':
weights = wei.getA()
elif wei.__class__.__name__ == 'ndarray':
weights = wei
else:
weights = wei
data_mat, label_mat = load_data_set()
# 把特徵集轉換成陣列
data_arr = np.array(data_mat)
n = np.shape(data_arr)[0]
# 迴圈資料集分類
xcord1 = []
ycord1 = []
xcord2 = []
ycord2 = []
for i in range(n):
if int(label_mat[i]) == 1:
xcord1.append(data_arr[i, 1])
ycord1.append(data_arr[i, 2])
else:
xcord2.append(data_arr[i, 1])
ycord2.append(data_arr[i, 2])
fig = plt.figure()
ax = fig.add_subplot(111)
ax.scatter(xcord1, ycord1, s=30, c='red', marker='s')
ax.scatter(xcord2, ycord2, s=30, c='green')
# 0.1是步長
x = np.arange(-3, 3, 0.1)
# 假設 sigmoid 函式為0,並且這裡的 x,y 相當於上述的 x1和x2即可得出 y 的公式
y = (-weights[0] - weights[1] * x) / weights[2]
ax.plot(x, y)
plt.xlabel('X1')
plt.ylabel('X2')
plt.show()
def test_plot_best_fit():
data_mat, label_mat = load_data_set()
weights = grad_ascent(data_mat, label_mat)
plot_best_fit(weights)
def stoc_grad_ascent0(data_matrix, class_labels):
"""隨機梯度上升演算法"""
m, n = np.shape(data_matrix)
alpha = 0.01
weights = np.ones(n)
for i in range(m):
# 使用 sum 函式得出一個值,只用計算一次
h = sigmoid(sum(data_matrix[i] * weights))
error = class_labels[i] - h
weights = weights + alpha * error * data_matrix[i]
return weights
def test_stoc_grad_ascent0():
data_arr, label_mat = load_data_set()
weights = stoc_grad_ascent0(np.array(data_arr), label_mat)
plot_best_fit(weights)
def stoc_grad_ascent1(data_matrix, class_labels, num_iter=150):
"""改進隨機梯度上升演算法,預設迭代150次"""
m, n = np.shape(data_matrix)
weights = np.ones(n)
for j in range(num_iter):
data_index = list(range(m))
for i in range(m):
# 每次迭代減小 alpha 的值,但最小為0.01,確保新資料依然有影響。緩解係數波動的情況
alpha = 4 / (1 + j + i) + 0.01
# 隨機選取值進行更新
rand_index = int(np.random.uniform(0, len(data_index)))
h = sigmoid(sum(data_matrix[rand_index] * weights))
error = class_labels[rand_index] - h
weights = weights + alpha * error * data_matrix[rand_index]
# 刪除更新後的值
del (data_index[rand_index])
return weights
def test_stoc_grad_ascent1():
data_arr, label_mat = load_data_set()
weights = stoc_grad_ascent1(np.array(data_arr), label_mat)
plot_best_fit(weights)
def classify_vector(in_x, weights):
prob = sigmoid(sum(in_x * weights))
if prob > 0.5:
return 1
else:
return 0
def colic_test():
"""馬疝病造成馬死亡概率預測"""
fr_train = open(horseColicTraining_path)
fr_test = open(horseColicTest_path)
training_set = []
training_labels = []
for line in fr_train.readlines():
# 切分所有特徵並把特徵加入 line_arr 列表中
curr_line = line.strip().split('\t') # type:list
line_arr = []
for i in range(21):
line_arr.append(float(curr_line[i]))
# 分開處理特徵和標記
training_set.append(line_arr)
training_labels.append(float(curr_line[21]))
train_weights = stoc_grad_ascent1(np.array(training_set), training_labels, 500)
print(train_weights)
error_count = 0
num_test_vec = 0
for line in fr_test.readlines():
num_test_vec += 1
curr_line = line.strip().split('\t') # type:list
line_arr = []
for i in range(21):
line_arr.append(float(curr_line[i]))
# 通過比較樣本標記與輸入係數與特徵相乘值 sigmoid 函式得到的標記判斷是否預測失誤
if int(classify_vector(np.array(line_arr), train_weights)) != int(curr_line[21]):
error_count += 1
error_rate = (float(error_count) / num_test_vec)
print('測試集的錯誤率: {}'.format(error_rate))
# 測試集的錯誤率: 0.373134328358209
return error_rate
def multi_test():
num_tests = 10
error_sum = 0
for k in range(num_tests):
error_sum += colic_test()
print('迭代 {} 次後平均錯誤率為: {}'.format(num_tests, error_sum / float(num_tests)))
# 迭代 10 次後平均錯誤率為: 0.3656716417910448
if __name__ == '__main__':
# test_grad_ascent()
# test_plot_best_fit()
# test_stoc_grad_ascent0()
# test_stoc_grad_ascent1()
multi_test()
總結
- Logistic 迴歸:尋找一個非線性函式 Sigmoid 的最佳擬合引數。
- 求解過程:通過最優化演算法(常用的梯度上升演算法),通過簡化梯度上升演算法得到隨機梯度上升演算法
對缺失資料的處理:機器學習中最後只能更要的問題之一,主要還是取決於實際應用中的需求。
支援向量機 coding……
==尊重原創==
==可以伸出你的小手點個關注,謝謝!==
部落格園地址:https://www.cnblogs.com/chenyoude/
git 地址:https://github.com/nickcyd/machine_learning
微信:a1171958281