Python3.x實現神經網路
阿新 • • 發佈:2018-12-16
本文採用 Python 實現神經網路,並通過實現的神經網路對手寫數字進行分類。
確定隱藏層節點數的公式:
模型的訓練和評估:
示例程式碼:
from functools import reduce
import random
import struct
from datetime import datetime

# Only ``exp`` is needed from numpy.  The previous wildcard import rebound
# ``random`` to ``numpy.random`` and shadowed several builtins.
from numpy import exp


def sigmoid(inX):
    """Logistic activation function: ``1 / (1 + e^-inX)``."""
    return 1.0 / (1 + exp(-inX))


class Node(object):
    """A neuron: stores its output/delta and its up/downstream connections."""

    def __init__(self, layer_index, node_index):
        """Create a node.

        :param layer_index: index of the layer this node belongs to
        :param node_index: index of this node inside its layer
        """
        self.layer_index = layer_index
        self.node_index = node_index
        self.downstream = []
        self.upstream = []
        self.output = 0
        self.delta = 0

    def set_output(self, output):
        """Set the output directly (used for input-layer nodes)."""
        self.output = output

    def append_downstream_connection(self, conn):
        """Register a connection leading to a node in the next layer."""
        self.downstream.append(conn)

    def append_upstream_connection(self, conn):
        """Register a connection coming from a node in the previous layer."""
        self.upstream.append(conn)

    def calc_output(self):
        """Compute the output: sigmoid of the weighted sum of upstream outputs."""
        weighted_sum = reduce(
            lambda ret, conn: ret + conn.upstream_node.output * conn.weight,
            self.upstream, 0)
        self.output = sigmoid(weighted_sum)

    def calc_hidden_layer_delta(self):
        """Compute delta for a hidden-layer node from its downstream deltas."""
        downstream_delta = reduce(
            lambda ret, conn: ret + conn.downstream_node.delta * conn.weight,
            self.downstream, 0.0)
        self.delta = self.output * (1 - self.output) * downstream_delta

    def calc_output_layer_delta(self, label):
        """Compute delta for an output-layer node.

        :param label: target value for this node
        """
        self.delta = self.output * (1 - self.output) * (label - self.output)

    def __str__(self):
        """Return a printable description of the node and its connections."""
        node_str = '%u-%u: output: %f delta: %f' % (
            self.layer_index, self.node_index, self.output, self.delta)
        downstream_str = ''.join('\n\t' + str(conn) for conn in self.downstream)
        upstream_str = ''.join('\n\t' + str(conn) for conn in self.upstream)
        return (node_str + '\n\tdownstream:' + downstream_str
                + '\n\tupstream:' + upstream_str)


class ConstNode(object):
    """A node whose output is always 1; its outgoing weights act as biases."""

    def __init__(self, layer_index, node_index):
        """Create a constant node.

        :param layer_index: index of the layer this node belongs to
        :param node_index: index of this node inside its layer
        """
        self.layer_index = layer_index
        self.node_index = node_index
        self.downstream = []
        self.output = 1
        self.delta = 0.0

    def append_downstream_connection(self, conn):
        """Register a connection leading to a node in the next layer."""
        self.downstream.append(conn)

    def calc_hidden_layer_delta(self):
        """Set the delta of the constant node.

        With output fixed at 1 the factor ``output * (1 - output)`` is 0, so
        the delta is always 0.  Network never creates a connection whose
        downstream node is a ConstNode, so no gradient reads this value.
        """
        self.delta = 0.0

    def __str__(self):
        """Return a printable description of the node."""
        node_str = '%u-%u: output: 1' % (self.layer_index, self.node_index)
        downstream_str = ''.join('\n\t' + str(conn) for conn in self.downstream)
        return node_str + '\n\tdownstream:' + downstream_str


