1. 程式人生 > >tensorflow爬坑行:資料讀取

tensorflow爬坑行:資料讀取

tensorflow的資料讀取

tensorflow在讀取像imagenet這種大量影象資料,不能一次性load進記憶體時有幾個坑,Mark一記,以助後來者。關於多GPU和分散式,本文只討論資料並行方式,即每個GPU上面執行一個網路,稱為tower。
* 資料格式TFRecord
* 資料讀取第一層封裝——Queue,Coordinator,QueueRunner
* 常用封裝——tf.train.xxx_input_producer,tf.reader,tf.train.batch

1. 建立TFRecord

TFRecord本身是一個集合,每條記錄是一個tf.train.Example(一種protocol buffer),包含有多個欄位(稱為feature),TFRecord提供的格式一般包含幾個重要的feature,比如代表大小的image/height、image/width,代表類別的image/class/label、image/class/text,代表資料的image/encoded等等。

建立TFRecord檔案就是按照feature名把相應欄位填充,介面為tf.python_io.TFRecordWriter和tf.train.Example,典型程式碼如下:

writer = tf.python_io.TFRecordWriter(output_file)
example = tf.train.Example(features=tf.train.Features(feature={
      'image/height': _int64_feature(height),
      'image/width': _int64_feature(width),
      'image/colorspace'
: _bytes_feature(colorspace), 'image/channels': _int64_feature(channels), 'image/class/label': _int64_feature(label), 'image/class/text': _bytes_feature(text), 'image/format': _bytes_feature(image_format), 'image/filename': _bytes_feature(os.path.basename(filename)), 'image/encoded'
: _bytes_feature(image_buffer)})) writer.write(example.SerializeToString())

讀取則是讀出相應欄位,介面為 tf.TFRecordReader和tf.parse_single_example。他們本身都是op,需要在session中執行,無法過程式執行。對於jpg、png等壓縮圖片格式,讀出image/encoded相應資料後需要用tf.image.decode_jpeg,tf.image.decode_png等op來進行解碼,解析TFRecord的典型程式碼如下(tf.TFRecordReader的程式碼見下節):

 feature_map = {
      'image/encoded': tf.FixedLenFeature([], dtype=tf.string,
                                          default_value=''),
      'image/class/label': tf.FixedLenFeature([1], dtype=tf.int64,
                                              default_value=-1),
      'image/class/text': tf.FixedLenFeature([], dtype=tf.string,
                                             default_value='')}
features = tf.parse_single_example(example_serialized, feature_map)
label = tf.cast(features['image/class/label'], dtype=tf.int32)
image = tf.image.decode_jpeg(features['image/encoded'])

需要特別注意的是,TF中一般一個TFRecord檔案會寫入多個example,這樣可以把原始檔案個數減少很多,從而減輕filename Queue(見下節)的負擔。讀取的方法見下節。

程式碼導讀

tensorflow的model zoo中的inception模型示例了imagenet的讀取方法。

建立TFRecord檔案集只需要略微修改build_image_data或者build_imagenet_data 就好了,他們的主體部分實現了多執行緒讀取影象、轉換和輸出,前提是影象以dir/label/xxx.jpg 的命名規則存放。關鍵程式碼就是每幅影象建立tf.train.Example以及多次呼叫tf.python_io.TFRecordWriter的write方法寫入磁碟。

讀取的程式碼在image_processing中,imagenet_train和imagnet_distributed_train都是呼叫了這個檔案的

2. 單tower讀取流程

如前所述,典型情況下,在目錄dir下有若干TFRecord檔案,每個檔案有若干記錄,每個記錄是一幅影象,可以是編碼後的也可以是raw data。
流程的起點,tf讀取dir下的所有TFRecord檔名並放在記憶體,整個系統有三個佇列緩衝:FilnameQueue,ExampleQueue,BatchQueue。
那麼在訓練和測試模型的時候,資料讀取的流程就是:
* 把檔名shuffle並放入FilenameQueue
* 多執行緒,每個執行緒選擇一個檔案(從FilenameQueue中dequeue),並從該檔案中讀取一條記錄放到ExampleQueue
* 選擇一條記錄(從ExampleQueue中dequeue),把若干記錄合併成一個batch放到BatchQueue中
* 從BatchQueue中取資料給模型訓練或測試

