1. 程式人生 > >基於TensorFlow影象分類實現

基於TensorFlow影象分類實現

train.py


訓練集圖片的檔名中需包含類別名稱(例如 Bus、Sedan),程式會依檔名判斷標籤。請依照實際要分類的類別,修改 train.py 中「訓練集生成」與「測試集生成」兩段程式碼的標籤讀取方式。

import os
import numpy as np
import tensorflow as tf
from network import Network
from datagenerator import ImageDataGenerator
from datetime import datetime
import glob
from tensorflow.contrib.data import Iterator

# Training hyper-parameters.
learning_rate = 1e-4
num_epochs = 1  # number of passes over the training set
batch_size = 50
dropout_rate = 0.5  # fed as keep_prob during the training steps
num_classes = 5  # number of classes
display_step = 5  # a TensorBoard summary is written every `display_step` steps

filewriter_path = "tmp/tensorboard_test"  # TensorBoard log directory
checkpoint_path = "tmp/checkpoints_test"  # directory for model checkpoints

# The checkpoint directory is nested under "tmp"; os.mkdir would fail when the
# parent does not exist yet, so create the whole chain of directories.
if not os.path.isdir(checkpoint_path):
    os.makedirs(checkpoint_path)

train_image_path = 'train/'  # training set directory
test_image_path = 'test/'  # test set directory

# Class names looked for in each image path; the position is the label index.
CLASS_NAMES = ('Bus', 'Microbus', 'Sedan', 'SUV', 'Truck')


def label_from_path(path):
    """Return the label index of the first class name contained in `path`.

    The check is a case-sensitive substring test, tried in CLASS_NAMES order.
    Returns None when no class name occurs in the path.
    """
    for index, name in enumerate(CLASS_NAMES):
        if name in path:
            return index
    return None


def collect_labelled_images(directory):
    """Collect the '*.jpg' files in `directory` together with their labels.

    `directory` is used as a plain prefix of the glob pattern, so it must end
    with a path separator. Files whose path contains no class name are
    skipped (with a message) so that the two returned lists always have the
    same length and stay aligned index by index.

    Returns a `(paths, labels)` tuple of lists.
    """
    paths = []
    labels = []
    for path in glob.glob(directory + '*.jpg'):
        label = label_from_path(path)
        if label is None:
            print("skipping {}: no class name found in the path".format(path))
            continue
        paths.append(path)
        labels.append(label)
    return paths, labels


# Training set generation
image_path, label_path = collect_labelled_images(train_image_path)

# Test set generation: labels are derived from the test paths themselves
test_image, test_label = collect_labelled_images(test_image_path)

# Build the training input pipeline from the training image paths and labels
tr_data = ImageDataGenerator(
    images=image_path,
    labels=label_path,
    batch_size=batch_size,
    num_classes=num_classes)

# Build the test input pipeline; shuffling is turned off for evaluation
test_data = ImageDataGenerator(
    images=test_image,
    labels=test_label,
    batch_size=batch_size,
    num_classes=num_classes,
    shuffle=False)

with tf.name_scope('input'):
    # One iterator shared by both datasets, defined by the output types and
    # shapes of the training dataset
    iterator = Iterator.from_structure(tr_data.data.output_types,
                                   tr_data.data.output_shapes)

    # Initializer ops: running one of them points the iterator at that dataset
    training_initalize=iterator.make_initializer(tr_data.data)
    testing_initalize=iterator.make_initializer(test_data.data)

    # Op that yields the next batch from the iterator
    next_batch = iterator.get_next()

# Graph inputs: an image batch, its label batch and the dropout keep probability
x = tf.placeholder(tf.float32, [batch_size, 224, 224, 3])
y = tf.placeholder(tf.float32, [batch_size, num_classes])
keep_prob = tf.placeholder(tf.float32)

# Build the network on top of the image placeholder
model = Network(x, keep_prob, num_classes)

# Output of the last fully connected layer, used as the logits
score = model.fc8

with tf.name_scope('loss'):
    # Loss: mean softmax cross-entropy between the logits and the labels
    loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=score, labels=y))
    tf.summary.scalar('loss', loss)


with tf.name_scope('optimizer'):
    # Optimizer: Adam minimizing the loss
    train_op = tf.train.AdamOptimizer(learning_rate).minimize(loss)