class Layer(object):
    """A layer: ``node_count`` Node objects followed by one ConstNode."""

    def __init__(self, layer_index, node_count):
        """Create a layer.

        :param layer_index: index of the layer
        :param node_count: number of regular nodes (the bias node is extra)
        """
        self.layer_index = layer_index
        self.nodes = [Node(layer_index, i) for i in range(node_count)]
        self.nodes.append(ConstNode(layer_index, node_count))

    def set_output(self, data):
        """Set the outputs of the nodes (used when this is the input layer).

        :param data: sequence of input values, one per regular node
        """
        for node, value in zip(self.nodes, data):
            node.set_output(value)

    def calc_output(self):
        """Compute the output of every regular node (the bias node is skipped)."""
        for node in self.nodes[:-1]:
            node.calc_output()

    def dump(self):
        """Print every node of the layer."""
        for node in self.nodes:
            print(node)


class Connection(object):
    """A weighted edge between an upstream node and a downstream node."""

    def __init__(self, upstream_node, downstream_node):
        """Create a connection with a small random initial weight.

        :param upstream_node: node the connection starts from
        :param downstream_node: node the connection ends at
        """
        self.upstream_node = upstream_node
        self.downstream_node = downstream_node
        self.weight = random.uniform(-0.1, 0.1)
        self.gradient = 0.0

    def calc_gradient(self):
        """Compute the gradient: downstream delta times upstream output."""
        self.gradient = self.downstream_node.delta * self.upstream_node.output

    def get_gradient(self):
        """Return the most recently computed gradient."""
        return self.gradient

    def update_weight(self, rate):
        """Recompute the gradient and move the weight along it.

        :param rate: learning rate
        """
        self.calc_gradient()
        self.weight += rate * self.gradient

    def __str__(self):
        """Return a printable description of the connection."""
        return '(%u-%u) -> (%u-%u) = %f' % (
            self.upstream_node.layer_index,
            self.upstream_node.node_index,
            self.downstream_node.layer_index,
            self.downstream_node.node_index,
            self.weight)


class Connections(object):
    """A collection of Connection objects."""

    def __init__(self):
        self.connections = []

    def add_connection(self, connection):
        """Append a connection to the collection."""
        self.connections.append(connection)

    def dump(self):
        """Print every connection."""
        for conn in self.connections:
            print(conn)


class Network(object):
    """A fully connected feed-forward network trained by backpropagation."""

    def __init__(self, layers):
        """Build the layers and fully connect each pair of adjacent layers.

        :param layers: list with the number of regular nodes in each layer
        """
        self.connections = Connections()
        self.layers = [Layer(i, count) for i, count in enumerate(layers)]
        for upper, lower in zip(self.layers[:-1], self.layers[1:]):
            # Every node of the upper layer (bias node included) feeds every
            # regular node of the lower layer; bias nodes get no inputs.
            for upstream_node in upper.nodes:
                for downstream_node in lower.nodes[:-1]:
                    conn = Connection(upstream_node, downstream_node)
                    self.connections.add_connection(conn)
                    downstream_node.append_upstream_connection(conn)
                    upstream_node.append_downstream_connection(conn)

    def train(self, labels, data_set, rate, iteration):
        """Train the network.

        :param labels: list of label vectors, one per sample
        :param data_set: list of feature vectors, one per sample
        :param rate: learning rate
        :param iteration: number of passes over the whole data set
        """
        for _ in range(iteration):
            for d in range(len(data_set)):
                self.train_one_sample(labels[d], data_set[d], rate)

    def train_one_sample(self, label, sample, rate):
        """Run one forward/backward pass and weight update for one sample."""
        self.predict(sample)
        self.calc_delta(label)
        self.update_weight(rate)

    def calc_delta(self, label):
        """Compute the delta of every node, output layer first.

        :param label: target vector for the output layer
        """
        for node, target in zip(self.layers[-1].nodes, label):
            node.calc_output_layer_delta(target)
        for layer in self.layers[-2::-1]:
            for node in layer.nodes:
                node.calc_hidden_layer_delta()

    def update_weight(self, rate):
        """Update the weight of every connection.

        :param rate: learning rate
        """
        for layer in self.layers[:-1]:
            for node in layer.nodes:
                for conn in node.downstream:
                    conn.update_weight(rate)

    def calc_gradient(self):
        """Compute (without applying) the gradient of every connection."""
        for layer in self.layers[:-1]:
            for node in layer.nodes:
                for conn in node.downstream:
                    conn.calc_gradient()

    def get_gradient(self, label, sample):
        """Compute the gradient of every connection for a single sample.

        :param label: target vector of the sample
        :param sample: feature vector of the sample
        """
        self.predict(sample)
        self.calc_delta(label)
        self.calc_gradient()

    def predict(self, sample):
        """Run a forward pass and return the output vector.

        :param sample: feature vector fed to the input layer
        :return: list with the output of every regular output-layer node
        """
        self.layers[0].set_output(sample)
        for layer in self.layers[1:]:
            layer.calc_output()
        # A real list, not a lazy ``map``: callers need len() and indexing.
        return [node.output for node in self.layers[-1].nodes[:-1]]

    def dump(self):
        """Print every layer of the network."""
        for layer in self.layers:
            layer.dump()


