聚類——標籤傳播演算法以及Python實現
阿新 • • 發佈:2019-02-11
標籤傳播演算法(label propagation)是典型的半監督聚類演算法。半監督是指訓練資料集中小部分樣本點已知標籤,大部分樣本點未知標籤。
核心思想
相似性較大的樣本點間應該具有相同的標籤,將已知標籤通過相似性矩陣傳播到未知的標籤。
演算法簡介
基本概念
轉化矩陣:用來更新標籤,實質就是度量樣本點間相似性程度的矩陣(圖的邊的權重)。
通常使用高斯徑向基以及k近鄰方法度量。
高斯徑向基計算兩樣本點間權重:採用徑向基時,的影響非常大,且不好設定。一種啟發式的方法就是找到距離最近(
k近鄰方法計算兩樣本點間權重:其中,為樣本i的K近鄰節點集合。
clamp:標籤矩陣更新過程中,原資料集中已知標籤樣本點的標籤不能改變,故需要將其”夾住”不讓其改變。
演算法流程
- Input: 訓練資料集X_data,y_data(未知標籤的設為-1), 閾值epsilon, 最大迭代次數maxstep, 權值計算函式即相應引數
- Output: 樣本標籤陣列
- Step1: 計算轉換矩陣T。
- Step2: 更新標籤,然後clamp。若標籤矩陣更新前後差異小於epsilon或超出迭代超出maxstep則終止。
程式碼
"""
標籤傳播聚類演算法, 典型的半監督學習演算法
核心思想:相似的資料應該具有相同的標籤,構建節點間的相似性矩陣(邊的權重)
"""
import numpy as np
class LablePropagation:
def __init__(self, epsilon=1e-3, maxstep=500, kernel_option='rbf', sigma=1.0, k=10):
self.epsilon = epsilon
self.maxstep = maxstep
self.kernel_option = kernel_option
self.sigma = sigma # rbf 核函式的引數
self.k = k # knn 核函式引數
self.T = None # 未標記點間的轉換矩陣
self.Y = None # 標籤數矩陣
self.Y_clamp = None # 已知標籤資料點的標籤矩陣
self.N = None
self.labeled_inds = None # 已知標籤樣本的索引
self.labels = None
def init_param(self, X_data, y_data):
# 初始化引數
self.N = X_data.shape[0]
self.labeled_inds = np.where(y_data >= 0)[0] # 未知標籤設為-1
n_class = len(np.unique(y_data[self.labeled_inds]))
self.Y = np.zeros((self.N, n_class))
for i in self.labeled_inds:
self.Y[i][int(y_data[i])] = 1.0 # 啞編碼,對應標籤設為1
self.Y_clamp = self.Y[self.labeled_inds] # n*l
self.T = self.cal_tran_mat(X_data) # n*n
return
def cal_dis2(self, node1, node2):
# 計算節點間的歐式距離平方
return (node1 - node2) @ (node1 - node2)
def cal_tran_mat(self, data):
# 計算轉換矩陣, 即構建圖
dis_mat = np.zeros((self.N, self.N))
for i in range(self.N):
for j in range(i + 1, self.N):
dis_mat[i, j] = self.cal_dis2(data[i], data[j])
dis_mat[j, i] = dis_mat[i, j]
if self.kernel_option == 'rbf':
assert (self.sigma is not None)
T = np.exp(-dis_mat / self.sigma ** 2)
normalizer = T.sum(axis=0)
T = T / normalizer
elif self.kernel_option == 'knn':
assert (self.k is not None)
T = np.zeros((self.N, self.N))
for i in range(self.N):
inds = np.argpartition(dis_mat[i], self.k + 1)[:self.k + 1]
T[i][inds] = 1.0 / self.k # 最近的k個擁有相同的權重
T[i][i] = 0
else:
raise ValueError('kernel is not supported')
return T
def fit(self, X_data, y_data):
# 訓練主函式
self.init_param(X_data, y_data)
step = 0
while step < self.maxstep:
step += 1
new_Y = self.T @ self.Y # 更新標籤矩陣
new_Y[self.labeled_inds] = self.Y_clamp # clamp
if np.abs(new_Y - self.Y).sum() < self.epsilon:
break
self.Y = new_Y
self.labels = np.argmax(self.Y, axis=1)
return
if __name__ == '__main__':
from sklearn.datasets import make_circles
n_samples = 100
X, y = make_circles(n_samples=n_samples, shuffle=False)
outer, inner = 0, 1
labels = -np.ones(n_samples)
labels[0] = outer
labels[-1] = inner
LPA = LablePropagation(maxstep=1000, kernel_option='knn', k=2, sigma=0.07)
LPA.fit(X, labels)
labels = LPA.labels
import matplotlib.pyplot as plt
def visualize(data, labels):
color = 'bg'
unique_label = np.unique(labels)
for col, label in zip(color, unique_label):
partial_data = data[np.where(labels == label)]
plt.scatter(partial_data[:, 0], partial_data[:, 1], color=col, alpha=1)
plt.scatter(data[0, 0], data[0, 1], color='b', marker='*', s=200, alpha=0.5) # outer
plt.scatter(data[-1, 0], data[-1, 1], color='g', marker='*', s=200, alpha=0.5) # inner
plt.show()
return
visualize(X, labels)
我的GitHub
注:如有不當之處,請指正。