1. 程式人生 > 程式設計 >利用Tensorflow的佇列多執行緒讀取資料方式

利用Tensorflow的佇列多執行緒讀取資料方式

在tensorflow中,有三種方式輸入資料

1. 利用feed_dict送入numpy陣列

2. 利用佇列從檔案中直接讀取資料

3. 預載入資料

其中第一種方式很常用,在tensorflow的MNIST訓練原始碼中可以看到,通過feed_dict={},可以將任意資料送入tensor中。

第二種方式相比於第一種,速度更快,可以利用多執行緒的優勢把資料送入佇列,再以batch的方式出隊,並且在這個過程中可以很方便地對影象進行隨機裁剪、翻轉、改變對比度等預處理,同時可以選擇是否對資料隨機打亂,可以說是非常方便。該部分的原始碼在tensorflow官方的CIFAR-10訓練原始碼中可以看到,但是對於剛學習tensorflow的人來說,比較難以理解,本篇部落格就當成我除錯完成後寫的一篇總結,以防自己再忘記具體細節。

讀取CIFAR-10資料集

按照第一種方式的話,CIFAR-10的讀取只需要寫一段非常簡單的程式碼即可將測試集與訓練集中的影象分別讀取:

path = 'E:\Dataset\cifar-10\cifar-10-batches-py'
# extract train examples
num_train_examples = 50000
x_train = np.empty((num_train_examples,32,3),dtype='uint8')
y_train = np.empty((num_train_examples),dtype='uint8')
for i in range(1,6): 
 fpath = os.path.join(path,'data_batch_' + str(i)) 
 (x_train[(i - 1) * 10000: i * 10000,:,:],y_train[(i - 1) * 10000: i * 10000])   = load_and_decode(fpath)
# extract test examples
fpath = os.path.join(path,'test_batch')
x_test,y_test = load_and_decode(fpath)
return x_train,y_train,x_test,np.array(y_test)

其中load_and_decode函式只需要按照CIFAR-10官網給出的方式decode就行,最終返回的x_train是一個[50000,3]的ndarray,但對於ndarray來說,進行預處理就要麻煩很多,為了取mini-SGD的batch,還自己寫了一個類,通過呼叫train_set.next_batch()函式來取,總而言之就是什麼都要自己動手,效率確實不高

但對於第二種方式,讀取起來就要麻煩很多,但使用起來,又快又方便

首先,把CIFAR-10的測試集檔案讀取出來,生成檔名列表

path = 'E:\Dataset\cifar-10\cifar-10-batches-py'
filenames = [os.path.join(path,'data_batch_%d' % i) for i in range(1,6)]

有了列表以後,利用tf.train.string_input_producer函式生成一個讀取佇列

filename_queue = tf.train.string_input_producer(filenames)

接下來,我們呼叫read_cifar10函式,得到一幅一幅的影象,該函式的程式碼如下:

def read_cifar10(filename_queue):
 label_bytes = 1
 IMAGE_SIZE = 32
 CHANNELS = 3
 image_bytes = IMAGE_SIZE*IMAGE_SIZE*3
 record_bytes = label_bytes+image_bytes

 # define a reader
 reader = tf.FixedLengthRecordReader(record_bytes)
 key,value = reader.read(filename_queue)
 record_bytes = tf.decode_raw(value,tf.uint8)

 label = tf.strided_slice(record_bytes,[0],[label_bytes])
 depth_major = tf.reshape(tf.strided_slice(record_bytes,[label_bytes],[label_bytes + image_bytes]),[CHANNELS,IMAGE_SIZE,IMAGE_SIZE])
 image = tf.transpose(depth_major,[1,2,0])
 return image,label

第9行,定義一個reader,來讀取固定長度的資料,這個固定長度是由CIFAR-10資料集圖片的儲存格式決定的,1byte的標籤加上32 *32 *3長度的影象,3代表RGB三通道,由於圖片的是按[channel,height,width]的格式儲存的,為了變為常用的[height,width,channel]維度,需要在17行reshape一次影象,最終我們提取出了一副完整的影象與對應的標籤

對影象進行預處理

我們取出的image與label均為tensor格式,因此預處理將變得非常簡單

 if not distortion:
  IMAGE_SIZE = 32
 else:
  IMAGE_SIZE = 24
  # 隨機裁剪為24*24大小
  distorted_image = tf.random_crop(tf.cast(image,tf.float32),[IMAGE_SIZE,3])
  # 隨機水平翻轉
  distorted_image = tf.image.random_flip_left_right(distorted_image)
  # 隨機調整亮度
  distorted_image = tf.image.random_brightness(distorted_image,max_delta=63)
  # 隨機調整對比度
  distorted_image = tf.image.random_contrast(distorted_image,lower=0.2,upper=1.8)
  # 對影象進行白化操作,即畫素值轉為零均值單位方差
  float_image = tf.image.per_image_standardization(distorted_image)

