三種聚類方法的簡單實現
阿新 • • 發佈:2019-01-05
聚類是機器學習中的無監督學習方法的重要一種,近來看了周志華老師的機器學習,專門研究了有關於聚類的一章,收穫很多,對於其中的演算法也動手實現了一下。主要實現的包括比較常見的k均值聚類、密度聚類和層次聚類,這三種聚類方法上原理都不難,演算法過程也很清晰明白。有關於原理可以參閱周志華老師的機器學習第九章,這裡只做一下程式碼的實現。
執行環境是Python2.7+numpy,說實話,numpy坑還是挺多的,其實用Matlab可能會更簡單。
k均值聚類,核心是不斷更新簇樣本的質心。
密度聚類,基本思路就是將所有密度可達的點都歸為一簇。#encoding=utf-8 __author__ = 'freedom' from numpy import* import matplotlib.pyplot as plt def loadDataSet(fileName): ''' 本函式用於載入資料 :param fileName: 資料檔名 :return:資料集,具有矩陣形式 ''' fr = open(fileName) dataSet = [] for line in fr.readlines(): curLine = line.strip().split('\t') inLine = map(float,curLine) # 利用map廣播,是的讀入的字串變為浮點型 dataSet.append(inLine) return mat(dataSet) def getDistance(vecA,vecB): ''' 本函式用於計算歐氏距離 :param vecA: 向量A :param vecB: 向量B :return:歐氏距離 ''' return sqrt(sum(power(vecA-vecB,2))) def randCent(dataSet,k): ''' 本函式用於生成k個隨機質心 :param dataSet: 資料集,具有矩陣形式 :param k:指定的質心個數 :return:隨機質心,具有矩陣形式 ''' n = shape(dataSet)[1] # 獲取特徵數目 centRoids = mat(zeros((k,n))) for j in range(n): minJ = min(dataSet[:,j]) # 獲取每個特徵的最小值 rangeJ = float(max(dataSet[:,j]-minJ)) # 獲取每個特徵的範圍 centRoids[:,j] = minJ + rangeJ*random.rand(k,1) # numpy下的rand表示隨機生成k*1的隨機數矩陣,範圍0-1 return centRoids def kMeans(dataSet,k,disMens = getDistance,createCent = randCent): ''' 本函式用於k均值聚類 :param dataSet: 資料集,要求有矩陣形式 :param k: 指定聚類的個數 :param disMens: 求解距離的方式,除歐式距離還可以定義其他距離計算方式 :param createCent: 生成隨機質心方式 :return:隨機質心,簇索引和誤差距離矩陣 ''' m = shape(dataSet)[0] clusterAssment = mat(zeros((m,2))) # 要為每個樣本建立一個簇索引和相對的誤差,所以需要m行的矩陣,m就是樣本數 centRoids = createCent(dataSet,k) # 生成隨機質心 clusterChanged = True while clusterChanged: clusterChanged = False for i in range(m): # 遍歷所有樣本 minDist = inf;minIndex = -1 # 初始化最小值 for j in range(k): # 遍歷所有質心 disJI = disMens(centRoids[j,:],dataSet[i,:]) if disJI < minDist: minDist = disJI;minIndex = j # 找出距離當前樣本最近的那個質心 if clusterAssment[i,0] != minIndex: # 更新當前樣本點所屬於的質心 clusterChanged = True # 如果當前樣本點不屬於當前與之距離最小的質心,則說明簇分配結果仍需要改變 clusterAssment[i,:] = minIndex,minDist**2 for cent in range(k): ptsInClust = dataSet[nonzero(clusterAssment[:,0].A == cent)[0]] # nonzero 返回的是矩陣中所有非零元素的座標,座標的行數與列數個存在一個數組或矩陣當中 # 矩陣支援檢查元素的操作,所有可以寫成matrix == int這種形式,返回的一個布林型矩陣,代表矩陣相應位置有無此元素 # 這裡指尋找當前質心下所聚類的樣本 centRoids[cent,:] = mean(ptsInClust,axis = 0) # 更新當前的質心為所有樣本的平均值,axis = 0代表對列求平均值 
return centRoids,clusterAssment def plotKmens(dataSet,k,clusterMeans): ''' 本函式用於繪製kMeans的二維聚類圖 :param dataSet: 資料集 :param k: 聚類的個數 :return:無 ''' centPoids,assment = clusterMeans(dataSet,k) fig = plt.figure() ax = fig.add_subplot(111) ax.scatter(dataSet[:,0],dataSet[:,1],c = 'blue') ax.scatter(centRoids[:,0],centRoids[:,1],c = 'red',marker = '+',s = 70) plt.show() def binKMeans(dataSet, k, distMeas = getDistance): ''' 本函式用於二分k均值演算法 :param dataSet: 資料集,要求有矩陣形式 :param k: 指定聚類個數 :param distMeas: 求解距離的方式 :return:質心,簇索引和誤差距離矩陣 ''' m = shape(dataSet)[0] clusterAssment = mat(zeros((m,2))) centRoids0 = mean(dataSet,axis = 0).tolist()[0] # 初始化一個簇,只有一個質心,分量就是就是所有特徵的均值 # 注意,tolist函式用於將矩陣轉化為一個列表,此列表為巢狀列表 #print centRoids0 centList = [centRoids0] for j in range(m): # 遍歷所有樣本,計算所有樣本與當前質心的距離作為誤差 clusterAssment[j,1] = distMeas(mat(centRoids0),dataSet[j,:])**2 while (len(centList) < k): # 迴圈條件為當前質心數目還不夠指定數目 lowestSSE = inf for i in range(len(centList)): # 遍歷所有質心 ptsCurrCluster = dataSet[nonzero(clusterAssment[:,0].A == i)[0],:] # 搜尋到當前質心所聚類的樣本 centroidsMat,splitClusterAss = kMeans(ptsCurrCluster,2,distMeas) # 將當前分割成兩個簇 sseSplit = sum(splitClusterAss[:,1]) # 計算分裂簇後的SSE sseNotSplit = sum(clusterAssment[nonzero(clusterAssment[:,0].A != i)[0],1]) # 計算分裂之前的SSE if (sseSplit + sseNotSplit) < lowestSSE: # 如果分裂之後的SSE小,則更新 bestCent2Split = i bestNewCents = centroidsMat bestClustAss = splitClusterAss.copy() lowestSSE = sseSplit+sseNotSplit #重新編制簇的編號,凡是分裂後編號為1的簇,編號為質心列表長度,編號為0的簇,編號為最佳分裂質心的編號,以此更新 bestClustAss[nonzero(bestClustAss[:,0].A == 1)[0],0] = len(centList) bestClustAss[nonzero(bestClustAss[:,0].A == 0)[0],0] = bestCent2Split centList[bestCent2Split] = bestNewCents[0,:].tolist()[0] # 新增分裂的質心到質心列表中 centList.append(bestNewCents[1,:].tolist()[0]) clusterAssment[nonzero(clusterAssment[:,0].A == bestCent2Split)[0],:] = bestClustAss return mat(centList),clusterAssment def biKmeans(dataSet, k, distMeas=getDistance): m = shape(dataSet)[0] clusterAssment = mat(zeros((m,2))) centroid0 = 
mean(dataSet, axis=0).tolist()[0] centList =[centroid0] #create a list with one centroid for j in range(m):#calc initial Error clusterAssment[j,1] = distMeas(mat(centroid0), dataSet[j,:])**2 while (len(centList) < k): lowestSSE = inf for i in range(len(centList)): ptsInCurrCluster = dataSet[nonzero(clusterAssment[:,0].A==i)[0],:]#get the data points currently in cluster i centroidMat, splitClustAss = kMeans(ptsInCurrCluster, 2, distMeas) sseSplit = sum(splitClustAss[:,1])#compare the SSE to the currrent minimum sseNotSplit = sum(clusterAssment[nonzero(clusterAssment[:,0].A!=i)[0],1]) print "sseSplit, and notSplit: ",sseSplit,sseNotSplit if (sseSplit + sseNotSplit) < lowestSSE: bestCentToSplit = i bestNewCents = centroidMat bestClustAss = splitClustAss.copy() lowestSSE = sseSplit + sseNotSplit bestClustAss[nonzero(bestClustAss[:,0].A == 1)[0],0] = len(centList) #change 1 to 3,4, or whatever bestClustAss[nonzero(bestClustAss[:,0].A == 0)[0],0] = bestCentToSplit print 'the bestCentToSplit is: ',bestCentToSplit print 'the len of bestClustAss is: ', len(bestClustAss) centList[bestCentToSplit] = bestNewCents[0,:].tolist()[0]#replace a centroid with two best centroids centList.append(bestNewCents[1,:].tolist()[0]) clusterAssment[nonzero(clusterAssment[:,0].A == bestCentToSplit)[0],:]= bestClustAss#reassign new clusters, and SSE return mat(centList), clusterAssment
#encoding=utf-8 import numpy as np import kmeans as km import matplotlib.pyplot as plt def createDisMat(dataMat): m = dataMat.shape[0] n = dataMat.shape[1] distMat = np.mat(np.zeros((m,m))) # 初始化距離矩陣,這裡預設使用歐式距離 for i in range(m): for j in range(m): if i == j: distMat[i,j] = 0 else: dist = km.getDistance(dataMat[i,:],dataMat[j,:]) distMat[i,j] = dist distMat[j,i] = dist return distMat def findCore(dataMat,delta,minPts): core = [] m = dataMat.shape[0] n = dataMat.shape[1] distMat = createDisMat(dataMat) for i in range(m): temp = distMat[i,:] < delta # 單獨抽取矩陣一行做過濾,凡是小於鄰域值的都被標記位True型別 ptsNum = np.sum(temp,1) # 按行加和,統計小於鄰域值的點個數 if ptsNum >= minPts: core.append(i) # 滿足條件,增加核心點 return core def DBSCAN(dataMat,delta,minPts): k = 0 m = dataMat.shape[0] distMat = createDisMat(dataMat) # 獲取距離矩陣 core = findCore(dataMat,delta,minPts) # 獲取核心點列表 unVisit = [1] * m # hash值作為標記,當某一位置的資料位1時,表示還未被訪問,為0表示已經被訪問 Q = [] ck = [] unVistitOld = [] while len(core) != 0: print 'a' unVistitOld = unVisit[:] # 保留原始的未被訪問集 i = np.random.choice(core) # 在核心點集中隨機選擇樣本 Q.append(i) # 加入對列Q unVisit[i] = 0 #剔除當前加入對列的資料,表示已經訪問到了 while len(Q) != 0: print len(Q) temp = distMat[Q[0],:]<delta # 獲取在此核心點鄰域範圍內的點集 del Q[0] ptsNum = np.sum(temp,1) if ptsNum >= minPts: for j in range(len(unVisit)): if unVisit[j] == 1 and temp[0,j] == True: Q.append(j) unVisit[j] = 0 k += 1 ck.append([]) for index in range(m): if unVistitOld[index] == 1 and unVisit[index] == 0: # 上一輪未被訪問到此輪被訪問到的點均要加入當前簇 ck[k-1].append(index) if index in core: # 在核心點集中清除當前簇的點 del core[core.index(index)] return ck def plotAns(dataSet,ck): fig = plt.figure() ax = fig.add_subplot(111) ax.scatter(dataSet[ck[0],0],dataSet[ck[0],1],c = 'blue') ax.scatter(dataSet[ck[1],0],dataSet[ck[1],1],c = 'red') ax.scatter(dataSet[ck[2],0],dataSet[ck[2],1],c = 'green') ax.scatter(dataSet[ck[3],0],dataSet[ck[3],1],c = 'yellow') #ax.scatter(centRoids[:,0],centRoids[:,1],c = 'red',marker = '+',s = 70) plt.show() if __name__ == '__main__': dataMat = 
km.loadDataSet("testSet.txt") # distMat = createDisMat(dataMat) # core = findCore(dataMat,1,5) # print distMat # print len(core) ck = DBSCAN(dataMat,2,15) print ck print len(ck) plotAns(dataMat,ck)
層次聚類,核心是定義了簇之間的距離衡量,不斷尋找距離最近的簇歸為一簇。
#encoding=utf-8 import numpy as np import DBSCAN as db import kmeans as km def calcDistByMin(dataMat,ck1,ck2): # 最小距離點作為簇間的距離 min = np.inf for vec1 in ck1: for vec2 in ck2: dist = km.getDistance(dataMat[vec1,:],dataMat[vec2,:]) if dist <= min: min = dist return min def calcDistByMax(dataMat,ck1,ck2): # 最大距離點作為簇間的距離 max = 0 for vec1 in ck1: for vec2 in ck2: dist = km.getDistance(dataMat[vec1,:],dataMat[vec2,:]) if dist >= max: max = dist return max def createDistMat(dataMat,calcDistType = calcDistByMin): # 生成初始的距離矩陣 m = dataMat.shape[0] distMat = np.mat(np.zeros((m,m))) for i in range(m): for j in range(m): listI = [i];listJ = [j] # 為配合距離函式的輸入引數形式,在這裡要列表化一下 distMat[i,j] = calcDistType(dataMat,listI,listJ) distMat[j,i] = distMat[i,j] return distMat def findMaxLoc(distMat,q): # 尋找矩陣中最小的元素並返回其位置,注意,這裡不能返回相同的座標 min = np.inf I = J = 0 for i in range(q): for j in range(q): if distMat[i,j] < min and i != j: min = distMat[i,j] I = i J = j return I,J def ANGES(dataMat,k,calcDistType = calcDistByMax): m = dataMat.shape[0] ck = [] for i in range(m): ck.append([i]) distMat = createDistMat(dataMat,calcDistType) q = m # 初始化點集個數 while q > k: i,j = findMaxLoc(distMat,q) #print i,j if i > j: i,j = j,i # 保證i<j,這樣做是為了刪除的是序號較大的簇 ck[i].extend(ck[j]) # 把序號較大的簇併入序號小的簇 del ck[j] # 刪除序號大的簇 distMat = np.delete(distMat,j,0) # 在距離矩陣中刪除該簇的資料,注意這裡delete函式有返回值,否則不會有刪除作用 distMat = np.delete(distMat,j,1) print distMat.shape for index in range(0,q-1): # 重新計算新簇和其餘簇之間的距離 distMat[i,index] = calcDistType(dataMat,ck[i],ck[index]) distMat[i,index] = distMat[index,i] q -= 1 # 一個點被分入簇中,自減 return ck if __name__ == '__main__': dataMat = km.loadDataSet("testSet.txt") ck = ANGES(dataMat,4) print ck db.plotAns(dataMat,ck)