def gradient_check(network, sample_feature, sample_label):
    """Compare backpropagated gradients with numerical estimates.

    For every connection the weight is perturbed by +/- epsilon, the network
    error is evaluated at both points, and the central difference is printed
    next to the gradient computed by backpropagation.  Each weight is
    restored to its original value afterwards.

    :param network: the Network to check
    :param sample_feature: feature vector of the sample
    :param sample_label: label vector of the sample
    :return: list of ``(expected_gradient, actual_gradient)`` pairs, one per
        connection, in the order of ``network.connections.connections``
    """
    def network_error(vec1, vec2):
        # Half of the sum of squared differences.
        return 0.5 * sum((a - b) * (a - b) for a, b in zip(vec1, vec2))

    # Network.get_gradient takes (label, sample), in that order.
    network.get_gradient(sample_label, sample_feature)

    epsilon = 0.0001
    results = []
    for conn in network.connections.connections:
        actual_gradient = conn.get_gradient()
        original_weight = conn.weight

        # Error with the weight nudged up by epsilon.
        conn.weight = original_weight + epsilon
        error1 = network_error(network.predict(sample_feature), sample_label)

        # Error with the weight nudged down by epsilon.
        conn.weight = original_weight - epsilon
        error2 = network_error(network.predict(sample_feature), sample_label)

        # Restore exactly, so later checks and later training are unaffected.
        conn.weight = original_weight

        # Connection.gradient is the direction that *reduces* the error
        # (weights are updated with ``+=``), hence error2 - error1.
        expected_gradient = (error2 - error1) / (2 * epsilon)

        print('expected gradient: \t%f\nactual gradient: \t%f' % (
            expected_gradient, actual_gradient))
        results.append((expected_gradient, actual_gradient))
    return results


class Loader(object):
    """Base class for the data file loaders."""

    def __init__(self, path, count):
        """Create a loader.

        :param path: path of the data file
        :param count: number of samples to read from the file
        """
        self.path = path
        self.count = count

    def get_file_content(self):
        """Return the whole file as bytes."""
        with open(self.path, 'rb') as f:
            return f.read()

    def to_int(self, byte):
        """Convert one unsigned byte to an int.

        Indexing ``bytes`` in Python 3 already yields an int, which is
        returned unchanged; a length-1 ``bytes`` object is unpacked.
        """
        if isinstance(byte, int):
            return byte
        return struct.unpack('B', byte)[0]