distortion是定義的一個輸入布林型變數,預設為True,表示是否對影象進行處理

填充佇列與隨機打亂

呼叫tf.train.shuffle_batch或tf.train.batch函式,以tf.train.shuffle_batch為例,函式的定義如下:

def shuffle_batch(tensors,batch_size,capacity,min_after_dequeue,num_threads=1,seed=None,enqueue_many=False,shapes=None,allow_smaller_final_batch=False,shared_name=None,name=None):

tensors表示輸入的張量(tensor),batch_size表示要輸出的batch的大小,capacity表示佇列的容量,即大小,min_after_dequeue表示出隊操作後佇列中的最小元素數量,這個值是要小於佇列的capacity的,通過調整min_after_dequeue與capacity兩個變數,可以改變資料被隨機打亂的程度,num_threads表示使用的執行緒數,只要取大於1的數,佇列的效率就會高很多。

通常情況下,我們只需要輸入以上幾個變數即可,在CIFAR-10_input.py中,谷歌給出的程式碼是這樣寫的:

if shuffle:
 images,label_batch = tf.train.shuffle_batch([image,label],min_queue_examples+3*batch_size,min_queue_examples,num_preprocess_threads)
else:
 images,label_batch = tf.train.batch([image,num_preprocess_threads,min_queue_examples + 3 * batch_size)

min_queue_examples由以下方式得到:

min_fraction_of_examples_in_queue = 0.4
min_queue_examples = int(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN 
       *min_fraction_of_examples_in_queue)

當然,這些值均可以自己隨意設定,

最終得到的images,labels(label_batch),即為shape=[128,3]的tensor,其中128為預設batch_size。

啟用佇列與處理異常

得到了images和labels兩個tensor後,我們便可以把這兩個tensor送入graph中進行運算了

# input tensor
img_batch,label_batch = cifar10_input.tesnsor_shuffle_input(batch_size)

# build graph that computes the logits predictions from the inference model
logits,predicts = train.inference(img_batch,keep_prob)

# calculate loss
loss = train.loss(logits,label_batch)

定義sess=tf.Session()後,執行sess.run(),然而你會發現並沒有輸出,程式直接掛起了,彷彿死掉了一樣

原因是這樣的,雖然我們在資料流圖中加入了佇列,但只有呼叫tf.train.start_queue_runners()函式後,資料才會動起來,被負責輸入管道的執行緒填入佇列,否則佇列將會掛起。

OK,我們呼叫函式,讓佇列執行起來

with tf.Session(config=run_config) as sess:
 sess.run(init_op) # intialization
 queue_runner = tf.train.start_queue_runners(sess)
 for i in range(10):
  b1,b2 = sess.run([img_batch,label_batch])
  print(b1.shape)

在這裡為了測試,我們取10次輸出,看看輸出的batch1的維度是否正確

利用Tensorflow的佇列多執行緒讀取資料方式

10個batch的維度均為正確的,但是tensorflow卻報了錯,錯誤的文字內容如下:

2017-12-19 16:40:56.429687: W C:\tf_jenkins\home\workspace\rel-win\M\windows-gpu\PY\36\tensorflow\core\kernels\queue_base.cc:295] _ 0 _ input_producer: Skipping cancelled enqueue attempt with queue not closed

簡單地看一下,大致意思是說我們的佇列裡還有資料,但是程式結束了,丟擲了異常,因此,我們還需要定義一個Coordinator,也就是協調器來處理異常

Coordinator有3個主要方法:

1. tf.train.Coordinator.should_stop() 如果執行緒應該停止,返回True

2. tf.train.Coordinator.request_stop() 請求停止執行緒

3. tf.train.Coordinator.join() 等待直到指定執行緒停止

首先,定義協調器

coord = tf.train.Coordinator()

將協調器應用於QueueRunner

queue_runner = tf.train.start_queue_runners(sess,coord=coord)

結束資料的訓練或測試後,關閉執行緒

coord.request_stop()
coord.join(queue_runner)

最終的sess程式碼段如下:

coord = tf.train.Coordinator()
with tf.Session(config=run_config) as sess:
 sess.run(init_op)
 queue_runner = tf.train.start_queue_runners(sess,coord=coord)
 for i in range(10):
  b1,label_batch])
  print(b1.shape)
 coord.request_stop()
 coord.join(queue_runner)

得到的輸出結果為:

利用Tensorflow的佇列多執行緒讀取資料方式

完美解決,利用img_batch與label_batch,把tensor送入graph中,就可以享受tensorflow帶來的訓練樂趣了

以上這篇利用Tensorflow的佇列多執行緒讀取資料方式就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。