TensorFlow Learning: Complete AlexNet Training and Testing Code
阿新 • Published: 2018-11-14
alexnet.py
```
"""This is a TensorFlow implementation of AlexNet by Alex Krizhevsky et al.

Paper:
(http://papers.nips.cc/paper/4824-imagenet-classification-with-deep-convolutional-neural-networks.pdf)

Explanation can be found in my blog post:
https://kratzert.github.io/2017/02/24/finetuning-alexnet-with-tensorflow.html

This script enables finetuning AlexNet on any given dataset with any number
of classes. The structure of this script is strongly inspired by the fast.ai
Deep Learning class by Jeremy Howard and Rachel Thomas, especially their
vgg16 finetuning script:
- https://github.com/fastai/courses/blob/master/deeplearning1/nbs/vgg16.py

The pretrained weights can be downloaded here and should be placed in the
same folder as this file:
- http://www.cs.toronto.edu/~guerzhoy/tf_alexnet/

@author: Frederik Kratzert (contact: f.kratzert(at)gmail.com)
"""
import tensorflow as tf
import numpy as np


class AlexNet(object):
    """Implementation of the AlexNet."""

    def __init__(self, x, keep_prob, num_classes, skip_layer,
                 weights_path='DEFAULT'):
        """Create the graph of the AlexNet model.

        Args:
            x: Placeholder for the input tensor.
            keep_prob: Dropout probability.
            num_classes: Number of classes in the dataset.
            skip_layer: List of names of the layers that get trained from
                scratch.
            weights_path: Complete path to the pretrained weight file, if it
                isn't in the same folder as this code.
        """
        # Parse input arguments into class variables
        self.X = x
        self.NUM_CLASSES = num_classes
        self.KEEP_PROB = keep_prob
        self.SKIP_LAYER = skip_layer

        if weights_path == 'DEFAULT':
            self.WEIGHTS_PATH = 'bvlc_alexnet.npy'
        else:
            self.WEIGHTS_PATH = weights_path

        # Call the create function to build the computational graph of AlexNet
        self.create()

    def create(self):
        """Create the network graph."""
        # 1st Layer: Conv (w ReLu) -> Lrn -> Pool
        conv1 = conv(self.X, 11, 11, 96, 4, 4, padding='VALID', name='conv1')
        norm1 = lrn(conv1, 2, 1e-04, 0.75, name='norm1')
        pool1 = max_pool(norm1, 3, 3, 2, 2, padding='VALID', name='pool1')

        # 2nd Layer: Conv (w ReLu) -> Lrn -> Pool with 2 groups
        conv2 = conv(pool1, 5, 5, 256, 1, 1, groups=2, name='conv2')
        norm2 = lrn(conv2, 2, 1e-04, 0.75, name='norm2')
        pool2 = max_pool(norm2, 3, 3, 2, 2, padding='VALID', name='pool2')

        # 3rd Layer: Conv (w ReLu)
        conv3 = conv(pool2, 3, 3, 384, 1, 1, name='conv3')

        # 4th Layer: Conv (w ReLu) split into two groups
        conv4 = conv(conv3, 3, 3, 384, 1, 1, groups=2, name='conv4')

        # 5th Layer: Conv (w ReLu) -> Pool split into two groups
        conv5 = conv(conv4, 3, 3, 256, 1, 1, groups=2, name='conv5')
        pool5 = max_pool(conv5, 3, 3, 2, 2, padding='VALID', name='pool5')

        # 6th Layer: Flatten -> FC (w ReLu) -> Dropout
        flattened = tf.reshape(pool5, [-1, 6 * 6 * 256])
        fc6 = fc(flattened, 6 * 6 * 256, 4096, name='fc6')
        dropout6 = dropout(fc6, self.KEEP_PROB)

        # 7th Layer: FC (w ReLu) -> Dropout
        fc7 = fc(dropout6, 4096, 4096, name='fc7')
        dropout7 = dropout(fc7, self.KEEP_PROB)

        # 8th Layer: FC and return unscaled activations
        self.fc8 = fc(dropout7, 4096, self.NUM_CLASSES, relu=False, name='fc8')

    def load_initial_weights(self, session):
        """Load weights from file into network.

        As the weights from http://www.cs.toronto.edu/~guerzhoy/tf_alexnet/
        come as a dict of lists (e.g. weights['conv1'] is a list) and not as
        a dict of dicts (e.g. weights['conv1'] is a dict with keys 'weights'
        & 'biases'), we need a special load function.
        """
        # Load the weights into memory
        # (newer NumPy versions may additionally require allow_pickle=True)
        weights_dict = np.load(self.WEIGHTS_PATH, encoding='bytes').item()

        # Loop over all layer names stored in the weights dict
        for op_name in weights_dict:

            # Check if the layer should be trained from scratch
            if op_name not in self.SKIP_LAYER:

                with tf.variable_scope(op_name, reuse=True):

                    # Assign weights/biases to their corresponding tf variable
                    for data in weights_dict[op_name]:

                        # Biases
                        if len(data.shape) == 1:
                            var = tf.get_variable('biases', trainable=False)
                            session.run(var.assign(data))

                        # Weights
                        else:
                            var = tf.get_variable('weights', trainable=False)
                            session.run(var.assign(data))


def conv(x, filter_height, filter_width, num_filters, stride_y, stride_x,
         name, padding='SAME', groups=1):
    """Create a convolution layer.

    Adapted from: https://github.com/ethereon/caffe-tensorflow
    """
    # Get number of input channels
    input_channels = int(x.get_shape()[-1])

    # Create lambda function for the convolution
    convolve = lambda i, k: tf.nn.conv2d(i, k,
                                         strides=[1, stride_y, stride_x, 1],
                                         padding=padding)

    with tf.variable_scope(name) as scope:
        # Create tf variables for the weights and biases of the conv layer
        # (integer division so the shape stays an int under Python 3)
        weights = tf.get_variable('weights', shape=[filter_height,
                                                    filter_width,
                                                    input_channels // groups,
                                                    num_filters])
        biases = tf.get_variable('biases', shape=[num_filters])

    if groups == 1:
        conv = convolve(x, weights)

    # In the case of multiple groups, split inputs & weights
    else:
        # Split input and weights and convolve them separately
        input_groups = tf.split(axis=3, num_or_size_splits=groups, value=x)
        weight_groups = tf.split(axis=3, num_or_size_splits=groups,
                                 value=weights)
        output_groups = [convolve(i, k)
                         for i, k in zip(input_groups, weight_groups)]

        # Concat the convolved output together again
        conv = tf.concat(axis=3, values=output_groups)

    # Add biases
    bias = tf.reshape(tf.nn.bias_add(conv, biases), tf.shape(conv))

    # Apply relu function
    relu = tf.nn.relu(bias, name=scope.name)

    return relu


def fc(x, num_in, num_out, name, relu=True):
    """Create a fully connected layer."""
    with tf.variable_scope(name) as scope:
        # Create tf variables for the weights and biases
        weights = tf.get_variable('weights', shape=[num_in, num_out],
                                  trainable=True)
        biases = tf.get_variable('biases', [num_out], trainable=True)

        # Matrix multiply weights and inputs and add bias
        act = tf.nn.xw_plus_b(x, weights, biases, name=scope.name)

    if relu:
        # Apply ReLu non-linearity
        relu = tf.nn.relu(act)
        return relu
    else:
        return act


def max_pool(x, filter_height, filter_width, stride_y, stride_x, name,
             padding='SAME'):
    """Create a max pooling layer."""
    return tf.nn.max_pool(x, ksize=[1, filter_height, filter_width, 1],
                          strides=[1, stride_y, stride_x, 1],
                          padding=padding, name=name)


def lrn(x, radius, alpha, beta, name, bias=1.0):
    """Create a local response normalization layer."""
    return tf.nn.local_response_normalization(x, depth_radius=radius,
                                              alpha=alpha, beta=beta,
                                              bias=bias, name=name)


def dropout(x, keep_prob):
    """Create a dropout layer."""
    return tf.nn.dropout(x, keep_prob)
```
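For orientation before the full training script below, here is a minimal sketch of how this class is wired into a graph. The placeholder shape, the two-class setting, and the choice to retrain only `fc8` are illustrative assumptions, and `bvlc_alexnet.npy` is assumed to sit next to `alexnet.py`:

```
# minimal usage sketch (shapes and skip_layer choice are assumptions)
import tensorflow as tf
from alexnet import AlexNet

x = tf.placeholder(tf.float32, [None, 227, 227, 3])  # AlexNet expects 227x227x3 inputs
keep_prob = tf.placeholder(tf.float32)               # dropout keep probability

# retrain only fc8 from scratch; all other layers keep the pretrained weights
model = AlexNet(x, keep_prob, num_classes=2, skip_layer=['fc8'])
logits = model.fc8                                   # unscaled class scores

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    model.load_initial_weights(sess)                 # fills every layer except fc8
```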
datagenerator.py
```
# Created on Wed May 31 14:48:46 2017
#
# @author: Frederik Kratzert
"""Contains a helper class for image input pipelines in TensorFlow."""
import tensorflow as tf
import numpy as np

from tensorflow.contrib.data import Dataset
from tensorflow.python.framework import dtypes
from tensorflow.python.framework.ops import convert_to_tensor

IMAGENET_MEAN = tf.constant([123.68, 116.779, 103.939], dtype=tf.float32)


class ImageDataGenerator(object):
    """Wrapper class around the new TensorFlow dataset pipeline.

    Requires TensorFlow >= version 1.2rc0
    """

    def __init__(self, filed, txt_file, mode, batch_size, num_classes,
                 shuffle=True, buffer_size=1000):
        """Create a new ImageDataGenerator.

        Receives a path string to a text file, which consists of many lines,
        where each line has first a path string to an image and, separated
        by a space, an integer referring to the class number. Using this
        data, this class will create TensorFlow datasets that can be used to
        train e.g. a convolutional neural network.

        Args:
            filed: Root directory prepended to every image path in the
                text file.
            txt_file: Path to the text file.
            mode: Either 'training' or 'inference'. Depending on this value,
                different parsing functions will be used.
            batch_size: Number of images per batch.
            num_classes: Number of classes in the dataset.
            shuffle: Whether or not to shuffle the data in the dataset and
                the initial file list.
            buffer_size: Number of images used as buffer for TensorFlow's
                shuffling of the dataset.

        Raises:
            ValueError: If an invalid mode is passed.
        """
        self.filed = filed
        self.txt_file = txt_file
        self.num_classes = num_classes

        # retrieve the data from the text file
        self._read_txt_file()

        # number of samples in the dataset
        self.data_size = len(self.labels)

        # initial shuffling of the file and label lists (together!)
        if shuffle:
            self._shuffle_lists()
        else:
            self._Noshuffle_lists()

        # convert lists to TF tensors
        self.img_paths = convert_to_tensor(self.img_paths,
                                           dtype=dtypes.string)
        self.labels = convert_to_tensor(self.labels, dtype=dtypes.int32)

        # create dataset
        data = Dataset.from_tensor_slices((self.img_paths, self.labels))

        # distinguish between train/infer. when calling the parsing functions
        if mode == 'training':
            data = data.map(self._parse_function_train, num_threads=8,
                            output_buffer_size=100 * batch_size)
        elif mode == 'inference':
            data = data.map(self._parse_function_inference, num_threads=8,
                            output_buffer_size=100 * batch_size)
        else:
            raise ValueError("Invalid mode '%s'." % (mode))

        # shuffle the first `buffer_size` elements of the dataset
        if shuffle:
            data = data.shuffle(buffer_size=buffer_size)

        # create a new dataset with batches of images
        data = data.batch(batch_size)

        self.data = data

    def _read_txt_file(self):
        """Read the content of the text file and store it into lists."""
        self.img_paths = []
        self.labels = []
        with open(self.txt_file, 'r') as f:
            lines = f.readlines()
            for line in lines:
                items = line.split(' ')
                self.img_paths.append(items[0])
                self.labels.append(int(items[1]))

    def _shuffle_lists(self):
        """Conjoined shuffling of the lists of paths and labels."""
        path = self.img_paths
        labels = self.labels
        permutation = np.random.permutation(self.data_size)
        self.img_paths = []
        self.labels = []
        for i in permutation:
            self.img_paths.append(self.filed + path[i])
            self.labels.append(labels[i])

    def _Noshuffle_lists(self):
        """Prepend the root directory to paths and labels without shuffling."""
        path = self.img_paths
        labels = self.labels
        self.img_paths = []
        self.labels = []
        # bug fix: iterate over a range, not over the integer itself
        for i in range(self.data_size):
            self.img_paths.append(self.filed + path[i])
            self.labels.append(labels[i])

    def _parse_function_train(self, filename, label):
        """Input parser for samples of the training set."""
        # convert label number into one-hot encoding
        one_hot = tf.one_hot(label, self.num_classes)

        # load and preprocess the image
        img_string = tf.read_file(filename)
        img_decoded = tf.image.decode_png(img_string, channels=3)
        img_resized = tf.image.resize_images(img_decoded, [227, 227])
        # Data augmentation comes here.
        img_centered = tf.subtract(img_resized, IMAGENET_MEAN)

        # RGB -> BGR
        img_bgr = img_centered[:, :, ::-1]

        return img_bgr, one_hot

    def _parse_function_inference(self, filename, label):
        """Input parser for samples of the validation/test set."""
        # convert label number into one-hot encoding
        one_hot = tf.one_hot(label, self.num_classes)

        # load and preprocess the image
        img_string = tf.read_file(filename)
        img_decoded = tf.image.decode_png(img_string, channels=3)
        img_resized = tf.image.resize_images(img_decoded, [227, 227])
        img_centered = tf.subtract(img_resized, IMAGENET_MEAN)

        # RGB -> BGR
        img_bgr = img_centered[:, :, ::-1]

        return img_bgr, one_hot
```
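Before plugging the generator into training, it can help to smoke-test the pipeline by pulling a single batch out of it. A sketch, assuming a small `train.txt` with at least four entries in the working directory and the TF 1.2-era `tf.contrib.data` API used above:

```
# pipeline smoke test (file name and batch size are assumptions)
import tensorflow as tf
from datagenerator import ImageDataGenerator

gen = ImageDataGenerator(filed='', txt_file='train.txt', mode='training',
                         batch_size=4, num_classes=2, shuffle=True)

iterator = gen.data.make_one_shot_iterator()
images, labels = iterator.get_next()

with tf.Session() as sess:
    img_batch, label_batch = sess.run([images, labels])
    print(img_batch.shape)    # expected: (4, 227, 227, 3)
    print(label_batch.shape)  # expected: (4, 2), one-hot labels
```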
train.py
```
"""Script to finetune AlexNet using TensorFlow.

With this script you can finetune AlexNet as provided in the alexnet.py
class on any given dataset. Specify the configuration settings at the
beginning according to your problem. This script was written for
TensorFlow >= version 1.2rc0 and comes with a blog post, which you can
find here:

https://kratzert.github.io/2017/02/24/finetuning-alexnet-with-tensorflow.html

Author: Frederik Kratzert
contact: f.kratzert(at)gmail.com
"""
import os

import numpy as np
import tensorflow as tf

from alexnet import AlexNet
from datagenerator import ImageDataGenerator
from datetime import datetime
from tensorflow.contrib.data import Iterator

"""
Configuration Part.
"""
filed = 'D:/tensorflow/bvlc_alexnet/finetune_alexnet_with_tensorflow-master'

# Paths to the text files for the training and validation sets
# (use forward slashes consistently; backslashes such as \t or \b would be
# read as escape sequences)
train_file = 'D:/tensorflow/bvlc_alexnet/finetune_alexnet_with_tensorflow-master/train.txt'
val_file = 'D:/tensorflow/bvlc_alexnet/finetune_alexnet_with_tensorflow-master/val.txt'

# Learning params
learning_rate = 0.01
num_epochs = 10
batch_size = 20

# Network params
dropout_rate = 0.5
num_classes = 2
train_layers = ['fc8', 'fc7', 'fc6']

# How often we want to write the tf.summary data to disk
display_step = 20

# Path for tf.summary.FileWriter and to store model checkpoints
filewriter_path = 'D:/tensorflow/bvlc_alexnet/finetune_alexnet_with_tensorflow-master/tensorboard'
checkpoint_path = 'D:/tensorflow/bvlc_alexnet/finetune_alexnet_with_tensorflow-master/checkpoints'

"""
Main Part of the finetuning Script.
"""
# Create parent path if it doesn't exist
# if not os.path.isdir(checkpoint_path):
#     os.mkdir(checkpoint_path)

# Place data loading and preprocessing on the cpu
with tf.device('/cpu:0'):
    tr_data = ImageDataGenerator(filed,
                                 train_file,
                                 mode='training',
                                 batch_size=batch_size,
                                 num_classes=num_classes,
                                 shuffle=True)
    val_data = ImageDataGenerator(filed,
                                  val_file,
                                  mode='inference',
                                  batch_size=batch_size,
                                  num_classes=num_classes,
                                  shuffle=False)

    # create a reinitializable iterator given the dataset structure
    iterator = Iterator.from_structure(tr_data.data.output_types,
                                       tr_data.data.output_shapes)
    next_batch = iterator.get_next()

# Ops for initializing the two different iterators
training_init_op = iterator.make_initializer(tr_data.data)
validation_init_op = iterator.make_initializer(val_data.data)

# TF placeholders for graph input and output
x = tf.placeholder(tf.float32, [batch_size, 227, 227, 3])
y = tf.placeholder(tf.float32, [batch_size, num_classes])
keep_prob = tf.placeholder(tf.float32)

# Initialize model (see the AlexNet docstring in alexnet.py for the
# meaning of the arguments)
model = AlexNet(x, keep_prob, num_classes, train_layers)

# Link variable to model output
score = model.fc8

# List of trainable variables of the layers we want to train
var_list = [v for v in tf.trainable_variables()
            if v.name.split('/')[0] in train_layers]

# Op for calculating the loss
with tf.name_scope("cross_ent"):
    loss = tf.reduce_mean(
        tf.nn.softmax_cross_entropy_with_logits(logits=score, labels=y))

# Train op
with tf.name_scope("train"):
    # Get gradients of all trainable variables
    gradients = tf.gradients(loss, var_list)
    gradients = list(zip(gradients, var_list))

    # Create optimizer and apply gradient descent to the trainable variables
    optimizer = tf.train.GradientDescentOptimizer(learning_rate)
    train_op = optimizer.apply_gradients(grads_and_vars=gradients)

# Add gradients to summary
for gradient, var in gradients:
    tf.summary.histogram(var.name + '/gradient', gradient)

# Add the variables we train to the summary
for var in var_list:
    tf.summary.histogram(var.name, var)

# Add the loss to summary
tf.summary.scalar('cross_entropy', loss)

# Evaluation op: Accuracy of the model
with tf.name_scope("accuracy"):
    correct_pred = tf.equal(tf.argmax(score, 1), tf.argmax(y, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

# Add the accuracy to the summary
tf.summary.scalar('accuracy', accuracy)

# Merge all summaries together
merged_summary = tf.summary.merge_all()

# Initialize the FileWriter
writer = tf.summary.FileWriter(filewriter_path)

# Initialize a saver to store model checkpoints
saver = tf.train.Saver()

# Get the number of training/validation steps per epoch
train_batches_per_epoch = int(np.floor(tr_data.data_size / batch_size))
val_batches_per_epoch = int(np.floor(val_data.data_size / batch_size))

# Start TensorFlow session
with tf.Session() as sess:

    # Initialize all variables
    sess.run(tf.global_variables_initializer())

    # Add the model graph to TensorBoard
    writer.add_graph(sess.graph)

    # Load the pretrained weights into the non-trainable layers
    model.load_initial_weights(sess)

    print("{} Start training...".format(datetime.now()))
    print("{} Open Tensorboard at --logdir {}".format(datetime.now(),
                                                      filewriter_path))

    # Loop over number of epochs
    for epoch in range(num_epochs):

        print("{} Epoch number: {}".format(datetime.now(), epoch + 1))

        # Initialize iterator with the training dataset
        sess.run(training_init_op)

        for step in range(train_batches_per_epoch):

            # get next batch of data
            img_batch, label_batch = sess.run(next_batch)

            # And run the training op
            sess.run(train_op, feed_dict={x: img_batch,
                                          y: label_batch,
                                          keep_prob: dropout_rate})

            # Generate summary with the current batch of data and write to file
            if step % display_step == 0:
                s = sess.run(merged_summary, feed_dict={x: img_batch,
                                                        y: label_batch,
                                                        keep_prob: 1.})
                writer.add_summary(s, epoch * train_batches_per_epoch + step)

        # Validate the model on the entire validation set
        print("{} Start validation".format(datetime.now()))
        sess.run(validation_init_op)
        test_acc = 0.
        test_count = 0
        for _ in range(val_batches_per_epoch):
            img_batch, label_batch = sess.run(next_batch)
            acc = sess.run(accuracy, feed_dict={x: img_batch,
                                                y: label_batch,
                                                keep_prob: 1.})
            test_acc += acc
            test_count += 1
        test_acc /= test_count
        print("{} Validation Accuracy = {:.4f}".format(datetime.now(),
                                                       test_acc))
        print("{} Saving checkpoint of model...".format(datetime.now()))

        # save checkpoint of the model
        checkpoint_name = os.path.join(checkpoint_path,
                                       'model_epoch' + str(epoch + 1) + '.ckpt')
        save_path = saver.save(sess, checkpoint_name)

        print("{} Model checkpoint saved at {}".format(datetime.now(),
                                                       checkpoint_name))
```
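Because a checkpoint is written after every epoch, an interrupted run can be resumed rather than restarted. A minimal sketch against the graph built above, replacing the `load_initial_weights(sess)` call (the epoch number in the checkpoint name is an assumption):

```
# resume-from-checkpoint sketch (checkpoint name is an assumption)
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    saver.restore(sess, os.path.join(checkpoint_path, 'model_epoch5.ckpt'))
    # ...then continue with the epoch loop as above
```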
test.py
```
# -*- coding: utf-8 -*-
"""
Created on Sat Aug 11 22:04:36 2018

@author: Administrator
"""
import tensorflow as tf
import matplotlib.pyplot as plt

from alexnet import AlexNet  # import the trained network definition

class_name = ['cat', 'dog']  # custom cat/dog labels


def test_image(path_image, num_class, weights_path='DEFAULT'):
    # convert the new image into the shape the network expects
    # (note: unlike datagenerator.py, no mean subtraction or RGB->BGR
    # conversion is applied here)
    img_string = tf.read_file(path_image)
    img_decoded = tf.image.decode_png(img_string, channels=3)
    # img_decoded = tf.image.decode_jpeg(img_string, channels=3)
    img_resized = tf.image.resize_images(img_decoded, [227, 227])
    img_resized = tf.reshape(img_resized, shape=[1, 227, 227, 3])

    # pass the image through AlexNet
    # (keep_prob=1.0: dropout must be disabled at test time; the original
    # hardcoded 0.5 and ignored num_class)
    model = AlexNet(img_resized, 1.0, num_class, skip_layer='',
                    weights_path=weights_path)
    score = tf.nn.softmax(model.fc8)
    max_idx = tf.argmax(score, 1)  # tf.arg_max is deprecated; use tf.argmax

    saver = tf.train.Saver()
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        # load the finetuned parameters
        saver.restore(sess, "D:/tensorflow/bvlc_alexnet/finetune_alexnet_with_tensorflow-master/checkpoints/model_epoch10.ckpt")
        # score = model.fc8
        print(sess.run(model.fc8))
        prob = sess.run(max_idx)[0]

        # visualize the classification result with matplotlib
        plt.imshow(img_decoded.eval())
        plt.title("Class: " + class_name[prob])
        plt.show()


test_image('./test/20.png', num_class=2)  # feed in a new image
```
If you do not want to touch the code any further than necessary, you have to provide two `.txt` files to the script (`train.txt` and `val.txt`). Each of them lists the complete path to your train/val images together with the class number, in the following structure:
Example train.txt:
```
/path/to/train/image1.png 0
/path/to/train/image2.png 1
/path/to/train/image3.png 2
/path/to/train/image4.png 0
```
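Such list files can be generated with a small helper script. A sketch, assuming the images are sorted into one subfolder per class (the `data/train` layout, class order, and output name are assumptions; the class order must match `class_name` in test.py):

```
# generate train.txt from per-class subfolders (directory layout is an assumption)
import os

classes = ['cat', 'dog']   # one folder per class; the index is the class number
root = 'data/train'        # assumed layout: data/train/<class>/<image files>

with open('train.txt', 'w') as f:
    for label, cls in enumerate(classes):
        folder = os.path.join(root, cls)
        for fname in sorted(os.listdir(folder)):
            f.write('{} {}\n'.format(os.path.join(folder, fname), label))
```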