1. 程式人生 > >TensorFlow讀取二進位制檔案資料到佇列

TensorFlow讀取二進位制檔案資料到佇列

       TensorFlow是一種符號程式設計框架(與theano類似),先構建資料流圖再輸入資料進行模型訓練。Tensorflow支援很多種樣例輸入的方式。最容易的是使用placeholder,但這需要手動傳遞numpy.array型別的資料。第二種方法就是使用二進位制檔案和輸入佇列的組合形式。這種方式不僅節省了程式碼量,避免了進行data augmentation和讀檔案操作,可以處理不同型別的資料, 而且也不再需要人為地劃分開“預處理”和“模型計算”。在使用TensorFlow進行非同步計算時,佇列是一種強大的機制。


佇列使用概述

       正如TensorFlow中的其他元件一樣,佇列就是TensorFlow圖中的節點。這是一種有狀態的節點,就像變數一樣:其他節點可以修改它的內容。具體來說,其他節點可以把新元素插入到佇列後端(rear),也可以把佇列前端(front)的元素刪除。佇列,如FIFOQueue和RandomShuffleQueue(A queue implementation that dequeues elements in a random order.)等物件,在TensorFlow的tensor非同步計算時都非常重要。例如,一個典型的輸入結構是使用一個RandomShuffleQueue來作為模型訓練的輸入,多個執行緒準備訓練樣本,並且把這些樣本壓入佇列,一個訓練執行緒執行一個訓練操作,此操作會從佇列中移除最小批次的樣本(mini-batches),這種結構具有許多優點。 

TensorFlow的Session物件是可以支援多執行緒的,因此多個執行緒可以很方便地使用同一個會話(Session)並且並行地執行操作。然而,在Python程式實現這樣的並行運算卻並不容易。所有執行緒都必須能被同步終止,異常必須能被正確捕獲並報告,會話終止的時候, 佇列必須能被正確地關閉。所幸TensorFlow提供了兩個類來幫助多執行緒的實現:tf.Coordinator和 tf.QueueRunner。從設計上這兩個類必須被一起使用。Coordinator類可以用來同時停止多個工作執行緒並且向那個在等待所有工作執行緒終止的程式報告異常。QueueRunner類用來協調多個工作執行緒同時

將多個tensor壓入同一個佇列中。

(1)讀二進位制檔案資料到佇列中

        同很多其他的深度學習框架一樣,TensorFlow有它自己的二進位制格式。它使用了a mixture of its Records 格式和protobuf。Protobuf是一種序列化資料結構的方式,給出了關於資料的一些描述。TFRecords是tensorflow的預設資料格式,一個record就是一個包含了序列化tf.train.Example 協議快取物件的二進位制檔案,可以使用python建立這種格式,然後便可以使用tensorflow提供的函式來輸入給機器學習模型。
import tensorflow as tf

def read_and_decode_single_example(filename_queue):
# 定義一個空的類物件,類似於c語言裡面的結構體定義
    class Image(self):
    pass
    image = Image()
    image.height = 32
    image.width = 32
    image.depth = 3
    label_bytes = 1
    
    Bytes_to_read = label_bytes+image.heigth*image.width*3
    # A Reader that outputs fixed-length records from a file
    reader = tf.FixedLengthRecordReader(record_bytes=Bytes_to_read) 
    # Returns the next record (key, value) pair produced by a reader, key 和value都是字串型別的tensor
    # Will dequeue a work unit from queue if necessary (e.g. when the
    # Reader needs to start reading from a new file since it has
    # finished with the previous file).
    image.key, value_str = reader.read(filename_queue) 
    # Reinterpret the bytes of a string as a vector of numbers,每一個數值佔用一個位元組,在[0, 255]區間內,因此out_type要取uint8型別
    value = tf.decode_raw(bytes=value_str, out_type=tf.uint8) 
    # Extracts a slice from a tensor, value中包含了label和feature,故要對向量型別tensor進行'parse'操作
    image.label = tf.slice(input_=value, begin=[0], size=[1])
    value = value.slice(input_=value, begin=[1], size=[-1]).reshape((image.depth, image.height, image.width))
    transposed_value = tf.transpose(value, perm=[2, 0, 1]) 
    image.mat = transposed_value 
    return image 