class ImageLoader(Loader):
    """Loader for image files: fixed-size header, then 28x28 byte images."""

    HEADER_SIZE = 16
    ROWS = 28
    COLS = 28

    def get_picture(self, content, index):
        """Extract one image as a list of rows of pixel values.

        :param content: raw bytes of the image file
        :param index: index of the image to extract
        """
        start = index * self.ROWS * self.COLS + self.HEADER_SIZE
        return [
            [self.to_int(content[start + i * self.COLS + j])
             for j in range(self.COLS)]
            for i in range(self.ROWS)
        ]

    def get_one_sample(self, picture):
        """Flatten a picture into a single input vector, row by row."""
        return [picture[i][j]
                for i in range(self.ROWS)
                for j in range(self.COLS)]

    def load(self):
        """Load the file and return the input vectors of all samples."""
        content = self.get_file_content()
        return [self.get_one_sample(self.get_picture(content, index))
                for index in range(self.count)]


class LabelLoader(Loader):
    """Loader for label files: fixed-size header, then one byte per label."""

    HEADER_SIZE = 8

    def load(self):
        """Load the file and return the label vectors of all samples."""
        content = self.get_file_content()
        return [self.norm(content[index + self.HEADER_SIZE])
                for index in range(self.count)]

    def norm(self, label):
        """Convert a label value into a 10-dimensional target vector.

        The position of the label holds 0.9, all other positions hold 0.1.
        """
        label_value = self.to_int(label)
        return [0.9 if i == label_value else 0.1 for i in range(10)]


def get_training_data_set():
    """Load the training set and return ``(images, labels)``."""
    image_loader = ImageLoader('MNIST_data/train-images-idx3-ubyte', 60000)
    label_loader = LabelLoader('MNIST_data/train-labels-idx1-ubyte', 60000)
    return image_loader.load(), label_loader.load()


def get_test_data_set():
    """Load the test set and return ``(images, labels)``."""
    image_loader = ImageLoader('MNIST_data/t10k-images-idx3-ubyte', 10000)
    label_loader = LabelLoader('MNIST_data/t10k-labels-idx1-ubyte', 10000)
    return image_loader.load(), label_loader.load()


def get_result(vec):
    """Return the index of the largest value in ``vec``.

    Ties resolve to the first occurrence; an empty ``vec`` yields 0.
    """
    best_index = 0
    best_value = None
    for index, value in enumerate(vec):
        if best_value is None or value > best_value:
            best_value = value
            best_index = index
    return best_index


def evaluate(network, test_data_set, test_labels):
    """Return the error ratio of ``network`` on the test set.

    :param network: trained Network
    :param test_data_set: list of feature vectors
    :param test_labels: list of label vectors
    :raises ValueError: if the test set is empty
    """
    total = len(test_data_set)
    if total == 0:
        raise ValueError('test_data_set must not be empty')
    error = 0
    for i in range(total):
        label = get_result(test_labels[i])
        predict = get_result(network.predict(test_data_set[i]))
        if label != predict:
            error += 1
    return float(error) / float(total)


def train_and_evaluate():
    """Train epoch by epoch and stop once the error ratio gets worse.

    The error ratio is evaluated every 10 epochs.
    """
    last_error_ratio = 1.0
    epoch = 0
    train_data_set, train_labels = get_training_data_set()
    test_data_set, test_labels = get_test_data_set()
    # NOTE(review): pixel values are fed in as raw bytes (0-255), which can
    # saturate the sigmoid; consider scaling the inputs - confirm intent.
    network = Network([784, 300, 10])
    while True:
        epoch += 1
        network.train(train_labels, train_data_set, 0.3, 1)
        print('%s epoch %d finished' % (datetime.now(), epoch))
        if epoch % 10 == 0:
            error_ratio = evaluate(network, test_data_set, test_labels)
            print('%s after epoch %d, error ratio is %f' % (
                datetime.now(), epoch, error_ratio))
            if error_ratio > last_error_ratio:
                break
            last_error_ratio = error_ratio


# Main
if __name__ == '__main__':
    train_and_evaluate()
執行結果:
沒有GPU,訓練時間太長了,就不給出訓練結果了,需要訓練資料可以發訊息到我郵箱。