支持向量機手把手實戰(進階版本)
阿新 • • 發佈:2018-04-04
HR 程序 不為 strong 間隔 val creat 數組 wid
概要
先前我們實現了基礎版本的 SVM,現在我們來實現進階版本。和上次比,這次優化的地方在於:
- 啟發式選擇參數 alpha(訓練速度更快)。通過一個外循環來選擇第一個 alpha 值,並且其選擇過程中會在兩種方式間進行交替:一種方式是在所有數據集上進行單遍掃描,另一種方式則是在非邊界(不等於 0 或 C) alpha 中實現單遍掃描。在選擇第一個 alpha 值後,算法會通過一個內循環來選擇第二個 alpha 值。在優化過程中,會通過最大化步長的方式來獲得第二個 alpha 值(Ei-Ej 最大)。
- 引入核函數解決線性不可分的問題。
?
完整版本的支持向量機
數據集
?
同樣選擇機器學習實戰中的數據集,可視化如下:
我們使用的是高斯核函數:
\begin{align}
k(x, y) = \mathrm{exp} \left( \frac{-\lVert x-y\rVert ^2}{2\sigma^2} \right) \notag
\end{align}
其中 \(\sigma>0\) 是用戶定義的用於確定到達率或者說函數值跌落到 \(0\) 的速度參數。該函數將數據從其特征空間映射到更高維的空間,具體來說這裏是映射到一個無窮維的空間。
如果你成功運行了程序,畫出的圖像大概長這樣:
由於一定的隨機性,每次畫出的圖不一樣,我選了一張貌似還行的,有的真慘不忍睹,顯然算法還有一定的上升空間。
?
代碼
?
每一步我已經標的很清楚了,不用再啰嗦了,貼代碼。
# -*- coding: utf-8 -*-
"""
Created on Mon Apr 2 20:20:20 2018
@author: zhoukui
"""
import numpy as np
import random
import matplotlib.pyplot as plt
def loadData(filename):
dataMat = []
yMat = []
fr = open(filename)
for line in fr.readlines():
line = line.strip().split('\t')
dataMat.append([float(line[0]),float(line[1])])
yMat.append(float(line[2]))
return np.array(dataMat), np.array(yMat) # 大小 (100, 2) , (100,)
def showData(filename, line = None):
dataArr, yArr = loadData(filename)
data_class_1_index = np.where(yArr == -1)
data_class_1 = dataArr[data_class_1_index,:].reshape(-1,2)
data_class_2_index = np.where(yArr == 1)
data_class_2 = dataArr[data_class_2_index,:].reshape(-1,2)
fig = plt.figure()
ax = fig.add_subplot(1,1,1)
ax.scatter(data_class_1[:,0], data_class_1[:,1], c = 'r', label = "$-1$")
ax.scatter(data_class_2[:,0], data_class_2[:,1], c = 'g', label = "$+1$")
plt.legend()
if line is not None:
b, alphas, kTup = line
#==============================================================================
# for i in range(dataArr.shape[0]): # 畫支持向量
# wx = np.sum(alphas*yArr[:,np.newaxis]*kernelTrans(dataArr, dataArr[i,:], kTup)[:,np.newaxis], axis=0)
# y = wx + b
# if (abs(y-1) < 0.1): # +1 類
# ax.scatter(dataArr[i,0], dataArr[i,1], marker='o', s = 90, c = 'black')
# if (abs(y+1) < 0.1): # -1 類
# ax.scatter(dataArr[i,0], dataArr[i,1], marker='o', s = 90, c = 'cyan')
#==============================================================================
# 我想畫分界線
points = []
for j in np.arange(0, 0.75, 0.01):
for i in np.arange(-0.75, 0.75, 0.01):
wx = np.sum(alphas*yArr[:,np.newaxis]*kernelTrans(dataArr, np.array([i,j]), kTup)[:,np.newaxis], axis=0)
y = wx + b
if(abs(y-1) < 0.01):
points.append([i,j])
points = np.array(points)
points = points[np.lexsort(points[:,::-1].T)] # 按第一列排序
#print(points)
ax.plot(points[:,0], points[:,1], lw = 5.0 , c = 'black') # 第一二象限
#ax.plot(-points[:,0], points[:,1], c = 'black') # 第二象限
#ax.plot(-points[:,0], -points[:,1], c = 'black') # 第三象限
ax.plot(points[:,0], -points[:,1], lw = 5.0, c = 'black') # 第四象限
plt.show()
def kernelTrans(X, A, kTup):
# 把原先的向量的內積運算,改成核函數運算
# X: 就是樣本,shape= (m,n) A: 向量 shape (n,)
# 這裏我們封裝了兩種情況:線性核 'lin'、高斯核 'rbf'
m, n = X.shape
K = np.zeros((m,1))
if kTup[0] == 'lin':
K = np.sum(X*A, axis = 1)
elif kTup[0] == 'rbf':
K = np.sum((X-A)**2, axis = 1)
K = np.exp(K / (-1.*kTup[1]**2)) # kTup[1] 就是參數 sigma
else:
raise NameError('Houston We Have a Problem -- That Kernel is not recognized')
return K
# 構造一個數據結構,方便
class optStruct:
def __init__(self, dataArr, yArr, C, toler, kTup):
self.X = dataArr
self.Y = yArr # 標簽
self.C = C
self.tol = toler
self.m = dataArr.shape[0]
self.alphas = np.zeros((self.m, 1))
self.b = 0
# 緩存誤差,節省後續的計算時間。第一列給出的是 eCache 是否有效的標誌位,值為 0 或 1
# 第二列給出的是實際的 E 值
self.eCache = np.zeros((self.m, 2))
# 構造數組 K, 存儲訓練過程中所有可能用到的核函數運算,這樣後邊直接調用就行了
self.K = np.zeros((self.m, self.m))
for i in range(self.m):
self.K[:,i] = kernelTrans(self.X, self.X[i,:], kTup)
# 計算 fXi 那一個公式,因為後邊頻繁使用,這裏寫成函數了
def calcEk(oS, k):
fXk = np.sum(oS.alphas*oS.Y[:,np.newaxis]*oS.K[:,k][:,np.newaxis]) + oS.b
Ek = fXk - oS.Y[k]
return Ek
def selectJrand(i,m):
j=i #we want to select any J not equal to i
while (j==i):
j = int(random.uniform(0,m))
return j
# 內循環的啟發式選擇方法:依據最大步長確定 alpha
def selectJ(i, oS, Ei):
"""
i :先前確定的第一個 alpha 值下標
Ei : 誤差嘍
我們要找到一個 j,使得 abs(Ei-Ej) 最大,並返回 j, Ej
"""
maxK = -1 # 這是我們想要的 alpha 下標,先設一個初始值
maxDeltaE = 0 # 差值,先初始化為 0
Ej = 0
# 找到可取的 alpha 值下標列表。這裏的“可取”是說明先前的已經計算好了,E 值非 0
# 如果 E 值等於零,那就沒必要篩選了,隨機選擇一個就好,也就是 else 裏做的事情
validEcacheList = list(np.nonzero(oS.eCache[:,0])[0]) # 返回不為零的下標,元組形式,所以加[0]得到數組
if (len(validEcacheList) > 1): # 多於一個,選擇具有最大的步長
for k in validEcacheList:
if k == i: continue # 當然要選擇與 i 不一樣的
Ek = calcEk(oS, k)
"""
不能這樣寫:Ek= oS.eCache[k, 1], 因為參數 Ek 是在外循環中更新的,看後邊的
遍歷就可知,外循環一次,內循環可能運行多次,不能及時更新,所以得現算獲得最新值
"""
deltaE = abs(Ei-Ek)
if (deltaE > maxDeltaE):
maxK = k
maxDeltaE = deltaE;
Ej = Ek
return maxK, Ej
else: # 如果這是第一次循環,那麽緩存中全為 0, 此時就隨機選擇一個 alpha 值
j = selectJrand(i, oS.m)
Ej = calcEk(oS, j)
return j, Ej
# 計算誤差值並存入緩存當中。在對 alpha 值進行優化之後會用到這個值
def updateEk(oS, k):
Ek = calcEk(oS, k)
oS.eCache[k] = [1, Ek] # 1 表示有效的,可取的
# 對 alpha 的修正函數
def clipAlpha(aj,H,L):
if aj > H:
aj = H
if L > aj:
aj = L
return aj
# 內循環
def innerL(i, oS):
"""
內循環要做的就是找到第二個 alpha,然後看情況是否進行優化
最後返回 0 表示 沒有進行優化,返回 1 表示進行了優化
"""
# 計算 Ei,也相當於誤差
Ei = calcEk(oS, i)
# 同樣先判定是否滿足優化條件
if( ((oS.Y[i]*Ei < -oS.tol) and (oS.alphas[i] < oS.C)) or
((oS.Y[i]*Ei > oS.tol) and (oS.alphas[i] > 0))):
# 到這兒說明滿足了優化的條件,啟發式選擇第二個 alpha
j, Ej = selectJ(i, oS, Ei)
# 更新 alpha 前先復制一下,作為 old
alphaIold = oS.alphas[i].copy()
alphaJold = oS.alphas[j].copy()
# 計算 L 和 H, alpha 和 L,H 的關系是 0 <= L <= alpha <= H <= C
# 異號的情況, alpha 相減, 否則同號,相加
if( oS.Y[i] != oS.Y[j]):
L = max(0, oS.alphas[j] - oS.alphas[i])
H = min(oS.C, oS.C + oS.alphas[j] - oS.alphas[i])
else:
L = max(0, oS.alphas[j] + oS.alphas[i] - oS.C)
H = min(oS.C, oS.alphas[j] + oS.alphas[i])
# 如果 L == H,那就沒什麽優化的了,continue
if L == H:
print("L == H")
return 0
# 計算 eta,eta 是 alphas[j] 的最優修改量,如果 eta <= 0,需要退出
# for 循環叠代的過程,實際上是比較邊界值,取較小,在此先不處理
#==============================================================================
# eta = np.sum(oS.X[i,:]*oS.X[i,:]) + np.sum(oS.X[j,:]*oS.X[j,:]) - \
# 2. * np.sum(oS.X[i,:]*oS.X[j,:])
#==============================================================================
eta = oS.K[i,i] + oS.K[j,j] - 2.* oS.K[i,j]
if eta <= 0:
print("eta <= 0")
return 0
# 準備好之後,就可以計算出新的 alphas[j] 值
oS.alphas[j] = alphaJold + oS.Y[j]*(Ei-Ej) / eta
# 此時還需要對 alphas[j] 進行修正
oS.alphas[j] = clipAlpha(oS.alphas[j], H, L)
# 更新誤差緩存
updateEk(oS, j)
# 檢查alpha[j]是否只是輕微的改變,如果是的話,就退出for循環
if( abs(oS.alphas[j] - alphaJold) < 0.00001 ):
print("j is not moving enough")
return 0
# 下面對 i 進行修正,修改量與 j 相同,但方向相反
oS.alphas[i] = alphaIold + oS.Y[i]*oS.Y[j]*(alphaJold-oS.alphas[j])
# 更新誤差緩存
updateEk(oS, i)
# 下面計算參數 b
bi = oS.b - Ei - oS.Y[i]*(oS.alphas[i]-alphaIold)*oS.K[i,i] -\
oS.Y[j]*(oS.alphas[j]-alphaJold)*oS.K[i,j]
bj = oS.b - Ej - oS.Y[i]*(oS.alphas[i]-alphaIold)*oS.K[i,j] -\
oS.Y[j]*(oS.alphas[j]-alphaJold)*oS.K[j,j]
# b 的更新條件
if( 0 < oS.alphas[i] and oS.alphas[i] < oS.C):
oS.b = bi
elif( 0 < oS.alphas[j] and oS.alphas[j] < oS.C):
oS.b = bj
else:
oS.b = (bi+bj) / 2.
return 1
else:
return 0
# 外循環
def smoP(dataArr, yArr, C, toler, maxIter, kTup = ('lin', 0)):
"""smoSimple
Args:
dataArr 特征集合
yArr 類別標簽
C 松弛變量(常量值),允許有些數據點可以處於分隔面的錯誤一側。
控制最大化間隔和保證大部分的函數間隔小於 1.0 這兩個目標的權重。
可以通過調節該參數達到不同的結果。
toler 容錯率(是指在某個體系中能減小一些因素或選擇對某個系統產生不穩定的概率。)
maxIter 退出前最大的循環次數(alpha 不發生變化時叠代的次數)
Returns:
b 模型的常量值
alphas 拉格朗日乘子
"""
oS = optStruct(dataArr, yArr, C, toler, kTup)
iterations = 0 # 記錄叠代次數
entireSet = True # 因為要在非邊界循環和完整遍歷之間進行切換,所以做個標誌
"""
設置一個參數 alphaPairsChanged 記錄 alpha 是否已經進行優化,每次循環開始
記為 0,然後對整個集合順序遍歷, 如果沒變化,則記為叠代一次
"""
alphaPairsChanged = 0
# 只有在所有數據集上遍歷 maxIter 次,且不再發生任何 alpha 修改之後,才退出 while 循環
# 這裏的 iteration 與 simple SVM 版本有所不同,這裏不管更新沒更新 alpha,遍歷一次就 +1
while(iterations < maxIter) and ((alphaPairsChanged > 0) or (entireSet)):
alphaPairsChanged = 0
# 在非邊界循環和完整遍歷之間進行切換
if entireSet: # 遍歷所有值
for i in range(oS.m): # 尋找任意可能的 alpha
alphaPairsChanged += innerL(i, oS) # 記錄 alpha 發生變化的次數
print("fullSet, iter: %d , i: %d, pairs changed %d" % (iterations, i, alphaPairsChanged))
iterations += 1
else: # 遍歷非邊界值,即不在邊界 0 或 C 上的值
nonBoundIs = list(np.nonzero((oS.alphas > 0) * (oS.alphas < C))[0])
for i in nonBoundIs:
alphaPairsChanged += innerL(i, oS)
print("non-bound, iter: %d , i: %d, pairs changed %d" % (iterations, i, alphaPairsChanged))
iterations += 1
if entireSet: # 試試加上 and iterations > 200,強制優化
entireSet = False
elif (alphaPairsChanged == 0): # 等於 0 表示沒有做什麽優化
entireSet = True
print("iteration number: %d" % iterations)
return oS.b, oS.alphas
def testSVM(filename):
dataArr, yArr = loadData(filename)
C = 100
toler = 0.0001
maxIter = 10000
kTup = ('rbf', 1.3)
b, alphas = smoP(dataArr, yArr, C, toler, maxIter, kTup)
showData(filename, line = (b, alphas, kTup))
#return b, alphas
if __name__=="__main__":
testSVM("testSetRBF.txt")
#showData("testSet.txt", line = (b, alphas))
#showData("testSetRBF.txt")
?
?
?
支持向量機手把手實戰(進階版本)