接下來我們便可以呼叫這個函數了,
filenames =[os.path.join(data_dir, 'test_batch.bin')]
# Output strings (e.g. filenames) to a queue for an input pipeline
filename_queue = tf.train.string_input_producer(string_tensor=filenames) 
# returns symbolic label and image
img_obj = read_and_decode_single_example("filename_queue")
Label = img_obj.label
Image = img_obj.mat
sess = tf.Session()
# 初始化tensorflow圖中的所有狀態,如待讀取的下一個記錄tfrecord的位置,variables等
init = tf.initialize_all_variables()
sess.run(init)
tf.train.start_queue_runners(sess=sess)
# grab examples back.
# first example from file
label_val_1, image_val_1 = sess.run([label, image])
# second example from file
label_val_2, image_val_2 = sess.run([label, image])
      值得一提的是,TFRecordReader總是作用於檔名佇列。它將會從佇列中彈出檔名並使用該檔名,直到tfrecord為空時停止,此時它將從檔名佇列中彈出下一個filename。然而,檔名佇列又是怎麼得來的呢?起初這個佇列是空的,QueueRunners的概念即源於此。QueueRunners本質上就是一個執行緒thread,這個執行緒負責使用會話session並不斷地呼叫enqueue操作。Tensorflow把這個模式封裝在tf.train.QueueRunner物件裡面。入佇列操作99%的時間都可以被忽略掉,因為這個操作是由後臺負責執行。(比如在上面的例子中,tf.train.string_input_producer建立了一個這樣的執行緒,新增QueueRunner到資料流圖中)。

      可想而知,在你執行任何訓練步驟之前,我們要告知tensorflow去啟動這些執行緒,否則這些佇列會因為等待資料入隊而被堵塞,導致資料流圖將一直處於掛起狀態。我們可以呼叫tf.train.start_queue_runners(sess=sess)來啟動所有的QueueRunners。這個呼叫並不是符號化的操作,它會啟動輸入管道的執行緒,填充樣本到佇列中,以便出隊操作可以從佇列中拿到樣本。另外,必須要先執行初始化操作再建立這些執行緒。如果這些佇列未被初始化,tensorflow會丟擲錯誤。

(2)從二進位制檔案中讀取mini-batchs

       在訓練機器學習模型時,使用單個樣例更新引數屬於“online learning”,然而線上下環境下,我們通常採用基於mini-batchs 隨機梯度下降法(SGD),但是在tensorflow中如何利用queuerunners返回訓練塊資料呢?請參見下面的程式:

image_batch, label_batch = tf.train.shuffle_batch(tensor_list=[image, label]], 
                                                  batch_size=batch_size, 
                                                  num_threads=24, 
                                                  min_after_dequeue=min_samples_in_queue,
                                                  capacity=min_samples_in_queue+3*batch_size)  

讀取batch資料需要使用新的佇列queues和QueueRunners(大致流程圖如下)。Shuffle_batch構建了一個RandomShuffleQueue,並不斷地把單個的(image,labels)對送入佇列中,這個入隊操作是通過QueueRunners啟動另外的執行緒來完成的。這個RandomShuffleQueue會順序地壓樣例到佇列中,直到佇列中的樣例個數達到了batch_size+min_after_dequeue個。它然後從佇列中選擇batch_size個隨機的元素進行返回。事實上,shuffle_batch返回的值就是RandomShuffleQueue.dequeue_many()的結果。有了這個batches變數,就可以開始訓練機器學習模型了。



函式 tf.train.shuffle_batch(tensor_list, batch_size, capacity, min_after_dequeue, num_threads=1, seed=None, enqueue_many=False, shapes=None, shared_name=None, name=None)的使用說明:
作用:Creates batches by randomly shuffling tensors.(從佇列中隨機篩選多個樣例返回給image_batch和label_batch);
引數說明:
tensor_list: The list of tensors to enqueue.(待入隊的tensor list);
batch_size: The new batch size pulled from the queue;
capacity: An integer. The maximum number of elements in the queue(佇列長度);
min_after_dequeue: Minimum number elements in the queue after a dequeue, used to ensure a level of mixing of elements.(隨機取樣的樣本總體最小值,用於保證所取mini-batch的隨機性);
num_threads: The number of threads enqueuing `tensor_list`.(session會話支援多執行緒,這裡可以設定多執行緒加速樣本的讀取)
seed: Seed for the random shuffling within the queue.
enqueue_many: Whether each tensor in `tensor_list` is a single example.(為False時表示tensor_list是一個樣例,壓入時佔用佇列中的一個元素;為True時表示 tensor_list中的每一個元素都是一個樣例,壓入時佔用佇列中的一個元素位置,可以看作為一個batch);
shapes: (Optional) The shapes for each example. Defaults to the inferred shapes for `tensor_list`.
shared_name: (Optional) If set, this queue will be shared under the given name across multiple sessions.

name: (Optional) A name for the operations.


參考資料:https://www.tensorflow.org/versions/r0.11/how_tos/reading_data/index.html#creating-threads-to-prefetch-using-queuerunner-objects

https://indico.io/blog/tensorflow-data-inputs-part1-placeholders-protobufs-queues/