前兩個操作如下圖所示,注意到在用多個reader的時候一般是多執行緒的,每個執行緒有一個reader讀取影象,還有一個enqueue的op把處理後的影象放進example enqueue

多執行緒讀取

tf.train.XXXbatchXXX()封裝了從一個op–examplequeue.dequeue到讀出batchsize個example的過程。在這個呼叫中,tf建立一個新的BatchQueue,並建立多個執行緒用來讀取ExampleQueue並填到BatchQueue中,典型程式碼如下:

filename_queue = tf.train.string_input_producer(data_files,shuffle=True,capacity=16)
examples_queue = tf.RandomShuffleQueue(...)
for _ in range(num_readers):
    reader = tf.TFRecordReader()
    _, value = reader.read(filename_queue)
    enqueue_ops.append(examples_queue.enqueue([value]))
runner = tf.train.queue_runner.QueueRunner(examples_queue, enqueue_ops)
tf.train.queue_runner.add_queue_runner(runner)
example_serialized = examples_queue.dequeue()   
for thread_id in range(num_preprocess_threads):
    image_buffer, label_index, ... = parse_example_proto(example_serialized)
    image = image_preprocessing(image_buffer, ...)
    images_and_labels.append([image, label_index])
images, label_index_batch = tf.train.batch_join(images_and_labels,...)

此處有諸多值得推敲的細節:
* reader.read(filename_queue)方法在讀取檔案的時候每次讀出一個記錄,並在一個檔案的所有記錄都被讀出過之後,filename_queue才會dequeue
* 建立num_readers個reader和examples_queue.enqueue(),並放進QueueRunner中之後,一旦執行tf.train.start_queue_runners(),QueueRunner將會建立num_readers個執行緒獨立執行read和enqueue
* images_and_labels是一個長度為num_preprocess_threads的list,而tf.train.batch_join會在執行時,為每個元素建立一個執行緒來獲取其資料,他們會共用examples_queue.dequeue() ,而獨立執行解析和image_preprocessing。
* 把example_serialized = examples_queue.dequeue()放在下面的迴圈中不會更好,因為這樣每個執行緒雖然可以獨立dequeue,但是ExampleQueue本身會在dequeue的時候加鎖以保證資料一致性,所以效率並不能提升。
* images並不是真正的資料,只是定義了計算流圖,而要想拿到真正的資料需要在session中呼叫eval方法:

img = tf.placeholder(...)
loss = xxmodel.loss(img,...)
with tf.Session():
    tf.initialize_all_variables().run()
    tf.train.start_queue_runners()
    imagedata = images.eval()
    print loss.eval(feed_dict={img:imagedata,...})

或者直接輸入到模型中訓練或測試:

loss = xxmodel.loss(images)
with tf.Session():
    tf.initialize_all_variables().run()
    tf.train.start_queue_runners()
    print loss.eval()

以上程式碼是整個資料讀取pipeline的核心,計算流圖的時候回溯執行,有以下幾個步驟:
* tf.train.start_queue_runners()被執行之後,每個執行緒都要監測自己enqueue的queue是不是滿的,不滿的話就會執行enqueue
* 計算loss的時候需要往前回溯整個網路,一直到網路的輸入,就是從BatchQueue獲取的那個Tensor,獲取它意味著BatchQueue執行dequeue,從而使得BatchQueue不滿
* 負責從ExampleQueue獲取、加工資料的執行緒會把EampleQueue中的資料讀出,填到BatchQueue,使得ExampleQueue不滿
* 負責填充ExampleQueue的執行緒會呼叫TFRecordReader.read()方法來獲得硬碟檔案的資料。如果該檔案中的記錄已經全都被讀出過了,則使FilenameQueue出隊,不滿,否則不變
* 如果FilenameQueue不滿的情況發生,則自動填充檔名。

