1. 程式人生 > >tensorflow載入資料的幾種方式

tensorflow載入資料的幾種方式

http://wiki.jikexueyuan.com/project/tensorflow-zh/how_tos/reading_data.html#QueueRunner

http://wiki.jikexueyuan.com/project/tensorflow-zh/how_tos/threading_and_queues.html

https://haosdent.gitbooks.io/tensorflow-document/content/how_tos/

建立執行緒並使用QueueRunner物件來預取

簡單來說:使用上面列出的許多tf.train函式新增QueueRunner到你的資料流圖中。在你執行任何訓練步驟之前,需要呼叫

tf.train.start_queue_runners函式,否則資料流圖將一直掛起。tf.train.start_queue_runners 這個函式將會啟動輸入管道的執行緒,填充樣本到佇列中,以便出隊操作可以從佇列中拿到樣本。這種情況下最好配合使用一個tf.train.Coordinator,這樣可以在發生錯誤的情況下正確地關閉這些執行緒。如果你對訓練迭代數做了限制,那麼需要使用一個訓練迭代數計數器,並且需要被初始化。推薦的程式碼模板如下:

# Create the graph, etc.
init_op = tf.initialize_all_variables()

# Create a session for running operations in the Graph.
sess = tf.Session()

# Initialize the variables (like the epoch counter).
sess.run(init_op)

# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

try:
    while not coord.should_stop():
        # Run training steps or whatever
        sess.run(train_op)

except tf.errors.OutOfRangeError:
    print 'Done training -- epoch limit reached'
finally:
    # When done, ask the threads to stop.
    coord.request_stop()

# Wait for threads to finish.
coord.join(threads)
sess.close()

疑問: 這是怎麼回事?

首先,我們先建立資料流圖,這個資料流圖由一些流水線的階段組成,階段間用佇列連線在一起。第一階段將生成檔名,我們讀取這些檔名並且把他們排到檔名佇列中。第二階段從檔案中讀取資料(使用Reader),產生樣本,而且把樣本放在一個樣本佇列中。根據你的設定,實際上也可以拷貝第二階段的樣本,使得他們相互獨立,這樣就可以從多個檔案中並行讀取。在第二階段的最後是一個排隊操作,就是入隊到佇列中去,在下一階段出隊。因為我們是要開始執行這些入隊操作的執行緒,所以我們的訓練迴圈會使得樣本佇列中的樣本不斷地出隊。

tf.train中要建立這些佇列和執行入隊操作,就要新增tf.train.QueueRunner

到一個使用tf.train.add_queue_runner函式的資料流圖中。每個QueueRunner負責一個階段,處理那些需要線上程中執行的入隊操作的列表。一旦資料流圖構造成功,tf.train.start_queue_runners函式就會要求資料流圖中每個QueueRunner去開始它的執行緒執行入隊操作。

如果一切順利的話,你現在可以執行你的訓練步驟,同時佇列也會被後臺執行緒來填充。如果您設定了最大訓練迭代數,在某些時候,樣本出隊的操作可能會得到一個tf.OutOfRangeError的錯誤。這其實是TensorFlow的“檔案結束”(EOF) ———— 這就意味著已經達到了最大訓練迭代數,已經沒有更多可用的樣本了。

最後一個因素是Coordinator。這是負責在收到任何關閉訊號的時候,讓所有的執行緒都知道。最常用的是在發生異常時這種情況就會呈現出來,比如說其中一個執行緒在執行某些操作時出現錯誤(或一個普通的Python異常)。

想要了解更多的關於threading, queues, QueueRunners, and Coordinators的內容可以看這裡.

疑問: 在達到最大訓練迭代數的時候如何清理關閉執行緒?

想象一下,你有一個模型並且設定了最大訓練迭代數。這意味著,生成檔案的那個執行緒將只會在產生OutOfRange錯誤之前執行許多次。該QueueRunner會捕獲該錯誤,並且關閉檔名的佇列,最後退出執行緒。關閉佇列做了兩件事情:

  • 如果還試著對檔名佇列執行入隊操作時將發生錯誤。任何執行緒不應該嘗試去這樣做,但是當佇列因為其他錯誤而關閉時,這就會有用了。
  • 任何當前或將來出隊操作要麼成功(如果佇列中還有足夠的元素)或立即失敗(發生OutOfRange錯誤)。它們不會防止等待更多的元素被新增到佇列中,因為上面的一點已經保證了這種情況不會發生。

關鍵是,當在檔名佇列被關閉時候,有可能還有許多檔名在該佇列中,這樣下一階段的流水線(包括reader和其它預處理)還可以繼續執行一段時間。 一旦檔名佇列空了之後,如果後面的流水線還要嘗試從檔名佇列中取出一個檔名(例如,從一個已經處理完檔案的reader中),這將會觸發OutOfRange錯誤。在這種情況下,即使你可能有一個QueueRunner關聯著多個執行緒。如果這不是在QueueRunner中的最後那個執行緒,OutOfRange錯誤僅僅只會使得一個執行緒退出。這使得其他那些正處理自己的最後一個檔案的執行緒繼續執行,直至他們完成為止。 (但如果假設你使用的是tf.train.Coordinator,其他型別的錯誤將導致所有執行緒停止)。一旦所有的reader執行緒觸發OutOfRange錯誤,然後才是下一個佇列,再是樣本佇列被關閉。

同樣,樣本佇列中會有一些已經入隊的元素,所以樣本訓練將一直持續直到樣本佇列中再沒有樣本為止。如果樣本佇列是一個RandomShuffleQueue,因為你使用了shuffle_batch 或者shuffle_batch_join,所以通常不會出現以往那種佇列中的元素會比min_after_dequeue 定義的更少的情況。 然而,一旦該佇列被關閉,min_after_dequeue設定的限定值將失效,最終佇列將為空。在這一點來說,當實際訓練執行緒嘗試從樣本佇列中取出資料時,將會觸發OutOfRange錯誤,然後訓練執行緒會退出。一旦所有的培訓執行緒完成,tf.train.Coordinator.join會返回,你就可以正常退出了。

篩選記錄或產生每個記錄的多個樣本

舉個例子,有形式為[x, y, z]的樣本,我們可以生成一批形式為[batch, x, y, z]的樣本。 如果你想濾除這個記錄(或許不需要這樣的設定),那麼可以設定batch的大小為0;但如果你需要每個記錄產生多個樣本,那麼batch的值可以大於1。 然後很簡單,只需呼叫批處理函式(比如:shuffle_batch or shuffle_batch_join)去設定enqueue_many=True就可以實現。