K-臨近演算法介紹和實踐
K-臨近演算法概述
KNN演算法(K-Nearest Neighbors)演算法是機器學習領域廣泛使用的分類演算法之一,所謂KNN,說的就是每個樣本的分類都可以用它最接近的K個鄰居來代表。
KNN演算法的基本思路是:
1、給定一個訓練集資料,每個訓練集資料都是已經分好類的。
2、設定一個初始的測試資料a,計算a到訓練集所有資料的歐幾里得距離,並排序。
3、選出訓練集中離a距離最近的K個訓練集資料。
4、比較k個訓練集資料,選出裡面出現最多的分類型別,此分類型別即為最終測試資料a的分類。
Python中的KNN相關API
在python中,我們可以很容易使用sklearn中的neighbors module來建立我們的KNN模型。
在API文件中,neighbors模組中的方法如下:
接下來我們用一個簡單的例子只管闡述一下KNN演算法的實現。
K臨近演算法的hello world
首先我們要建立二維空間上的一些點用於我們測試:
from sklearn.neighbors import NearestNeighbors
import numpy as np
X = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]])
由以上程式碼可以看到,我們在二維空間中建立了6個測試點用於我們的實驗,接下來我們便是要使用KNN演算法,訓練出一個合適我們這六個點的一個分類模型。
#NearestNeighbors用到的引數解釋
#n_neighbors=5,預設值為5,表示查詢k個最近鄰的數目
#algorithm='auto',指定用於計算最近鄰的演算法,auto表示試圖採用最適合的演算法計算最近鄰
#fit(X)表示用X來訓練演算法
nbrs = NearestNeighbors(n_neighbors=2, algorithm="ball_tree").fit(X)
這樣,我們最簡單的一個KNN的分類器就完成了,接下來看看效果。
#返回的indices是距離該點較近的k個點的下標,distance則是距離 distances, indices = nbrs.kneighbors(X)
結果如下圖所示:
第一個矩陣是distances矩陣,第二個則表示k個距離該點較近的點的座標。
還可以輸入以下程式碼視覺化結果:
#輸出的是求解n個最近鄰點後的矩陣圖,1表示是最近點,0表示不是最近點
print nbrs.kneighbors_graph(X).toarray()
使用KNN演算法用於監督學習的時候也是非常簡單的:
如下圖所示:
使用K近鄰演算法檢測異常操作
在黑客入侵Web伺服器之後,通常會使用一些系統漏洞進行提權,獲得伺服器的root許可權,在接下來的工作中,我們通過蒐集Linux伺服器的bash操作日誌,通過訓練識別出特定使用者的使用習慣,然後進一步識別出異常操作。
資料情況大概如下:
訓練資料中包含50個使用者的操作日誌,每個日誌包含了15000條操作命令,其中前5000條為正常操作,後面10000條隨機包含有異常操作。為了方便訓練,我們將100個命令視為一個操作序列,在label.txt中是對應每個使用者的後100個操作序列中是否異常的標籤。正常為0,異常為1。
原始碼如下:
# -*- coding:utf-8 -*-
import sys
import urllib
import re
from hmmlearn import hmm
import numpy as np
from sklearn.externals import joblib
import nltk
import matplotlib.pyplot as plt
from nltk.probability import FreqDist
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report
from sklearn import metrics
#測試樣本數
N=90
def load_user_cmd(filename):
cmd_list=[]
dist_max=[]
dist_min=[]
dist=[]
with open(filename) as f:
i=0
x=[]
for line in f:
line=line.strip('\n')
x.append(line)
dist.append(line)
i+=1
#每100條命令組成一個操作序列
if i == 100:
cmd_list.append(x)
x=[]
i=0
#統計最頻繁使用的前50個命令和最少使用的50個命令
fdist =list(FreqDist(dist).keys())
dist_max=set(fdist[0:50])
dist_min = set(fdist[-50:])
return cmd_list,dist_max,dist_min
#特徵化使用者使用習慣
def get_user_cmd_feature(user_cmd_list,dist_max,dist_min):
user_cmd_feature=[]
for cmd_block in user_cmd_list:
#以100個命令為統計單元,作為一個操作序列,去重後的操作命令個數作為特徵
#將list轉為set去重
f1=len(set(cmd_block))
#FreqDist轉為統計字典轉化為命令:出現次數的形式
fdist = list(FreqDist(cmd_block).keys())
#最頻繁使用的10個命令
f2=fdist[0:10]
#最少使用的10個命令
f3 = fdist[-10:]
f2 = len(set(f2) & set(dist_max))
f3 = len(set(f3) & set(dist_min))
#返回該統計單元中和總的統計的最頻繁使用的前50個命令和最不常使用的50個命令的重合程度
#f1:統計單元中出現的命令型別數量
#f2:統計單元中最常使用的10個命令和總的最常使用的命令的重合程度
#f3:統計單元中最不常使用的10個命令和總的最不常使用的命令的重合程度
x=[f1,f2,f3]
user_cmd_feature.append(x)
return user_cmd_feature
def get_label(filename,index=0):
x=[]
with open(filename) as f:
for line in f:
line=line.strip('\n')
x.append( int(line.split()[index]))
return x
if __name__ == '__main__':
user_cmd_list,user_cmd_dist_max,user_cmd_dist_min=load_user_cmd("../data/MasqueradeDat/User3")
user_cmd_feature=get_user_cmd_feature(user_cmd_list,user_cmd_dist_max,user_cmd_dist_min)
#index=2 即為User3對應的label
labels=get_label("../data/MasqueradeDat/label.txt",2)
#前5000個記錄為正常操作 即前50個序列為正常操作
y=[0]*50+labels
x_train=user_cmd_feature[0:N]
y_train=y[0:N]
x_test=user_cmd_feature[N:150]
y_test=y[N:150]
neigh = KNeighborsClassifier(n_neighbors=3)
neigh.fit(x_train, y_train)
y_predict=neigh.predict(x_test)
score=np.mean(y_test==y_predict)*100
print(y_test)
print(y_predict)
print(score)
print(classification_report(y_test, y_predict))
print(metrics.confusion_matrix(y_test, y_predict))
執行一下,可以看到準確率約為83.3%,結果不是很理想。
如果我們想提高我們的驗證效果,我們需要更加全盤考慮異常操作的出現。因此接下來,我們使用全量比較,來重新對使用者的是否異常進行訓練。
在資料蒐集和清洗的時候,我們使用dict資料結構,將全部命令去重之後形成一個大型的向量空間,每個命令代表一個特徵,首先通過遍歷全部命令生成對應的詞集。因此我們重寫load_user_cmd方法,如下:
def load_user_cmd_new(filename):
cmd_list=[]
dist=[]
with open(filename) as f:
i=0
x=[]
for line in f:
line=line.strip('\n')
x.append(line)
dist.append(line)
i+=1
if i == 100:
cmd_list.append(x)
x=[]
i=0
fdist = list(FreqDist(dist).keys())
return cmd_list,fdist
得到詞集之後,將命令向量化,方便模型的訓練。因此再重寫get_user_feature方法,重新獲取使用者特徵。
def get_user_cmd_feature_new(user_cmd_list,dist):
user_cmd_feature=[]
for cmd_list in user_cmd_list:
v=[0]*len(dist)
for i in range(0,len(dist)):
if dist[i] in cmd_list:
v[i]+=1
user_cmd_feature.append(v)
return user_cmd_feature
訓練新模型和之前的方法一樣。
效果驗證的時候,使用交叉驗證,通過十次隨機取樣和驗證,提高驗證的可信度:
score=cross_val_score(neigh,user_cmd_feature,y,n_jobs = -1,cv=10)
最後可以得到一個準確率約在93%左右的較好的模型。