貓狗識別(tensorflow)kaggle練習題
阿新 • • 發佈:2019-01-30
- 匯入貓狗圖片資料並標籤0和1.將圖片資料處理為 tf 能夠識別的資料格式,並將資料設計批次:
1).get_files() 方法讀取圖片,然後根據圖片名,新增貓狗 label,然後再將 image和label 放到陣列中,打亂順序返回.
2).將第一步處理好的圖片和label陣列轉化為tensorflow能夠識別的格式,然後將圖片裁剪和補充進行標準化處理,分批次返回.
input_data.py
import tensorflow as tf
import os
import numpy as np
#資料集的檔名是以type.num.jpg的方式命名,如cat.0.jpg
def get_files(file_dir):
cats = []
label_cats = []
dogs = []
label_dogs = []
for file in os.listdir(file_dir):
name = file.split(sep='.')
if 'cat' in name[0]:
cats.append(file_dir + file)
label_cats.append(0)
else:
if 'dog' in name[0]:
dogs.append(file_dir + file)
label_dogs.append(1)
image_list = np.hstack((cats, dogs))
label_list = np.hstack((label_cats, label_dogs))
# 把標籤和圖片都放倒一個 temp 中 然後打亂順序,然後取出來
temp = np.array([image_list,label_list])#列表轉化為矩陣
temp = temp.transpose()#transpose的操作物件是矩陣,轉置一下
np.random.shuffle(temp)#打亂順序
image_list = list(temp[:, 0])
label_list = list(temp[:, 1])
label_list = [int(i) for i in label_list]
return image_list, label_list
# train_img = image_list[0:int(len(image_list)*0.7)]
# train_label = label_list[0:int(len(image_list)*0.7)]
# valid_img = image_list[int(len(image_list)*0.7):]
# valid_label = label_list[int(len(image_list)*0.7):]
# return train_img,train_label,valid_img,valid_label
def get_batch(image, label, image_w, image_h, batch_size, capacity):#capacity: 佇列中 最多容納圖片的個數
#轉換成tf能識別的格式
image = tf.cast(image, tf.string)
label = tf.cast(label, tf.int32)
input_queue = tf.train.slice_input_producer([image, label])#tf.train.slice_input_producer是一個tensor生成器,作用是
# 按照設定,每次從一個tensor列表中按順序或者隨機抽取出一個tensor放入檔名佇列。
label = input_queue[1]
img_contents = tf.read_file(input_queue[0])#一維
image = tf.image.decode_jpeg(img_contents, channels=3)#解碼成三維矩陣
image = tf.image.resize_image_with_crop_or_pad(image, image_w, image_h)
image = tf.image.per_image_standardization(image)
# 生成批次 num_threads 有多少個執行緒根據電腦配置設定
image_batch, label_batch = tf.train.batch([image, label], batch_size=batch_size, num_threads=64, capacity=capacity)
label_batch = tf.reshape(label_batch, [batch_size]) # 重新排列label,行數為[batch_size]
image_batch = tf.cast(image_batch, tf.float32)
return image_batch, label_batch
以上程式碼可簡化為:
import numpy as np
import tensorflow as tf
import os
import cv2
import matplotlib.pyplot as plt
import os
from PIL import Image
def get_files(file_dir):
cats = []
dogs = []
cats_label = []
dogs_label = []
img_dirs = os.listdir(file_dir)#讀取檔名下所有!目錄名(列表形式)
for img_name in img_dirs:# cat.0.jpg
name = img_name.split(".")# ['cat', '0', 'jpg']
if name[0] == "cat":
cats.append(file_dir + img_name)#此處不可以省為img_name,下個函式tf.train.slice_input_producer讀取的是地址!!
cats_label.append(0)
else:
if name[0] == "dog":
dogs.append(file_dir + img_name)
dogs_label.append(1)
img_list = np.hstack((cats, dogs))#列表(字串形式)
label_list = np.hstack((cats_label, dogs_label))#列表(整數形式)
return img_list, label_list
#############################################
def get_batch(image, label, image_w, image_h, batch_size, capacity):#capacity: 佇列中 最多容納圖片的個數
input_queue = tf.train.slice_input_producer([image, label])#tf.train.slice_input_producer是一個tensor生成器,作用是
# 按照設定,每次從一個tensor列表中按順序或者隨機抽取出一個tensor放入檔名佇列。
label = input_queue[1]
img_contents = tf.read_file(input_queue[0])#一維
image = tf.image.decode_jpeg(img_contents, channels=3)#解碼成三維矩陣
image = tf.image.resize_image_with_crop_or_pad(image, image_w, image_h)
image = tf.cast(image, tf.float32)
image = tf.image.per_image_standardization(image)
# 生成批次 num_threads 有多少個執行緒根據電腦配置設定
image_batch, label_batch = tf.train.batch([image, label], batch_size=batch_size, num_threads=64, capacity=capacity)
return image_batch, label_batch
我們使用tf.train.string_input_producer函式。這個函式需要傳入一個檔名list(檔名地址),系統會自動將它轉為一個檔名佇列。tf.train.slice_input_producer可以傳入檔名的同時傳入標籤。
此外tf.train.string_input_producer還有兩個重要的引數,一個是num_epochs。另外一個就是shuffle,shuffle是指在一個epoch內檔案的順序是否被打亂。shuffle=False(不打亂)。
除了tf.train.string_input_producer外,我們還要額外介紹一個函式:tf.train.start_queue_runners。
在我們使用tf.train.string_input_producer建立檔名佇列後,整個系統其實還是處於“停滯狀態”的,也就是說,我們檔名並沒有真正被加入到佇列中。此時如果我們開始計算,因為記憶體佇列中什麼也沒有,計算單元就會一直等待,導致整個系統被阻塞。而使用tf.train.start_queue_runners之後,計算單元就可以拿到資料並進行計算,整個程式也就跑起來了。
連結:[https://zhuanlan.zhihu.com/p/27238630]
2.卷積神經網路提取特徵 model.py
import tensorflow as tf
def inference(image, batch_size, n_classes):
with tf.variable_scope("conv1") as scope:#課本108,variable_scope控制get_variable是獲取(reuse=True)還是建立變數
weights = tf.get_variable("weights", shape=[3,3,3,16], dtype=tf.float32,
initializer=tf.truncated_normal_initializer(stddev=0.1, dtype=tf.float32))
biases = tf.get_variable("biases", shape=[16], dtype=tf.float32,
initializer=tf.constant_initializer(0.1))
conv = tf.nn.conv2d(image, weights, strides=[1,1,1,1], padding="SAME")
pre_activation = tf.nn.bias_add(conv, biases)
conv1 = tf.nn.relu(pre_activation, name=scope.name)
with tf.variable_scope("pooling1_lrn") as scope:
pool1 = tf.nn.max_pool(conv1, ksize=[1,3,3,1], strides=[1,2,2,1], padding="SAME", name="pooling1")
norm1 = tf.nn.lrn(pool1, depth_radius=4, bias=1.0, alpha=0.001/9.0,beta=0.75, name="norm1")#區域性響應歸一化??????
with tf.variable_scope("conv2") as scope:
weights = tf.get_variable("weights", shape=[3,3,16,16], dtype=tf.float32,
initializer=tf.truncated_normal_initializer(stddev=0.1, dtype=tf.float32))
biases = tf.get_variable("biases", shape=[16], dtype=tf.float32,
initializer=tf.constant_initializer(0.1))
conv = tf.nn.conv2d(norm1, weights, strides=[1,1,1,1], padding="SAME")
pre_activation = tf.nn.bias_add(conv, biases)
conv2 = tf.nn.relu(pre_activation, name=scope.name)
with tf.variable_scope("pooling2_lrn") as scope:
norm2 = tf.nn.lrn(conv2, depth_radius=4, bias=1.0, alpha=0.001/9.0,beta=0.75, name="norm2")
pool2 = tf.nn.max_pool(norm2, ksize=[1,3,3,1], strides=[1,2,2,1], padding="SAME", name="pooling2")
with tf.variable_scope("local3") as scope:
reshape = tf.reshape(pool2, shape=[batch_size, -1])
dim = reshape.get_shape()[1].value
weights = tf.get_variable("weights", shape=[dim, 128], dtype=tf.float32,
initializer=tf.truncated_normal_initializer(stddev=0.005, dtype=tf.float32))
biases = tf.get_variable("biases", shape=[128], dtype=tf.float32, initializer=tf.constant_initializer(0.1))
local3 = tf.nn.relu(tf.matmul(reshape, weights) + biases, name=scope.name)
with tf.variable_scope("local4") as scope:
weights = tf.get_variable("weights", shape=[128, 128], dtype=tf.float32,
initializer=tf.truncated_normal_initializer(stddev=0.005, dtype=tf.float32))
biases = tf.get_variable("biases", shape=[128], dtype=tf.float32, initializer=tf.constant_initializer(0.1))
local4 = tf.nn.relu(tf.matmul(local3, weights) + biases,name="local4")
with tf.variable_scope("softmax_linear") as scope:
weights = tf.get_variable("weights", shape=[128, n_classes], dtype=tf.float32,
initializer=tf.truncated_normal_initializer(stddev=0.005, dtype=tf.float32))
biases = tf.get_variable("biases", shape=[n_classes], dtype=tf.float32, initializer=tf.constant_initializer(0.1))
softmax_linear = tf.nn.relu(tf.matmul(local4, weights) + biases,name="softmax_linear")
return softmax_linear
def loss(logits, labels):#輸出結果和標準答案
with tf.variable_scope("loss") as scope:
cross_entropy= tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels=labels, name="entropy_per_example")
loss = tf.reduce_mean(cross_entropy)
tf.summary.scalar(scope.name +"/loss",loss)#對標量資料彙總和記錄使用tf.summary.scalar
return loss
def training(loss, learning_rate):
with tf.name_scope("optimizer"):
global_step = tf.Variable(0, name="global_step", trainable=False)#定義訓練的輪數,為不可訓練的引數
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
train_op= optimizer.minimize(loss, global_step=global_step)
#上兩行等價於train_op = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss,global_step=global_step)
return train_op
def evalution(logits, labels):
with tf.variable_scope("accuracy") as scope:
correct = tf.nn.in_top_k(logits, labels, 1)#下面
correct = tf.cast(correct, tf.float16)
accuracy = tf.reduce_mean(correct)
tf.summary.scalar(scope.name+"/accuracy", accuracy)#用來顯示標量資訊
return accuracy
"""
top_1_op取樣本的最大預測概率的索引與實際標籤對比,top_2_op取樣本的最大和僅次最大的兩個預測概率與實際標籤對比,
如果實際標籤在其中則為True,否則為False。
"""
3.訓練 training.py
import tensorflow as tf
import numpy as np
import os
import _input_data
import model
N_CLASSES = 2
IMG_W = 208
IMG_H = 208
BATCH_SIZE = 32
CAPACITY = 256
STEP = 15000 #訓練步數應當大於10000
LEARNING_RATE = 0.0001
x = tf.placeholder(tf.float32, shape=[None,129792])
y_ = tf.placeholder(tf.float32, shape=[None, 2])
def run_training():
train_dir = "F:/mytest/2.cat_dog/train/train/"
log_train_dir = "F:/mytest/2.cat_dog/train_savenet/"
train,train_labels = _input_data.get_files(train_dir)
train_batch, train_label_batch = _input_data.get_batch(train, train_labels, IMG_W,IMG_H,BATCH_SIZE,CAPACITY)
train_logits= model.inference(train_batch, BATCH_SIZE, N_CLASSES)
train_loss= model.loss(train_logits, train_label_batch)
train_op = model.training(train_loss, LEARNING_RATE)
train_acc = model.evalution(train_logits, train_label_batch)
summary_op = tf.summary.merge_all()#merge_all 可以將所有summary全部儲存到磁碟,以便tensorboard顯示。
# 一般這一句就可顯示訓練時的各種資訊。
sess = tf.Session()
train_writer =tf.summary.FileWriter(log_train_dir, sess.graph)#指定一個檔案用來儲存圖
saver = tf.train.Saver()
sess.run(tf.global_variables_initializer())
# Coordinator 和 start_queue_runners 監控 queue 的狀態,不停的入隊出隊
coord = tf.train.Coordinator()#https://blog.csdn.net/weixin_42052460/article/details/80714539
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
try:
for step in np.arange(STEP):
if coord.should_stop():
break
_, tra_loss, tra_acc = sess.run([train_op, train_loss, train_acc])
if step % 50 == 0:#%.2f表示輸出浮點數並保留兩位小數。%%表示直接輸出一個%
print("step %d, train loss = %.2f, train accuracy = %.2f%%" %(step, tra_loss, tra_acc*100.0))
summary_str = sess.run(summary_op)
train_writer.add_summary(summary_str, step) #?????????????
if step % 2000 == 0 or (step+1) ==STEP:
# 每隔2000步儲存一下模型,模型儲存在 checkpoint_path 中
checkpoint_path = os.path.join(log_train_dir, "model.ckpt")
saver.save(sess, checkpoint_path, global_step=step)
except tf.errors.OutOfRangeError:
print('Done training -- epoch limit reached')
finally:
coord.request_stop()
coord.join(threads)
sess.close()
run_training()
4.測試一張圖片
import tensorflow as tf
from PIL import Image
import numpy as np
import os
import model
import matplotlib.pyplot as plt
import input_data
def get_one_img(test):#從指定目錄中選取一張圖片
file = os.listdir(test)#os.listdir()返回指定目錄下的所有檔案和目錄名。
n = len(file)
ind = np.random.randint(0, n)
img_dir = os.path.join(test, file[ind])#判斷是否存在檔案或目錄name
image= Image.open(img_dir)
plt.imshow(image)
plt.show()
image = image.resize([208, 208])
image = np.array(image)
return image
def evaluate_one_img():
test = "F:/mytest/2.cat_dog/test/test/"
test_array = get_one_img(test)
with tf.Graph().as_default():#https://www.cnblogs.com/studylyn/p/9105818.html
BATCH_SIZE = 1
N_CLASSES = 2
image = tf.cast(test_array, tf.float32)
image = tf.image.per_image_standardization(image)
image = tf.reshape(image,[1,208,208,3])
logit = model.inference(image, BATCH_SIZE, N_CLASSES)
logit = tf.nn.softmax(logit)
x =tf.placeholder(tf.float32, shape =[208,208,3])
log_test_dir = "F:/mytest/2.cat_dog/train_save"
saver = tf.train.Saver()
with tf.Session() as sess:
print("從指定路徑中載入模型。。。")
#將模型載入到sess中
ckpt = tf.train.get_checkpoint_state(log_test_dir)
if ckpt and ckpt.model_checkpoint_path:#https://blog.csdn.net/u011500062/article/details/51728830/
global_step = ckpt.model_checkpoint_path.split("/")[-1].split("-")[-1]
saver.restore(sess, ckpt.model_checkpoint_path)
print("模型載入成功,訓練的步數為 %s", global_step)
else:
print("模型載入失敗,檔案沒有找到。")
#將圖片輸入到模型計算
prediction = sess.run(logit, feed_dict={x: test_array})
max_index = tf.argmax(prediction) # 將圖片輸入到模型計算
if max_index==0:
print('貓的概率 %.6f' %prediction[:, 0])
else:
print('狗的概率 %.6f' %prediction[:, 1])
# 測試
evaluate_one_img()