注意就讀取任務而言,單機上單GPU(單tower)、多GPU(tower)是有區別的,而是否運行於多機叢集環境,事實上並不重要,關鍵還是每臺機器上面是多GPU還是單GPU。因為叢集中ps並不讀取影象資料,而每個worker獨立讀取各自的檔案,並不相互影響。多tower的讀取留待下篇,與tensorflow的分散式並行一起討論。

3. 低層封裝

以上介紹了高層API介面,本節其底層原理及介面,下節在此基礎上重新看待上述高層介面。本節介紹的介面主要用於理解,以筆者的經驗,實際使用中高層介面已經足夠了。本節主要解釋:

  • tf.train.queue_runner.QueueRunner的多執行緒原理
  • tf.train.batch_join的多執行緒原理
  • tf.TFRecordReader讀取細節

QueueRunner

典型的多執行緒FIFO緩衝,
Coordinator的核心實體是一個python標準庫裡的threading Event Object,然後封裝了一些操作方法來同步各個執行緒一起stop。所以要在建立執行緒的時候把Coordinator傳進去用,這樣的用法需要顯式開闢和管理執行緒,會比較麻煩,而這些執行緒主要用來進行enqueue,所以tf提供了QueueRunner來管理這些用於enqueue的執行緒。

QueueRunner在初始化時需要提供queue和一個包含enqueue操作的list,然後呼叫create_thread,提供Coordinator來建立和同步執行緒。也可以不顯式呼叫create_thread,可以使用tf.train.start_queue_runner()一次性啟動所有runner,該函式內部會自己維護一個coordinatior,從而簡化了程式設計。

QueueRunner的執行緒就是在loop執行enqueue,而enqueue時如果queue已經滿了就直接返回,而dequeue時候如果queue空了則丟擲異常OutOfRange,見FIFOQueue的原始碼。

注意,QueueRunner的enqueue一般需要dequeue前一個queue,所以QueueRunner的enqueue依賴於前一個queue的dequeue操作,即QueueRunner的每個執行緒不僅對本queue執行了enqueue一般也會對前一個queue執行dequeue

整個過程就是,初始化的時候在主執行緒中建立各個queue,然後通過Runner和Coordinator建立相應的enqueue子執行緒。啟動session後,子執行緒start,填滿queue之後的loop實際上無操作,然後主執行緒執行train或者inference,反向遍歷網路中的op,到達資料讀取那個op,即對example batch queue的dequeue操作,就會產生連鎖反應使得各個queue中的entry出隊,一直在loop的子執行緒就會馬上讀取entry並enqueue進去。

batch_join()

tf.train中的batch族函式有四個,呼叫他們時都會建立一個Queue和QueueRunner,就像上節所描述的那樣,只是做了封裝。

tf.train.batch_join()會把tensors_list中的每個元素都開一個執行緒來單獨獲取資料,並把這些得到的資料按照batch_size大小組成新的張量放進Queue,然後再呼叫Queue的dequeue來返回。

reader.read()

一個重要的問題是,shuffle讀取batch的時候如何在一個epoch內不重複?一個epoch之後queue是不是就空了?

第二個問題很好回答,因為呼叫tf.train.string_input_producer的時候會建立相應的QueueRunner來持續填充FilenameQueue。在一開始,producer會儲存下檔名集合,它內部有epoch的概念:epoch的大小也就是集合元素數目,每次enqueue都是順序從集合中取值,並且會計數,夠一個epoch之後會把集合shffule,這樣就既保證了一個epoch內的隨機性,又保證了一個epoch之內所有樣本全部遍歷一遍且不重複。

程式碼導讀

Coordinator的原始碼中可以看出,其主要介面join、request_stop、should_stop都是對自身成員變數_stop_event的操作。

tf的queue系統是用C++實現的