Cats vs. Dogs with TensorFlow (a classification example)
Posted by 阿新 on 2018-06-06
This post is an attempt at writing well-structured network code with TensorFlow's high-level APIs (tf.data, tf.layers, and tf.estimator).
Step 1: Prepare the required libraries
- tensorflow-gpu 1.8.0
- opencv-python 3.3.1
- numpy
- Ubuntu 16.04
Step 2: Prepare the dataset:
https://www.kaggle.com/c/dogs-vs-cats
We use Kaggle's Dogs vs. Cats dataset.
Looking at the dataset, the file names follow a `class.index.extension` pattern, e.g. `cat.0.jpg`.
For generality and convenience, we re-organize the dataset into one folder per class.
Here is the code that sorts the files into folders:
```python
import os
import shutil

# despite the names, these two folders hold the cat and the dog class respectively
output_train_path = '/home/a/Datasets/cat&dog/class/cat'
output_test_path = '/home/a/Datasets/cat&dog/class/dog'

if not os.path.exists(output_train_path):
    os.makedirs(output_train_path)
if not os.path.exists(output_test_path):
    os.makedirs(output_test_path)


def scanDir_lable_File(dir, flag=True):
    for root, dirs, files in os.walk(dir, True, None, False):  # walk the directory
        # handle every file in this folder
        for f in files:
            if os.path.isfile(os.path.join(root, f)):
                a = os.path.splitext(f)
                lable = a[0].split('.')[0]  # 'cat.0.jpg' -> 'cat'
                print(lable)
                if lable == 'cat':
                    img_path = os.path.join(root, f)
                    mycopyfile(img_path, os.path.join(output_train_path, f))
                else:
                    img_path = os.path.join(root, f)
                    mycopyfile(img_path, os.path.join(output_test_path, f))


def mycopyfile(srcfile, dstfile):
    if not os.path.isfile(srcfile):
        print("%s not exist!" % (srcfile))
    else:
        fpath, fname = os.path.split(dstfile)  # split path and file name
        if not os.path.exists(fpath):
            os.makedirs(fpath)                 # create the path
        shutil.copyfile(srcfile, dstfile)      # copy the file
        print("copy %s -> %s" % (srcfile, dstfile))


root_path = '/home/a/Datasets/cat&dog'
train_path = root_path + '/train/'
test_path = root_path + '/test/'

scanDir_lable_File(train_path)
```
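As a quick sanity check (a minimal sketch, assuming the paths above), you can count how many files landed in each class folder; for the full Kaggle training set this should print 12500 per class:

```python
import os

# hypothetical check: count the images copied into each class folder
for cls in ('cat', 'dog'):
    folder = os.path.join('/home/a/Datasets/cat&dog/class', cls)
    print(cls, len(os.listdir(folder)))
```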
Next, to make efficient use of memory, we store the images as TFRecord files:
```python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import random

from tqdm import tqdm
import numpy as np
import tensorflow as tf
from skimage import io, transform, color

flags = tf.flags
flags.DEFINE_string('directory', '/home/a/Datasets/cat&dog/class', 'data directory')
flags.DEFINE_string('save_dir', './tfrecords', 'output directory')
flags.DEFINE_integer('test_size', 350, 'size of the test split')
FLAGS = flags.FLAGS

MODES = [tf.estimator.ModeKeys.TRAIN,
         tf.estimator.ModeKeys.EVAL,
         tf.estimator.ModeKeys.PREDICT]


def _float_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))


def _int_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))


def _bytes_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))


def convert_to_tfrecord(mode, anno):
    """Convert (path, label) pairs to a TFRecord file."""
    assert mode in MODES, "invalid mode"
    filename = os.path.join(FLAGS.save_dir, mode + '.tfrecords')
    with tf.python_io.TFRecordWriter(filename) as writer:
        for fnm, cls in tqdm(anno):
            # read and transform the image
            img = io.imread(fnm)
            img = color.rgb2gray(img)
            img = transform.resize(img, [224, 224])
            # shape information after the transform
            if 3 == img.ndim:
                rows, cols, depth = img.shape
            else:
                rows, cols = img.shape
                depth = 1
            # build the Example
            example = tf.train.Example(
                features=tf.train.Features(
                    feature={
                        'image/height': _int_feature(rows),
                        'image/width': _int_feature(cols),
                        'image/depth': _int_feature(depth),
                        'image/class/label': _int_feature(cls),
                        'image/encoded': _bytes_feature(img.astype(np.float32).tobytes())
                    }
                )
            )
            # serialize and save
            writer.write(example.SerializeToString())


def get_folder_name(folder):
    """List sub-folder names under a folder (non-recursive)."""
    fs = os.listdir(folder)
    fs = [x for x in fs if os.path.isdir(os.path.join(folder, x))]
    return sorted(fs)


def get_file_name(folder):
    """List file paths under a folder (non-recursive)."""
    fs = os.listdir(folder)
    fs = map(lambda x: os.path.join(folder, x), fs)
    fs = [x for x in fs if os.path.isfile(x)]
    return fs


def get_annotations(directory, classes):
    """Collect all image paths and their labels."""
    files = []
    labels = []
    for ith, val in enumerate(classes):
        fi = get_file_name(os.path.join(directory, val))
        files.extend(fi)
        labels.extend([ith] * len(fi))
    assert len(files) == len(labels), "number of images and labels differ"
    # pair each image path with its label
    annotation = [x for x in zip(files, labels)]
    # shuffle
    random.shuffle(annotation)
    return annotation


def main(_):
    # make sure the output folder exists before writing
    if not os.path.exists(FLAGS.save_dir):
        os.makedirs(FLAGS.save_dir)
    class_names = get_folder_name(FLAGS.directory)
    annotation = get_annotations(FLAGS.directory, class_names)
    convert_to_tfrecord(tf.estimator.ModeKeys.TRAIN, annotation[FLAGS.test_size:])
    convert_to_tfrecord(tf.estimator.ModeKeys.EVAL, annotation[:FLAGS.test_size])


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run()
```
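Before moving on, it is worth verifying what was written. A minimal sketch (assuming the default `./tfrecords` output directory) that counts the records in each split:

```python
import tensorflow as tf

# count the serialized examples in each generated split
for split in ('train', 'eval'):
    path = './tfrecords/{}.tfrecords'.format(split)
    n = sum(1 for _ in tf.python_io.tf_record_iterator(path))
    print(split, n)
```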
Once the TFRecord files have been generated, we read them back with the tf.data API:
```python
def input_fn(mode, batch_size=1):
    """Input function."""
    def parser(serialized_example):
        """How to parse a single record from the dataset."""
        # parse one Example
        features = tf.parse_single_example(
            serialized_example,
            features={
                'image/height': tf.FixedLenFeature([], tf.int64),
                'image/width': tf.FixedLenFeature([], tf.int64),
                'image/depth': tf.FixedLenFeature([], tf.int64),
                'image/encoded': tf.FixedLenFeature([], tf.string),
                'image/class/label': tf.FixedLenFeature([], tf.int64),
            })
        # fetch the shape information
        height = tf.cast(features['image/height'], tf.int32)
        width = tf.cast(features['image/width'], tf.int32)
        depth = tf.cast(features['image/depth'], tf.int32)
        # restore the image
        image = tf.decode_raw(features['image/encoded'], tf.float32)
        image = tf.reshape(image, [height, width, depth])
        image = image - 0.5
        # restore the label
        label = tf.cast(features['image/class/label'], tf.int32)
        return image, tf.one_hot(label, FLAGS.classes)

    if mode in MODES:
        tfrecords_file = os.path.join(FLAGS.data_dir, mode + '.tfrecords')
    else:
        raise ValueError("unknown mode")
    assert tf.gfile.Exists(tfrecords_file), 'TFRecords file does not exist'

    # build the dataset
    dataset = tf.data.TFRecordDataset([tfrecords_file])
    # map the parser over it
    dataset = dataset.map(parser, num_parallel_calls=1)
    # batch
    dataset = dataset.batch(batch_size)
    # for training, repeat forever
    if mode == tf.estimator.ModeKeys.TRAIN:
        dataset = dataset.repeat()
    # build the iterator
    iterator = dataset.make_one_shot_iterator()
    # fetch features and labels
    images, labels = iterator.get_next()
    return images, labels
```
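A quick smoke test of `input_fn` (a sketch, assuming the flags defined in the main script further below) is to pull one batch inside a session and check the shapes:

```python
import tensorflow as tf

# pull a single batch from the eval split and inspect its shapes
images, labels = input_fn(tf.estimator.ModeKeys.EVAL, batch_size=4)
with tf.Session() as sess:
    img_val, lbl_val = sess.run([images, labels])
    print(img_val.shape)  # expected: (4, 224, 224, 1)
    print(lbl_val.shape)  # expected: (4, 2) -- one-hot labels
```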
Next we build the network itself, using tf.layers, which makes network construction very convenient. Here is a simple CNN:
```python
def my_model(inputs, mode):
    """A simple CNN."""
    net = tf.reshape(inputs, [-1, 224, 224, 1])
    net = tf.layers.conv2d(net, 32, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.max_pooling2d(net, [2, 2], strides=2)
    net = tf.layers.conv2d(net, 32, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.max_pooling2d(net, [2, 2], strides=2)
    net = tf.layers.conv2d(net, 64, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.conv2d(net, 64, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.max_pooling2d(net, [2, 2], strides=2)
    net = tf.reshape(net, [-1, 28 * 28 * 64])
    net = tf.layers.dense(net, 1024, activation=tf.nn.relu)
    net = tf.layers.dropout(net, 0.4, training=(mode == tf.estimator.ModeKeys.TRAIN))
    net = tf.layers.dense(net, FLAGS.classes)
    return net
```
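The `28 * 28 * 64` in the flatten step is not arbitrary: with `padding='same'` the convolutions preserve the spatial size, and each 2x2 stride-2 max pool halves it, so 224 -> 112 -> 56 -> 28. A tiny check of that arithmetic:

```python
# each of the three 2x2/stride-2 max pools halves the spatial size
side = 224
for _ in range(3):
    side //= 2
assert side == 28
print(side * side * 64)  # 50176, the flattened size used in tf.reshape above
```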
Then we wrap the network in a model function for the Estimator:
```python
def my_model_fn(features, labels, mode):
    """Model function."""
    # visualize the input
    tf.summary.image('images', features)
    # build the network
    logits = my_model(features, mode)
    predictions = {
        'classes': tf.argmax(input=logits, axis=1),
        'probabilities': tf.nn.softmax(logits, name='softmax_tensor')
    }
    # for PREDICT, the predictions alone are enough
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)
    # build the loss
    loss = tf.losses.softmax_cross_entropy(onehot_labels=labels, logits=logits, scope='loss')
    tf.summary.scalar('train_loss', loss)
    # set up training
    if mode == tf.estimator.ModeKeys.TRAIN:
        optimizer = tf.train.AdamOptimizer(learning_rate=1e-3)
        train_op = optimizer.minimize(loss, tf.train.get_or_create_global_step())
    else:
        train_op = None
    # accuracy metrics
    accuracy = tf.metrics.accuracy(
        tf.argmax(labels, axis=1), predictions['classes'],
        name='accuracy')
    accuracy_topk = tf.metrics.mean(
        tf.nn.in_top_k(predictions['probabilities'], tf.argmax(labels, axis=1), 2),
        name='accuracy_topk')
    metrics = {
        'test_accuracy': accuracy,
        'test_accuracy_topk': accuracy_topk
    }
    # visualize training accuracy
    tf.summary.scalar('train_accuracy', accuracy[1])
    tf.summary.scalar('train_accuracy_topk', accuracy_topk[1])

    return tf.estimator.EstimatorSpec(
        mode=mode,
        predictions=predictions,
        loss=loss,
        train_op=train_op,
        eval_metric_ops=metrics)
```
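One naming detail matters for the next step: `scope='loss'` and the `name=` arguments given to the metrics determine the tensor names that the `LoggingTensorHook` in `main` will look up in the graph. A sketch of that mapping, assuming TF 1.x's internal naming of these ops:

```python
# tensor names the LoggingTensorHook below resolves against the graph
hook_tensors = {
    'loss': 'loss/value',                    # from softmax_cross_entropy(scope='loss')
    'accuracy': 'accuracy/value',            # from tf.metrics.accuracy(name='accuracy')
    'accuracy_topk': 'accuracy_topk/value',  # from tf.metrics.mean(name='accuracy_topk')
}
```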
Now we train the network. The loop below alternates FLAGS.steps training steps with a full evaluation pass, twenty times in total:
```python
def main(_):
    # logging hook: print loss and accuracy every 100 iterations
    logging_hook = tf.train.LoggingTensorHook(
        every_n_iter=100,
        tensors={
            'accuracy': 'accuracy/value',
            'accuracy_topk': 'accuracy_topk/value',
            'loss': 'loss/value'
        },
    )

    # create the Estimator
    model = tf.estimator.Estimator(
        model_fn=my_model_fn,
        model_dir=FLAGS.model_dir)

    for i in range(20):
        # train
        model.train(
            input_fn=lambda: input_fn(tf.estimator.ModeKeys.TRAIN, FLAGS.batch_size),
            steps=FLAGS.steps,
            hooks=[logging_hook])

        # evaluate and print the results
        print("=" * 10, "Testing", "=" * 10)
        eval_results = model.evaluate(
            input_fn=lambda: input_fn(tf.estimator.ModeKeys.EVAL))
        print('Evaluation results:\n\t{}'.format(eval_results))
        print("=" * 30)


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run()
```
Here is the complete main training script:
```python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os

import tensorflow as tf

flags = tf.app.flags
flags.DEFINE_integer('batch_size', 16, 'batch size')
flags.DEFINE_string('data_dir', './tfrecords', 'where the TFRecord files live')
flags.DEFINE_string('model_dir', './cat&dog_model', 'where the model is saved')
flags.DEFINE_integer('steps', 1000, 'training steps per round')
flags.DEFINE_integer('classes', 2, 'number of classes')
FLAGS = flags.FLAGS

MODES = [tf.estimator.ModeKeys.TRAIN,
         tf.estimator.ModeKeys.EVAL,
         tf.estimator.ModeKeys.PREDICT]


def input_fn(mode, batch_size=1):
    """Input function."""
    def parser(serialized_example):
        """How to parse a single record from the dataset."""
        # parse one Example
        features = tf.parse_single_example(
            serialized_example,
            features={
                'image/height': tf.FixedLenFeature([], tf.int64),
                'image/width': tf.FixedLenFeature([], tf.int64),
                'image/depth': tf.FixedLenFeature([], tf.int64),
                'image/encoded': tf.FixedLenFeature([], tf.string),
                'image/class/label': tf.FixedLenFeature([], tf.int64),
            })
        # fetch the shape information
        height = tf.cast(features['image/height'], tf.int32)
        width = tf.cast(features['image/width'], tf.int32)
        depth = tf.cast(features['image/depth'], tf.int32)
        # restore the image
        image = tf.decode_raw(features['image/encoded'], tf.float32)
        image = tf.reshape(image, [height, width, depth])
        image = image - 0.5
        # restore the label
        label = tf.cast(features['image/class/label'], tf.int32)
        return image, tf.one_hot(label, FLAGS.classes)

    if mode in MODES:
        tfrecords_file = os.path.join(FLAGS.data_dir, mode + '.tfrecords')
    else:
        raise ValueError("unknown mode")
    assert tf.gfile.Exists(tfrecords_file), 'TFRecords file does not exist'

    # build the dataset
    dataset = tf.data.TFRecordDataset([tfrecords_file])
    # map the parser over it
    dataset = dataset.map(parser, num_parallel_calls=1)
    # batch
    dataset = dataset.batch(batch_size)
    # for training, repeat forever
    if mode == tf.estimator.ModeKeys.TRAIN:
        dataset = dataset.repeat()
    # build the iterator
    iterator = dataset.make_one_shot_iterator()
    # fetch features and labels
    images, labels = iterator.get_next()
    return images, labels


def my_model(inputs, mode):
    """A simple CNN."""
    net = tf.reshape(inputs, [-1, 224, 224, 1])
    net = tf.layers.conv2d(net, 32, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.max_pooling2d(net, [2, 2], strides=2)
    net = tf.layers.conv2d(net, 32, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.max_pooling2d(net, [2, 2], strides=2)
    net = tf.layers.conv2d(net, 64, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.conv2d(net, 64, [3, 3], padding='same', activation=tf.nn.relu)
    net = tf.layers.max_pooling2d(net, [2, 2], strides=2)
    net = tf.reshape(net, [-1, 28 * 28 * 64])
    net = tf.layers.dense(net, 1024, activation=tf.nn.relu)
    net = tf.layers.dropout(net, 0.4, training=(mode == tf.estimator.ModeKeys.TRAIN))
    net = tf.layers.dense(net, FLAGS.classes)
    return net


def my_model_fn(features, labels, mode):
    """Model function."""
    # visualize the input
    tf.summary.image('images', features)
    # build the network
    logits = my_model(features, mode)
    predictions = {
        'classes': tf.argmax(input=logits, axis=1),
        'probabilities': tf.nn.softmax(logits, name='softmax_tensor')
    }
    # for PREDICT, the predictions alone are enough
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)
    # build the loss
    loss = tf.losses.softmax_cross_entropy(onehot_labels=labels, logits=logits, scope='loss')
    tf.summary.scalar('train_loss', loss)
    # set up training
    if mode == tf.estimator.ModeKeys.TRAIN:
        optimizer = tf.train.AdamOptimizer(learning_rate=1e-3)
        train_op = optimizer.minimize(loss, tf.train.get_or_create_global_step())
    else:
        train_op = None
    # accuracy metrics
    accuracy = tf.metrics.accuracy(
        tf.argmax(labels, axis=1), predictions['classes'],
        name='accuracy')
    accuracy_topk = tf.metrics.mean(
        tf.nn.in_top_k(predictions['probabilities'], tf.argmax(labels, axis=1), 2),
        name='accuracy_topk')
    metrics = {
        'test_accuracy': accuracy,
        'test_accuracy_topk': accuracy_topk
    }
    # visualize training accuracy
    tf.summary.scalar('train_accuracy', accuracy[1])
    tf.summary.scalar('train_accuracy_topk', accuracy_topk[1])

    return tf.estimator.EstimatorSpec(
        mode=mode,
        predictions=predictions,
        loss=loss,
        train_op=train_op,
        eval_metric_ops=metrics)


def main(_):
    # logging hook: print loss and accuracy every 100 iterations
    logging_hook = tf.train.LoggingTensorHook(
        every_n_iter=100,
        tensors={
            'accuracy': 'accuracy/value',
            'accuracy_topk': 'accuracy_topk/value',
            'loss': 'loss/value'
        },
    )

    # create the Estimator
    model = tf.estimator.Estimator(
        model_fn=my_model_fn,
        model_dir=FLAGS.model_dir)

    for i in range(20):
        # train
        model.train(
            input_fn=lambda: input_fn(tf.estimator.ModeKeys.TRAIN, FLAGS.batch_size),
            steps=FLAGS.steps,
            hooks=[logging_hook])

        # evaluate and print the results
        print("=" * 10, "Testing", "=" * 10)
        eval_results = model.evaluate(
            input_fn=lambda: input_fn(tf.estimator.ModeKeys.EVAL))
        print('Evaluation results:\n\t{}'.format(eval_results))
        print("=" * 30)


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run()
```
After training finishes, we run prediction on a new image:
"""Run inference a DeepLab v3 model using tf.estimator API.""" from __future__ import absolute_import from __future__ import division from __future__ import print_function import argparse import os import sys import tensorflow as tf import train from skimage import io, transform, color, util mode = tf.estimator.ModeKeys.PREDICT _NUM_CLASSES = 2 image_size = [224,224] image_files = ‘/home/a/Datasets/cat&dog/test/44.jpg‘ model_dir = ‘./cat&dog_model/‘ def main(unused_argv): # Using the Winograd non-fused algorithms provides a small performance boost. os.environ[‘TF_ENABLE_WINOGRAD_NONFUSED‘] = ‘1‘ # model = tf.estimator.Estimator( model_fn=train.my_model_fn, model_dir=model_dir) def predict_input_fn(image_path): img = io.imread(image_path) img = color.rgb2gray(img) img = transform.resize(img, [224, 224]) image = img - 0.5 # preprocess image: scale pixel values from 0-255 to 0-1 images = tf.image.convert_image_dtype(image, dtype=tf.float32) dataset = tf.data.Dataset.from_tensors((images,)) return dataset.batch(1).make_one_shot_iterator().get_next() def predict(image_path): result = model.predict(input_fn=lambda: predict_input_fn(image_path=image_path)) for r in result: print(r) if r[‘classes‘] ==1: print(‘dog‘,r[‘probabilities‘][1]) else: print(‘cat‘,r[‘probabilities‘][0]) predict(image_files) if __name__ == ‘__main__‘: tf.logging.set_verbosity(tf.logging.INFO) tf.app.run(main=main)
Because the network is very simple, test accuracy comes out at roughly 75%.
This is the final network graph:

[Figure: the final network graph]