# Accuracy: fraction of samples whose arg-max prediction equals the arg-max label
with tf.name_scope("accuracy"):
    correct_pred = tf.equal(tf.argmax(score, 1), tf.argmax(y, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
    tf.summary.scalar('accuracy', accuracy)

# Merge the registered summaries (loss and accuracy) for TensorBoard

merged_summary = tf.summary.merge_all()
writer = tf.summary.FileWriter(filewriter_path)
saver = tf.train.Saver()

# Number of full batches per epoch (a trailing partial batch is not counted)
train_batches_per_epoch = int(np.floor(tr_data.data_size / batch_size))
test_batches_per_epoch = int(np.floor(test_data.data_size / batch_size))

# Train for `num_epochs` epochs; after each epoch measure the accuracy on the
# test set and save a checkpoint.
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # To resume from a checkpoint, call saver.restore(sess, <checkpoint path>) here.

    # Add the model graph to TensorBoard
    writer.add_graph(sess.graph)

    print("{} 訓練開始".format(datetime.now()))
    print("{} Tensorboard at --logdir {}".format(datetime.now(), filewriter_path))

    for epoch in range(num_epochs):
        # Point the shared iterator at the training dataset
        sess.run(training_initalize)
        print("{} 迭代{}次開始".format(datetime.now(), epoch + 1))

        # One training step per full batch
        for step in range(train_batches_per_epoch):
            img_batch, label_batch = sess.run(next_batch)
            sess.run(train_op, feed_dict={x: img_batch, y: label_batch, keep_prob: dropout_rate})
            if step % display_step == 0:
                # Summaries are evaluated on the same batch with dropout disabled
                s = sess.run(merged_summary, feed_dict={x: img_batch,
                                                        y: label_batch,
                                                        keep_prob: 1.})

                writer.add_summary(s, epoch * train_batches_per_epoch + step)

        # Evaluate the accuracy on the test set
        print("{} 測試精度".format(datetime.now()))
        sess.run(testing_initalize)
        test_acc = 0.
        test_count = 0

        for _ in range(test_batches_per_epoch):
            img_batch, label_batch = sess.run(next_batch)
            acc = sess.run(accuracy, feed_dict={x: img_batch,
                                                y: label_batch,
                                                keep_prob: 1.0})
            test_acc += acc
            test_count += 1

        if test_count:
            test_acc /= test_count
        else:
            # The test set holds less than one full batch, so nothing was
            # evaluated; report NaN instead of dividing by zero.
            test_acc = float('nan')

        print("{} 精度 = {:.4f}".format(datetime.now(), test_acc))

        # Save the trained model for this epoch
        print("{} 儲存模型".format(datetime.now()))

        checkpoint_name = os.path.join(checkpoint_path, 'model_epoch' + str(epoch + 1) + '.ckpt')
        save_path = saver.save(sess, checkpoint_name)

        print("{} 迭代{}次結束".format(datetime.now(), epoch + 1), save_path)

    # Flush pending summaries to disk and release the event file
    writer.close()

 

network.py

import tensorflow as tf
import numpy as np


class Network(object):
    """Convolutional classifier whose final layer output is stored in `fc8`."""

    def __init__(self, x, keep_prob, num_classes):
        """Keep the inputs on the instance and build the layers.

        x: input tensor passed to the first convolution.
        keep_prob: keep probability handed to both dropout layers.
        num_classes: number of output units of the last layer.
        """
        self.X = x
        self.NUM_CLASSES = num_classes
        self.KEEP_PROB = keep_prob

        self.create()

    def create(self):
        """Chain the layers and store the last layer's output in `self.fc8`."""
        # Convolution block 1: 9x9 kernel with stride 4, then 2x2 pooling
        net = conv(self.X, 9, 9, 128, 4, 4, padding='VALID', name='conv1_1')
        net = max_pool(net, 2, 2, 2, 2, padding='SAME', name='pool1')

        # Convolution block 2: 4x4 kernel, then 2x2 pooling
        net = conv(net, 4, 4, 256, 1, 1, padding='VALID', name='conv2_1')
        net = max_pool(net, 2, 2, 2, 2, padding='SAME', name='pool2')

        # Convolution block 3: two 3x3 convolutions, then 2x2 pooling
        net = conv(net, 3, 3, 512, 1, 1, padding='SAME', name='conv3_1')
        net = conv(net, 3, 3, 512, 1, 1, padding='SAME', name='conv3_2')
        net = max_pool(net, 2, 2, 2, 2, padding='SAME', name='pool3')

        # Convolution block 4: a single 3x3 convolution, no pooling
        net = conv(net, 3, 3, 256, 1, 1, padding='SAME', name='conv4_1')

        # Flatten; the reshape expects 6*6*256 values per sample
        flat_size = 6 * 6 * 256
        net = tf.reshape(net, [-1, flat_size])

        # Fully connected layer 6 with dropout
        net = fc(net, flat_size, 4096, name='fc6')
        net = dropout(net, self.KEEP_PROB)

        # Fully connected layer 7 with dropout
        net = fc(net, 4096, 4096, name='fc7')
        net = dropout(net, self.KEEP_PROB)

        # Fully connected layer 8: one unit per class, no ReLU
        self.fc8 = fc(net, 4096, self.NUM_CLASSES, name='fc8', relu=False)


def conv(x, filter_height, filter_width, num_filters, stride_y, stride_x, name,
         padding='SAME'):
    """Apply a 2-D convolution followed by a bias add and a ReLU.

    The 'weights' and 'biases' variables are created (or looked up) inside
    the variable scope `name`. The number of input channels is read from the
    last dimension of `x`.

    Returns the ReLU output tensor.
    """
    in_channels = int(x.get_shape()[-1])
    kernel_shape = [filter_height, filter_width, in_channels, num_filters]
    strides = [1, stride_y, stride_x, 1]

    with tf.variable_scope(name) as scope:
        weights = tf.get_variable('weights', shape=kernel_shape)
        biases = tf.get_variable('biases', shape=[num_filters])

        conv_out = tf.nn.conv2d(x, weights, strides=strides, padding=padding)
        biased = tf.nn.bias_add(conv_out, biases)
        # Reshape the biased result to the shape of the convolution output
        biased = tf.reshape(biased, tf.shape(conv_out))

        return tf.nn.relu(biased, name=scope.name)


def fc(x, num_in, num_out, name, relu=True):
    """Apply a fully connected layer: `x * weights + biases`.

    The 'weights' ([num_in, num_out]) and 'biases' ([num_out]) variables live
    in the variable scope `name`. When `relu` is true the result is passed
    through a ReLU, otherwise the linear output is returned as is.
    """
    with tf.variable_scope(name) as scope:
        weights = tf.get_variable('weights', shape=[num_in, num_out],
                                  trainable=True)
        biases = tf.get_variable('biases', [num_out], trainable=True)
        pre_activation = tf.nn.xw_plus_b(x, weights, biases, name=scope.name)

        if not relu:
            return pre_activation
        return tf.nn.relu(pre_activation)


def max_pool(x, filter_height, filter_width, stride_y, stride_x, name,
             padding='SAME'):
    """Max-pool `x` with the given window size, strides and padding."""
    window = [1, filter_height, filter_width, 1]
    strides = [1, stride_y, stride_x, 1]
    return tf.nn.max_pool(x, ksize=window, strides=strides,
                          padding=padding, name=name)


def lrn(x, radius, alpha, beta, name, bias=1.0):
    """Apply local response normalization to `x`.

    `radius` is forwarded as `depth_radius`; `alpha`, `beta`, `bias` and
    `name` are forwarded unchanged.
    """
    options = dict(depth_radius=radius, alpha=alpha, beta=beta,
                   bias=bias, name=name)
    return tf.nn.local_response_normalization(x, **options)


def dropout(x, keep_prob):
    """Apply dropout to `x`, forwarding `keep_prob` to `tf.nn.dropout`."""
    dropped = tf.nn.dropout(x, keep_prob)
    return dropped

datagenerator.py

import tensorflow as tf
import numpy as np

from tensorflow.python.framework import dtypes
from tensorflow.python.framework.ops import convert_to_tensor
from tensorflow.contrib.data import Dataset

VGG_MEAN = tf.constant([123.68, 116.779, 103.939], dtype=tf.float32)


# Turns lists of image paths and labels into a batched dataset of image tensors
class ImageDataGenerator(object):
    """Batched dataset of preprocessed images and one-hot labels.

    The resulting dataset is available as the `data` attribute and the number
    of samples as `data_size`.
    """

    def __init__(self, images, labels, batch_size, num_classes, shuffle=True):
        """Build the dataset.

        images: list of image file paths.
        labels: list of integer labels, one per image path.
        batch_size: number of samples per batch.
        num_classes: depth of the one-hot label vectors.
        shuffle: when true, the (path, label) pairs are shuffled once here,
            before the dataset is created.

        Raises ValueError when `images` and `labels` differ in length, since
        the pairs could otherwise be silently dropped or misaligned.
        """
        if len(images) != len(labels):
            raise ValueError(
                "images and labels must have the same length, got {} and {}"
                .format(len(images), len(labels)))

        self.img_paths = images
        self.labels = labels
        self.num_classes = num_classes
        self.data_size = len(self.labels)
        self.pointer = 0

        if shuffle:
            self._shuffle_lists()

        self.img_paths = convert_to_tensor(self.img_paths, dtype=dtypes.string)
        self.labels = convert_to_tensor(self.labels, dtype=dtypes.int32)
        data = Dataset.from_tensor_slices((self.img_paths, self.labels))
        data = data.map(self._parse_function_train, num_threads=8,
                        output_buffer_size=100 * batch_size)

        data = data.batch(batch_size)

        self.data = data

    # Shuffle the image order, keeping every path paired with its label
    def _shuffle_lists(self):
        """Apply one random permutation to both `img_paths` and `labels`."""
        order = np.random.permutation(self.data_size)
        self.img_paths = [self.img_paths[i] for i in order]
        self.labels = [self.labels[i] for i in order]

    # Load one image as a 3-D tensor and convert its label to a one-hot vector
    def _parse_function_train(self, filename, label):
        """Map a (filename, label) pair to an (image, one-hot label) pair.

        The file is read and decoded with 3 channels, resized to 224x224,
        VGG_MEAN is subtracted, and the channel axis is reversed.
        """
        one_hot = tf.one_hot(label, self.num_classes)
        img_string = tf.read_file(filename)
        img_decoded = tf.image.decode_png(img_string, channels=3)
        img_resized = tf.image.resize_images(img_decoded, [224, 224])
        img_centered = tf.subtract(img_resized, VGG_MEAN)
        # Reverse the order of the channel axis
        img_bgr = img_centered[:, :, ::-1]
        return img_bgr, one_hot

validate_image.py

import tensorflow as tf
from network import Network
import matplotlib.pyplot as plt
import numpy as np
import glob
from tensorflow.python.framework import dtypes
from tensorflow.python.framework.ops import convert_to_tensor
from tensorflow.contrib.data import Dataset
from tensorflow.contrib.data import Iterator

VGG_MEAN = tf.constant([123.68, 116.779, 103.939], dtype=tf.float32)
class_name = ['Bus', 'Microbus', 'Sedan', 'SUV', 'Truck']
validate_image_path = 'validate/'  # validation set directory (adjust to where the validation images are)


# Classification graph for a single 224x224 image; dropout keep probability 1
x = tf.placeholder(tf.float32, [1, 224, 224, 3])
model = Network(x, 1, 5)
score = tf.nn.softmax(model.fc8)
predicted_class = tf.argmax(score, 1)

# Preprocessing graph, built once and fed with a file name. Creating these ops
# inside the loop would add new nodes to the graph for every image.
# The steps match the training pipeline: resize, subtract VGG_MEAN, reverse
# the channel axis.
filename_input = tf.placeholder(tf.string, [])
img_string = tf.read_file(filename_input)
img_decoded = tf.image.decode_png(img_string, channels=3)
img_resized = tf.image.resize_images(img_decoded, [224, 224])
img_centered = tf.subtract(img_resized, VGG_MEAN)
img_bgr = img_centered[:, :, ::-1]
img_batch = tf.expand_dims(img_bgr, 0)  # add the batch dimension -> [1, 224, 224, 3]


def true_label(path):
    """Return the index of the first class name contained in `path`, or -1."""
    for index, name in enumerate(class_name):
        if name in path:
            return index
    return -1


t_num = 0  # correctly classified images
f_num = 0  # misclassified images (including files whose path names no class)
image_path = glob.glob(validate_image_path + '*.jpg')

# The misclassified files are listed in false.txt; the `with` block closes it.
with tf.Session() as sess, open("false.txt", "w") as fo:
    sess.run(tf.global_variables_initializer())
    saver = tf.train.Saver()
    saver.restore(sess, "./tmp/checkpoints_t16/model_epoch7.ckpt")
    for path in image_path:
        image = sess.run(img_batch, feed_dict={filename_input: path})
        prob = sess.run(predicted_class, feed_dict={x: image})[0]
        if true_label(path) == prob:
            t_num += 1
        else:
            f_num += 1
            fo.write(path + '_Prediction:' + str(class_name[prob]) + '\n')

# Report the accuracy; avoid dividing by zero when no image was found
total = t_num + f_num
if total:
    print(t_num / total)
else:
    print("no '*.jpg' images found in " + validate_image_path)