GCN Node Classification on the Cora Dataset - a self-reminder handbook for my forgetful future self
Published: 2020-12-20
Format of the Data object
Data = namedtuple('Data', ['x', 'y', 'adjacency', 'train_mask', 'val_mask', 'test_mask'])
The CoraData class handles downloading, processing, and loading the Cora data. When a cached file exists it is used directly; otherwise the raw files are downloaded, processed, and cached to disk. The full CoraData implementation appears in the listing below.
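A minimal usage sketch (assuming the CoraData class from the full listing below; the shapes match those printed by process_data, and 140 / 500 / 1000 is the standard planetoid split):

dataset = CoraData(data_root="cora").data   # downloads and processes on the first run, cached afterwards
print(dataset.x.shape)           # node features, (2708, 1433)
print(dataset.y.shape)           # node labels, (2708,)
print(dataset.adjacency.shape)   # sparse adjacency matrix, (2708, 2708)
print(dataset.train_mask.sum(),  # 140 training nodes
      dataset.val_mask.sum(),    # 500 validation nodes
      dataset.test_mask.sum())   # 1000 test nodes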
Full code: GCN node classification on the Cora dataset

import itertools                     # used below to de-duplicate edges
import os
import os.path as osp
import pickle                        # serializes the processed data object to disk
import urllib.request                # built-in HTTP client, used to download the raw files
from collections import namedtuple   # lightweight container for the processed data

import numpy as np
import scipy.sparse as sp
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as init
import torch.optim as optim
import matplotlib.pyplot as plt


Data preparation

Data = namedtuple('Data', ['x', 'y', 'adjacency',
                           'train_mask', 'val_mask', 'test_mask'])


class CoraData(object):
    download_url = "https://github.com/kimiyoung/planetoid/raw/master/data"
    filenames = ["ind.cora.{}".format(name) for name in
                 ['x', 'tx', 'allx', 'y', 'ty', 'ally', 'graph', 'test.index']]

    def __init__(self, data_root="cora", rebuild=False):
        """Cora dataset: download, processing, and loading.

        When a cached file exists it is used; otherwise the raw data is
        downloaded, processed, and cached to disk.

        The processed data is available through the .data property, which
        returns a Data object with the following fields:
        * x: node features, shape 2708 * 1433, np.ndarray
        * y: node labels, 7 classes in total, np.ndarray
        * adjacency: adjacency matrix, shape 2708 * 2708, scipy.sparse coo_matrix
        * train_mask: training mask, length 2708, True where a node is in the training set
        * val_mask: validation mask, length 2708, True where a node is in the validation set
        * test_mask: test mask, length 2708, True where a node is in the test set

        Args:
        -------
        data_root: string, optional
            Directory holding the data. Raw data: {data_root}/raw,
            cached data: {data_root}/processed_cora.pkl
        rebuild: boolean, optional
            Whether to rebuild the dataset; if True, the data is rebuilt
            even when a cache file exists.
        """
        self.data_root = data_root
        save_file = osp.join(self.data_root, "processed_cora.pkl")  # path of the cache file
        if osp.exists(save_file) and not rebuild:
            print("Using Cached file: {}".format(save_file))
            self._data = pickle.load(open(save_file, "rb"))
        else:
            self.maybe_download()
            self._data = self.process_data()
            with open(save_file, "wb") as f:
                pickle.dump(self.data, f)
            print("Cached file: {}".format(save_file))

    @property
    def data(self):
        """Return the Data object: x, y, adjacency, train_mask, val_mask, test_mask."""
        return self._data

    def process_data(self):
        """Process the raw files into node features and labels, the adjacency matrix,
        and the train/validation/test masks.
        Adapted from: https://github.com/rusty1s/pytorch_geometric
        """
        print("Process data ...")
        _, tx, allx, y, ty, ally, graph, test_index = [self.read_data(
            osp.join(self.data_root, "raw", name)) for name in self.filenames]
        train_index = np.arange(y.shape[0])
        val_index = np.arange(y.shape[0], y.shape[0] + 500)
        sorted_test_index = sorted(test_index)

        x = np.concatenate((allx, tx), axis=0)
        y = np.concatenate((ally, ty), axis=0).argmax(axis=1)
        # reorder the test rows so that node ids and feature rows line up
        x[test_index] = x[sorted_test_index]
        y[test_index] = y[sorted_test_index]
        num_nodes = x.shape[0]

        train_mask = np.zeros(num_nodes, dtype=bool)
        val_mask = np.zeros(num_nodes, dtype=bool)
        test_mask = np.zeros(num_nodes, dtype=bool)
        train_mask[train_index] = True
        val_mask[val_index] = True
        test_mask[test_index] = True
        adjacency = self.build_adjacency(graph)
        print("Node's feature shape: ", x.shape)
        print("Node's label shape: ", y.shape)
        print("Adjacency's shape: ", adjacency.shape)
        print("Number of training nodes: ", train_mask.sum())
        print("Number of validation nodes: ", val_mask.sum())
        print("Number of test nodes: ", test_mask.sum())
        return Data(x=x, y=y, adjacency=adjacency,
                    train_mask=train_mask, val_mask=val_mask, test_mask=test_mask)

    def maybe_download(self):
        save_path = os.path.join(self.data_root, "raw")
        for name in self.filenames:
            if not osp.exists(osp.join(save_path, name)):
                self.download_data(
                    "{}/{}".format(self.download_url, name), save_path)

    @staticmethod
    def build_adjacency(adj_dict):
        """Build the adjacency matrix from the adjacency list."""
        edge_index = []
        num_nodes = len(adj_dict)
        for src, dst in adj_dict.items():
            edge_index.extend([src, v] for v in dst)
            edge_index.extend([v, src] for v in dst)
        # remove duplicate edges
        edge_index = list(k for k, _ in itertools.groupby(sorted(edge_index)))
        edge_index = np.asarray(edge_index)
        adjacency = sp.coo_matrix((np.ones(len(edge_index)),
                                   (edge_index[:, 0], edge_index[:, 1])),
                                  shape=(num_nodes, num_nodes), dtype="float32")
        return adjacency

    @staticmethod
    def read_data(path):
        """Read a raw file, using a reader appropriate to its format."""
        name = osp.basename(path)
        if name == "ind.cora.test.index":
            out = np.genfromtxt(path, dtype="int64")
            return out
        else:
            out = pickle.load(open(path, "rb"), encoding="latin1")
            out = out.toarray() if hasattr(out, "toarray") else out
            return out

    @staticmethod
    def download_data(url, save_path):
        """Download helper; fetches a raw file when it does not exist locally."""
        if not os.path.exists(save_path):
            os.makedirs(save_path)
        data = urllib.request.urlopen(url)
        filename = os.path.split(url)[-1]
        with open(os.path.join(save_path, filename), 'wb') as f:
            f.write(data.read())
        return True

    @staticmethod
    def normalization(adjacency):
        """Compute L = D^-0.5 * (A + I) * D^-0.5."""
        adjacency += sp.eye(adjacency.shape[0])  # add self-loops
        degree = np.array(adjacency.sum(1))
        d_hat = sp.diags(np.power(degree, -0.5).flatten())
        return d_hat.dot(adjacency).dot(d_hat).tocoo()
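The renormalization L = D^-0.5 * (A + I) * D^-0.5 implemented in CoraData.normalization can be sanity-checked on a tiny graph. This is a minimal illustrative sketch; the 3-node path graph is made up for the example and is not part of the original post:

A = sp.coo_matrix(np.array([[0., 1., 0.],
                            [1., 0., 1.],
                            [0., 1., 0.]], dtype="float32"))  # toy path graph 0 - 1 - 2
L = CoraData.normalization(A)    # adds self-loops, then D^-0.5 * (A + I) * D^-0.5
print(L.toarray())
# Each nonzero entry equals 1 / sqrt(d_i * d_j), with degrees counted after adding self-loops,
# e.g. L[0, 0] = 0.5 because node 0 has degree 2 once its self-loop is included.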
Graph convolution layer definition

class GraphConvolution(nn.Module):
    def __init__(self, input_dim, output_dim, use_bias=True):
        """Graph convolution: L * X * Θ

        Args:
        ----------
        input_dim: int
            Dimension of the input node features.
        output_dim: int
            Dimension of the output features.
        use_bias : bool, optional
            Whether to use a bias term.
        """
        super(GraphConvolution, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.use_bias = use_bias
        self.weight = nn.Parameter(torch.Tensor(input_dim, output_dim))
        if self.use_bias:
            self.bias = nn.Parameter(torch.Tensor(output_dim))
        else:
            self.register_parameter('bias', None)
        self.reset_parameters()

    def reset_parameters(self):
        init.kaiming_uniform_(self.weight)
        if self.use_bias:
            init.zeros_(self.bias)

    def forward(self, adjacency, input_feature):
        """The adjacency matrix is sparse, so sparse matrix multiplication is used.

        Args:
        -------
        adjacency: torch.sparse.FloatTensor
            Adjacency matrix.
        input_feature: torch.Tensor
            Input features.
        """
        support = torch.mm(input_feature, self.weight)
        output = torch.sparse.mm(adjacency, support)
        if self.use_bias:
            output += self.bias
        return output

    def __repr__(self):
        return self.__class__.__name__ + ' (' \
            + str(self.input_dim) + ' -> ' \
            + str(self.output_dim) + ')'


Model definition

class GcnNet(nn.Module):
    """A model with two GraphConvolution layers."""
    def __init__(self, input_dim=1433):
        super(GcnNet, self).__init__()
        self.gcn1 = GraphConvolution(input_dim, 16)
        self.gcn2 = GraphConvolution(16, 7)

    def forward(self, adjacency, feature):
        h = F.relu(self.gcn1(adjacency, feature))  # first layer + ReLU
        logits = self.gcn2(adjacency, h)           # second layer outputs class logits
        return logits
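As a quick shape check, here is a minimal sketch of one forward pass through GcnNet on random inputs. The node count of 4 and the sparse identity adjacency are arbitrary choices for illustration only, not part of the original post:

num_nodes, input_dim = 4, 1433
features = torch.rand(num_nodes, input_dim)

# a sparse identity adjacency: each node aggregates only itself
idx = torch.arange(num_nodes)
identity_adj = torch.sparse_coo_tensor(torch.stack([idx, idx]),
                                       torch.ones(num_nodes),
                                       (num_nodes, num_nodes))

net = GcnNet(input_dim=input_dim)
logits = net(identity_adj, features)
print(logits.shape)   # torch.Size([4, 7]): one score per class for every node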
Model training

# hyperparameters
learning_rate = 0.1      # learning rate / step size
weight_decay = 5e-4      # weight decay (L2 penalty)
epochs = 200

# model, loss, optimizer
device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)
model = GcnNet().to(device)
# cross-entropy loss: the smaller it is, the closer the predicted distribution is to the labels
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model.parameters(),
                       lr=learning_rate,
                       weight_decay=weight_decay)

# load the data and convert it to torch.Tensor
dataset = CoraData().data
x = dataset.x / dataset.x.sum(1, keepdims=True)  # normalize features so each row sums to 1
tensor_x = torch.from_numpy(x).to(device)
tensor_y = torch.from_numpy(dataset.y).to(device)
tensor_train_mask = torch.from_numpy(dataset.train_mask).to(device)
tensor_val_mask = torch.from_numpy(dataset.val_mask).to(device)
tensor_test_mask = torch.from_numpy(dataset.test_mask).to(device)

# normalized adjacency matrix: L = D^-0.5 * (A + I) * D^-0.5
normalize_adjacency = CoraData.normalization(dataset.adjacency)
indices = torch.from_numpy(np.asarray([normalize_adjacency.row,
                                       normalize_adjacency.col]).astype('int64')).long()  # indices
values = torch.from_numpy(normalize_adjacency.data.astype(np.float32))                    # values
tensor_adjacency = torch.sparse.FloatTensor(indices, values,
                                            (2708, 2708)).to(device)  # sparse adjacency on the device


# training loop
def train():
    loss_history = []
    val_acc_history = []
    model.train()
    train_y = tensor_y[tensor_train_mask]  # labels of the training nodes
    for epoch in range(epochs):
        logits = model(tensor_adjacency, tensor_x)     # forward pass over all nodes
        train_mask_logits = logits[tensor_train_mask]  # supervise only the training nodes
        loss = criterion(train_mask_logits, train_y)   # compute the loss
        optimizer.zero_grad()                          # reset gradients
        loss.backward()                                # backpropagation
        optimizer.step()                               # gradient update
        train_acc, _, _ = test(tensor_train_mask)      # accuracy on the training set
        val_acc, _, _ = test(tensor_val_mask)          # accuracy on the validation set
        # record loss and accuracy for plotting
        loss_history.append(loss.item())
        val_acc_history.append(val_acc.item())
        print("Epoch {:03d}: Loss {:.4f}, TrainAcc {:.4}, ValAcc {:.4f}".format(
            epoch, loss.item(), train_acc.item(), val_acc.item()))
    return loss_history, val_acc_history


# evaluation
def test(mask):
    model.eval()           # evaluation mode (disables BatchNorm/Dropout behaviour)
    with torch.no_grad():  # no gradient tracking
        logits = model(tensor_adjacency, tensor_x)
        test_mask_logits = logits[mask]
        predict_y = test_mask_logits.max(1)[1]
        accuracy = torch.eq(predict_y, tensor_y[mask]).float().mean()
    return accuracy, test_mask_logits.cpu().numpy(), tensor_y[mask].cpu().numpy()


# plot the training loss and validation accuracy on one figure with two y-axes
def plot_loss_with_acc(loss_history, val_acc_history):
    fig = plt.figure()
    ax1 = fig.add_subplot(111)
    ax1.plot(range(len(loss_history)), loss_history,
             c=np.array([255, 71, 90]) / 255.)
    plt.ylabel('Loss')
    ax2 = fig.add_subplot(111, sharex=ax1, frameon=False)
    ax2.plot(range(len(val_acc_history)), val_acc_history,
             c=np.array([79, 179, 255]) / 255.)
    ax2.yaxis.tick_right()
    ax2.yaxis.set_label_position("right")
    plt.ylabel('ValAcc')
    plt.xlabel('Epoch')
    plt.title('Training Loss & Validation Accuracy')
    plt.show()


loss, val_acc = train()
test_acc, test_logits, test_label = test(tensor_test_mask)
print("Test accuracy: ", test_acc.item())

plot_loss_with_acc(loss, val_acc)