1. 程式人生 > >tensorflow(六)訓練分類自己的圖片(CNN超詳細入門版)

tensorflow(六)訓練分類自己的圖片(CNN超詳細入門版)

之前一直用caffe做影象方面的東西,由於tensorflow環境配置簡單,綜合表現較為出色,因此打算轉戰tensorflow。學習這個框架,最開始還是要跑一跑文件中的mnist小程式(具體請參照tensorflow官方文件)。但是mnist中都是處理好的資料,具體的資料處理過程又沒有講,如果想要處理自己的圖片資料,便有些無從下手,直接看原始碼的話又比較枯燥晦澀。這裡是一份從圖片的預處理到最終測試一張單獨圖片的完整的程式碼,供大家參考。由於一些原因,資料集不能發到網上,大概就是一個具有五種型別圖片的資料集。

這裡我把整個工程檔案放上來:https://pan.baidu.com/s/1SSB8U2-DIqmUsgsI0BRVVw


其中log下是訓練好的模型,可以直接執行程式。如果想要自己訓練模型,可以把log資料夾刪掉,然後將程式碼最後測試圖片的部分註釋掉,再在最後加上一行run_training()呼叫訓練函式即可。

這裡由於只有很少的圖片,很少的訓練次數,因此測試結果不是很理想,大家可以增加大量的資料和增加訓練次數來改善測試結果。

一、資料預處理

import os
import numpy as np
from PIL import Image
import tensorflow as tf
import matplotlib.pyplot as plt
# Import the required packages.
train_dir = 'D:/picture/train/'
# Directory that holds the training images (describes train_dir above).


def get_files(file_dir):
    """Collect the image paths under *file_dir* and label them by file name.

    The class is taken from the part of the file name before the first '.':
    'A5' -> 0, 'A6' -> 1, 'LTAX1' -> 2, 'SEG' -> 3, and every other name is
    treated as 'SUM' -> 4.  For a multi-class problem the labels have to
    start at 0 (here 0..4); arbitrary label values lead to a
    "Target[0] out of range" error during training.

    Args:
        file_dir: directory containing the images.  A trailing path
            separator is optional.

    Returns:
        A tuple ``(image_list, label_list)`` of two equally long lists: the
        file paths (str) and the matching integer labels, shuffled together
        in random order.
    """
    # The position of a name in this tuple is the label of that class.
    class_names = ('A5', 'A6', 'LTAX1', 'SEG', 'SUM')
    label_of = {name: index for index, name in enumerate(class_names)}
    fallback_label = label_of['SUM']
    paths_by_label = [[] for _ in class_names]

    for file in os.listdir(file_dir):
        # The file name is split on '.', the first piece names the class.
        prefix = file.split('.')[0]
        label = label_of.get(prefix, fallback_label)
        # os.path.join also works when file_dir has no trailing separator.
        paths_by_label[label].append(os.path.join(file_dir, file))

    # Report how many files were picked up per class as a sanity check.
    print('There are %d A5\nThere are %d A6\nThere are %d LTAX1\nThere are %d SEG\nThere are %d SUM'
          % tuple(len(paths) for paths in paths_by_label))

    image_list = [path for paths in paths_by_label for path in paths]
    label_list = [label for label, paths in enumerate(paths_by_label)
                  for _ in paths]

    # Shuffle paths and labels with one shared permutation so that every
    # path keeps its own label.
    order = np.random.permutation(len(image_list))
    image_list = [image_list[i] for i in order]
    label_list = [label_list[i] for i in order]
    return image_list, label_list
def get_batch(image,label,image_W,image_H,batch_size,capacity):
    """Build a queue-based input pipeline yielding image and label batches.

    Args:
        image: list of JPEG file paths.
        label: list of integer labels, parallel to *image*.
        image_W: target image width in pixels.
        image_H: target image height in pixels.
        batch_size: number of examples per batch.
        capacity: maximum number of examples held in the batching queue.

    Returns:
        A tuple ``(image_batch, label_batch)`` of tensors; these two batches
        are the data that is fed into the network.  ``label_batch`` is
        reshaped to ``[batch_size]``.
    """
    # tf.cast() converts the Python lists into tensors of the needed dtype.
    image = tf.cast(image, tf.string)
    label = tf.cast(label, tf.int32)

    # Put the (path, label) pairs into an input queue.
    input_queue = tf.train.slice_input_producer([image, label])

    label = input_queue[1]
    image_contents = tf.read_file(input_queue[0])
    # decode_jpeg handles both .jpeg and .jpg files; other formats need the
    # matching decoder (see the official documentation).
    image = tf.image.decode_jpeg(image_contents, channels=3)

    # Crop or pad to the target size.  The API expects
    # (target_height, target_width), so the height is passed first.
    image = tf.image.resize_image_with_crop_or_pad(image, image_H, image_W)

    # Standardize the resized image.
    image = tf.image.per_image_standardization(image)

    image_batch, label_batch = tf.train.batch([image, label],
                                              batch_size=batch_size,
                                              num_threads=16,
                                              capacity=capacity)

    label_batch = tf.reshape(label_batch, [batch_size])
    return image_batch, label_batch

對預處理的資料進行視覺化,檢視預處理的效果

# Settings used only for this preview of the preprocessed data.
BATCH_SIZE = 5
CAPACITY = 64
IMG_W = 208
IMG_H = 208

train_dir = 'D:/picture/train/'

image_list, label_list = get_files(train_dir)
image_batch, label_batch = get_batch(
    image_list, label_list, IMG_W, IMG_H, BATCH_SIZE, CAPACITY)

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    i = 0
    try:
        # Fetch two batches and display every image together with its label.
        while i < 2 and not coord.should_stop():
            img, label = sess.run([image_batch, label_batch])

            for j in range(BATCH_SIZE):
                print('label: %d' % label[j])
                plt.imshow(img[j])
                plt.show()
            i += 1
    except tf.errors.OutOfRangeError:
        print('done!')
    finally:
        coord.request_stop()
    coord.join(threads)

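**二、設計神經網路模型**

在設計神經網路的過程中,一定要對每一層的資料流動比較瞭解,弄清楚圖片size的變化,不然會報錯。
在進行測試網路模型的過程中,如果用的是IPython的話,要經常重新啟動kernel,不然會出現conv1等層scope已經定義的錯誤。剛開始的時候這個問題困擾了很久,以為是定義變數作用域的過程中,語法使用錯誤,後來才知道是需要重新啟動kernel。具體其中的原因我當時也不太清楚;實際上是因為在同一個kernel中預設的計算圖會一直保留,再次執行建圖的程式碼時,tf.get_variable會試圖重複建立同名的變數(例如conv1/weights)而報錯。重新啟動kernel,或者在重新建圖之前呼叫tf.reset_default_graph(),都可以解決這個問題。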

def _layer_params(shape, stddev):
    """Create the "weights" and "biases" variables of the current scope.

    Must be called inside the layer's ``tf.variable_scope``.  The weights
    have the given *shape* and a truncated-normal initializer with *stddev*;
    the biases have one entry per output unit (the last entry of *shape*)
    and start at 0.1.
    """
    weights = tf.get_variable(
        "weights",
        shape=shape,
        dtype=tf.float32,
        initializer=tf.truncated_normal_initializer(stddev=stddev,
                                                    dtype=tf.float32))
    biases = tf.get_variable(
        "biases",
        shape=[shape[-1]],
        dtype=tf.float32,
        initializer=tf.constant_initializer(0.1))
    return weights, biases


def inference(images, batch_size, n_classes):
    """Build the CNN and return its un-normalized class scores (logits).

    Layout: conv1 -> max-pool + LRN -> conv2 -> max-pool + LRN -> fc1 ->
    fc2 -> linear output layer.  No softmax is applied here.

    Args:
        images: image batch tensor with 3 channels in the last dimension.
        batch_size: number of images in the batch; used to flatten the
            feature maps before the first fully connected layer.
        n_classes: number of output classes.

    Returns:
        The output of the "softmax_linear" layer, one row of *n_classes*
        scores per image.
    """
    unit_stride = [1, 1, 1, 1]
    pool_window = [1, 3, 3, 1]
    pool_stride = [1, 2, 2, 1]

    # conv1: kernel shape is [kernel_size, kernel_size, channels, kernels].
    with tf.variable_scope("conv1"):
        weights, biases = _layer_params([3, 3, 3, 16], stddev=0.1)
        conv = tf.nn.conv2d(images, weights, strides=unit_stride,
                            padding="SAME")
        conv1 = tf.nn.relu(tf.nn.bias_add(conv, biases), name="conv1")

    # pool1 followed by local response normalization
    with tf.variable_scope("pooling1_lrn"):
        pool1 = tf.nn.max_pool(conv1, ksize=pool_window, strides=pool_stride,
                               padding="SAME", name="pooling1")
        norm1 = tf.nn.lrn(pool1, depth_radius=4, bias=1.0, alpha=0.001/9.0,
                          beta=0.75, name='norm1')

    # conv2
    with tf.variable_scope("conv2"):
        weights, biases = _layer_params([3, 3, 16, 16], stddev=0.1)
        conv = tf.nn.conv2d(norm1, weights, strides=unit_stride,
                            padding="SAME")
        conv2 = tf.nn.relu(tf.nn.bias_add(conv, biases), name="conv2")

    # pool2 followed by local response normalization
    with tf.variable_scope("pooling2_lrn"):
        pool2 = tf.nn.max_pool(conv2, ksize=pool_window, strides=pool_stride,
                               padding="SAME", name="pooling2")
        norm2 = tf.nn.lrn(pool2, depth_radius=4, bias=1.0, alpha=0.001/9.0,
                          beta=0.75, name='norm2')

    # fc1: flatten every example to a vector, then 128 hidden units
    with tf.variable_scope("fc1"):
        flat = tf.reshape(norm2, shape=[batch_size, -1])
        flat_dim = flat.get_shape()[1].value
        weights, biases = _layer_params([flat_dim, 128], stddev=0.005)
        fc1 = tf.nn.relu(tf.matmul(flat, weights) + biases, name="fc1")

    # fc2
    with tf.variable_scope("fc2"):
        weights, biases = _layer_params([128, 128], stddev=0.005)
        fc2 = tf.nn.relu(tf.matmul(fc1, weights) + biases, name="fc2")

    # linear output layer; softmax is left to the loss / the caller
    with tf.variable_scope("softmax_linear"):
        weights, biases = _layer_params([128, n_classes], stddev=0.005)
        softmax_linear = tf.add(tf.matmul(fc2, weights), biases,
                                name="softmax_linear")
    return softmax_linear
def losses(logits, labels):
    """Return the mean sparse softmax cross-entropy of a batch.

    Args:
        logits: un-normalized class scores, one row per example.
        labels: integer class index of every example.

    Returns:
        Scalar loss tensor; it is also registered as a scalar summary.
    """
    with tf.variable_scope("loss") as loss_scope:
        per_example = tf.nn.sparse_softmax_cross_entropy_with_logits(
            labels=labels, logits=logits, name="xentropy_per_example")
        mean_loss = tf.reduce_mean(per_example, name="loss")
        summary_tag = loss_scope.name + "loss"
        tf.summary.scalar(summary_tag, mean_loss)
    return mean_loss
def trainning(loss, learning_rate):
    """Create the op that performs one Adam optimization step on *loss*.

    Args:
        loss: scalar loss tensor to minimize.
        learning_rate: learning rate handed to the Adam optimizer.

    Returns:
        The training op; running it also increments a non-trainable
        "global_step" variable created here.
    """
    with tf.name_scope("optimizer"):
        adam = tf.train.AdamOptimizer(learning_rate=learning_rate)
        step_counter = tf.Variable(0, name="global_step", trainable=False)
        return adam.minimize(loss, global_step=step_counter)
def evaluation(logits, labels):
    """Return the fraction of examples whose top-1 prediction is correct.

    Args:
        logits: class scores, one row per example.
        labels: integer class index of every example.

    Returns:
        Scalar float16 accuracy tensor; it is also registered as a scalar
        summary.
    """
    with tf.variable_scope("accuracy") as acc_scope:
        hits = tf.cast(tf.nn.in_top_k(logits, labels, 1), tf.float16)
        batch_accuracy = tf.reduce_mean(hits)
        tf.summary.scalar(acc_scope.name + "accuracy", batch_accuracy)
    return batch_accuracy
N_CLASSES = 5
# Number of classes to distinguish; this is a 5-class problem.
IMG_W = 208
IMG_H = 208
# Size of the images fed into the network.
BATCH_SIZE = 8
CAPACITY = 64
MAX_STEP = 1000
# Train for 1000 steps; at least 10000 steps are recommended if the machine
# is powerful enough.
learning_rate = 0.0001
# Learning rate.

**三、訓練**

def run_training(train_dir='D:/picture/train/', logs_train_dir='D:/picture/log/'):
    """Train the network and write summaries and checkpoints.

    Reads the module-level settings IMG_W, IMG_H, BATCH_SIZE, CAPACITY,
    N_CLASSES, MAX_STEP and learning_rate.

    Args:
        train_dir: directory with the training images (see get_files).
        logs_train_dir: directory that receives the TensorBoard summaries
            and the model checkpoint files.
    """
    train, train_label = get_files(train_dir)
    train_batch, train_label_batch = get_batch(train, train_label,
                                               IMG_W,
                                               IMG_H,
                                               BATCH_SIZE,
                                               CAPACITY)
    train_logits = inference(train_batch, BATCH_SIZE, N_CLASSES)
    train_loss = losses(train_logits, train_label_batch)
    train_op = trainning(train_loss, learning_rate)
    train_acc = evaluation(train_logits, train_label_batch)

    summary_op = tf.summary.merge_all()
    saver = tf.train.Saver()
    checkpoint_path = os.path.join(logs_train_dir, 'model.ckpt')

    # The with-block closes the session even if training fails.
    with tf.Session() as sess:
        train_writer = tf.summary.FileWriter(logs_train_dir, sess.graph)
        sess.run(tf.global_variables_initializer())
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)

        try:
            for step in range(MAX_STEP):
                if coord.should_stop():
                    break

                if step % 50 == 0:
                    # Every 50 steps: report progress.  The summary is
                    # fetched in the same run call so that it describes the
                    # batch whose loss/accuracy is printed, and no extra
                    # batch is pulled from the queue.
                    _, tra_loss, tra_acc, summary_str = sess.run(
                        [train_op, train_loss, train_acc, summary_op])
                    # train_acc is a fraction in [0, 1]; scale it to match
                    # the percent sign in the message.
                    print('Step %d,train loss = %.2f,train accuracy = %.2f%%'
                          % (step, tra_loss, tra_acc * 100.0))
                    train_writer.add_summary(summary_str, step)
                else:
                    sess.run(train_op)

                if step % 200 == 0 or (step + 1) == MAX_STEP:
                    # Every 200 steps (and at the very end): save a
                    # checkpoint so the model can be restored for testing.
                    saver.save(sess, checkpoint_path, global_step=step)

        except tf.errors.OutOfRangeError:
            print('Done training epoch limit reached')
        finally:
            coord.request_stop()
            try:
                coord.join(threads)
            finally:
                # Flush pending summaries to disk.
                train_writer.close()

**四、測試一張圖片**

def get_one_image(img_dir):
    """Load one image file, display it and return it as a 208x208 RGB array.

    Args:
        img_dir: path of a single image file.  Image.open() opens one file
            at a time, not a whole directory.

    Returns:
        numpy array obtained from the image after conversion to RGB and
        resizing to 208x208.
    """
    # The with-block releases the file handle once the pixels are loaded.
    with Image.open(img_dir) as image:
        plt.imshow(image)
        # Force three channels: grayscale, palette or RGBA files would
        # otherwise not fit the [1, 208, 208, 3] network input.
        resized = image.convert('RGB').resize((208, 208))
    image_arr = np.array(resized)
    return image_arr
def test(test_file):
    """Classify a single image with the latest checkpoint and print the result.

    The checkpoint is looked up in 'D:/picture/log/'.  If none is found,
    'No checkpoint' is printed and nothing is predicted.

    Args:
        test_file: path of the image file to classify.
    """
    log_dir = 'D:/picture/log/'
    # Position i holds the class that get_files() labels with i.
    class_names = ['A5', 'A6', 'LTAX1', 'SEG', 'SUM']
    image_arr = get_one_image(test_file)

    with tf.Graph().as_default():
        # The image enters the graph through this placeholder.
        x = tf.placeholder(tf.float32, shape=[208, 208, 3])
        image = tf.image.per_image_standardization(x)
        image = tf.reshape(image, [1, 208, 208, 3])
        print(image.shape)
        logits = inference(image, 1, len(class_names))
        probabilities = tf.nn.softmax(logits)
        saver = tf.train.Saver()
        with tf.Session() as sess:
            ckpt = tf.train.get_checkpoint_state(log_dir)
            if not (ckpt and ckpt.model_checkpoint_path):
                # Without restored weights the variables are uninitialized,
                # so there is nothing meaningful to run.
                print('No checkpoint')
                return
            # saver.restore() loads the trained network weights.
            saver.restore(sess, ckpt.model_checkpoint_path)
            print('Loading success')

            prediction = sess.run(probabilities, feed_dict={x: image_arr})
            max_index = int(np.argmax(prediction))
            print('預測的標籤為:')
            print(max_index)
            print('預測的結果為:')
            print(prediction)

            print('This is a %s with possibility %.6f'
                  % (class_names[max_index], prediction[0, max_index]))

呼叫test函式測試圖片的預測結果。

# Classify two sample images of every class, in the listed order.
for _test_image in (
    'D:\\picture\\test\\A51.jpeg',
    'D:\\picture\\test\\A52.jpeg',
    'D:\\picture\\test\\A61.jpeg',
    'D:\\picture\\test\\A62.jpeg',
    'D:\\picture\\test\\LTAX1.jpeg',
    'D:\\picture\\test\\LTAX2.jpeg',
    'D:\\picture\\test\\SEG1.jpg',
    'D:\\picture\\test\\SEG2.jpg',
    'D:\\picture\\test\\SUM1.jpeg',
    'D:\\picture\\test\\SUM2.jpeg',
):
    test(_test_image)