1. 程式人生 > >tensorflow入門學習(2)——tensorflow資料讀取&多執行緒

tensorflow入門學習(2)——tensorflow資料讀取&多執行緒

一、供給資料

TensorFlow的資料供給機制允許你在TensorFlow運算圖中將資料注入到任一張量中。因此,python運算可以把資料直接設定到TensorFlow圖中。
通過給run()或者eval()函式輸入feed_dict引數, 可以啟動運算過程。

with tf.Session():
    input = tf.placeholder(tf.float32)
    classifier = ...
    print classifier.eval(feed_dict={input: my_python_preprocessing_fn()})

二、從檔案讀取資料

首先要知道你要讀取的檔案的格式,選擇對應的檔案讀取器;

CSV 檔案

從CSV檔案中讀取資料, 需要使用TextLineReader和decode_csv 操作, 如下面的例子所示:

1. tf.train.string_input_producer()

選擇要讀取的檔案的名字形成list,用 tf.train.string_input_producer 函式來生成檔名佇列,
這個函式可以設定shuffle = Ture,來打亂佇列,可以設定epoch = 5,過5遍訓練資料

2. tf.TextFileReader(), tf.decode_csv()

選擇的檔案讀取器,解碼器,讀取檔名佇列並解碼
CSV 那樣的文字檔案,用的檔案讀取器和解碼器就是 TextLineReader 和 decode_csv

二進位制檔案

圖片檔案

三、Tensorflow多執行緒

TensorFlow的Session物件是支援多執行緒的,可以在同一個會話(Session)中建立多個執行緒,並行執行。在Session中的所有執行緒都必須能被同步終止,異常必須能被正確捕獲並報告,會話終止的時候, 佇列必須能被正確地關閉。

TensorFlow提供了兩個類來實現對Session中多執行緒的管理:tf.Coordinator和 tf.QueueRunner,這兩個類往往一起使用。

3. tf.train.Coordinator(): tensorflow協調器

Coordinator類用來管理在Session中的多個執行緒,可以用來同時停止多個工作執行緒並且向那個在等待所有工作執行緒終止的程式報告異常,該執行緒捕獲到這個異常之後就會終止所有執行緒。使用 tf.train.Coordinator()來建立一個執行緒管理器(協調器)物件。

4. tf.train.start_queue_runners()

QueueRunner類用來啟動tensor的入隊執行緒,可以用來啟動多個工作執行緒同時將多個tensor(訓練資料)推送入檔名稱佇列中,具體執行函式是 tf.train.start_queue_runners , 只有呼叫 tf.train.start_queue_runners 之後,才會真正把tensor推入記憶體序列中,供計算單元呼叫,否則會由於記憶體序列為空,資料流圖會處於一直等待狀態。

tf中的資料讀取機制如下圖:

1.呼叫 tf.train.slice_input_producer,從 本地檔案裡抽取tensor,準備放入Filename Queue(檔名佇列)中;
2.呼叫 tf.train.batch,從檔名佇列中提取tensor,使用單個或多個執行緒,準備放入檔案佇列;
3.呼叫 tf.train.Coordinator() 來建立一個執行緒協調器,用來管理之後在Session中啟動的所有執行緒;
4.呼叫tf.train.start_queue_runners, 啟動入隊執行緒,由多個或單個執行緒,按照設定規則,把檔案讀入Filename Queue中。函式返回執行緒ID的列表,一般情況下,系統有多少個核,就會啟動多少個入隊執行緒(入隊具體使用多少個執行緒在tf.train.batch中定義);
5.檔案從 Filename Queue中讀入記憶體佇列的操作不用手動執行,由tf自動完成;
6.呼叫sess.run 來啟動資料出列和執行計算;
7.使用coord.should_stop()來查詢是否應該終止所有執行緒,當檔案佇列(queue)中的所有檔案都已經讀取出列的時候,會丟擲一個 OutofRangeError 的異常,這時候就應該停止Sesson中的所有執行緒了;
8.使用coord.request_stop()來發出終止所有執行緒的命令,使用coord.join(threads)把執行緒加入主執行緒,等待threads結束。

示例:

這裡寫圖片描述
這裡寫圖片描述

5. tf.cast() 型別轉化函式

tf.cast(x, dtype, name=None)
輸入:
x:輸入
dtype:轉換目標型別
name:名稱

返回:Tensor

6. tf.train.slice_input_producer()

tf.train.slice_input_producer是一個tensor生成器,作用是按照設定,每次從一個tensor列表中按順序或者隨機抽取出一個tensor放入檔名佇列。

    slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None,  
                             capacity=32, shared_name=None, name=None)  
第一個引數tensor_list:包含一系列tensor的列表,表中tensor的第一維度的值必須相等,即個數必須相等,有多少個影象,就應該有多少個對應的標籤。
第二個引數num_epochs: 可選引數,是一個整數值,代表迭代的次數,如果設定 num_epochs=None,生成器可以無限次遍歷tensor列表,如果設定為 num_epochs=N,生成器只能遍歷tensor列表N次。
第三個引數shuffle: bool型別,設定是否打亂樣本的順序。一般情況下,如果shuffle=True,生成的樣本順序就被打亂了,在批處理的時候不需要再次打亂樣本,使用 tf.train.batch函式就可以了;如果shuffle=False,就需要在批處理時候使用 tf.train.shuffle_batch函式打亂樣本。
第四個引數seed: 可選的整數,是生成隨機數的種子,在第三個引數設定為shuffle=True的情況下才有用。
第五個引數capacity:設定tensor列表的容量。
第六個引數shared_name:可選引數,如果設定一個‘shared_name’,則在不同的上下文環境(Session)中可以通過這個名字共享生成的tensor。
第七個引數name:可選,設定操作的名稱。

tf.train.slice_input_producer 函式中設定了 num_epochs 的數量, 所以在檔案佇列末尾有結束標誌,讀到這個結束標誌的時候丟擲 OutofRangeError 異常,就可以結束各個執行緒了。
按照0,1,2,3,4,0,1,2,3…
如果不設定 num_epochs 的數量,則檔案佇列是無限迴圈的,沒有結束標誌,程式會一直執行下去。

7. tf.train.batch()

tf.train.batch是一個tensor佇列生成器,作用是按照給定的tensor順序,把batch_size個tensor推送到檔案佇列,作為訓練一個batch的資料,等待tensor出隊執行計算。

    batch(tensors, batch_size, num_threads=1, capacity=32,  
              enqueue_many=False, shapes=None, dynamic_pad=False,  
              allow_smaller_final_batch=False, shared_name=None, name=None)  
第一個引數tensors:tensor序列或tensor字典,可以是含有單個樣本的序列;
第二個引數batch_size: 生成的batch的大小;
第三個引數num_threads:執行tensor入隊操作的執行緒數量,可以設定使用多個執行緒同時並行執行,提高執行效率,但也不是數量越多越好;
第四個引數capacity: 定義生成的tensor序列的最大容量;
第五個引數enqueue_many: 定義第一個傳入引數tensors是多個tensor組成的序列,還是單個tensor;
第六個引數shapes: 可選引數,預設是推測出的傳入的tensor的形狀;
第七個引數dynamic_pad: 定義是否允許輸入的tensors具有不同的形狀,設定為True,會把輸入的具有不同形狀的tensor歸一化到相同的形狀;
第八個引數allow_smaller_final_batch: 設定為True,表示在tensor佇列中剩下的tensor數量不夠一個batch_size的情況下,允許最後一個batch的數量少於batch_size, 設定為False,則不管什麼情況下,生成的batch都擁有batch_size個樣本;
第九個引數shared_name: 可選引數,設定生成的tensor序列在不同的Session中的共享名稱;
第十個引數name: 操作的名稱;

如果tf.train.batch的第一個引數 tensors 傳入的是tensor列表或者字典,返回的是tensor列表或字典,如果傳入的是隻含有一個元素的列表,返回的是單個的tensor,而不是一個列表。

8. tf.Session().run()

tf.session.run()單函式執行和多函式執行區別

sess.run([a,b]) # (1)同時執行a,b兩個函式

sess.run(a)
sess.run(b)     # (2)執行完a函式後再執行b函式

這兩個語句初看時沒有任何區別,但是如果a,b函式恰好是讀取example_batch和label_batch這種需要使用到 資料批次輸入輸出函式時 例如(tf.train.shuffle_batch.tf.reader.read).

(1)式只會呼叫一次輸入資料函式,則得到的example_batch和label_batch來自同一批次。
(2)式會單獨呼叫兩次輸入資料函式,則得到的example_batch來自上一批次而label_batch來自下一批次。

這個需要十分注意,因為如果我們想要實時打印出label_batch和inference(example_batch)時,即將輸入資料的標籤和經過模型預測推斷的結果進行比較時.如果我們使用(2)中的寫法,則label_batch和inference(example_batch)並不是來自與同一批次資料。

所以tf.train.batch() 得到的 image_batch 和 label_batch 要一起放入sess.run()中

PS: 一般在session結束後,最好要加一個sess.close()不然很有可能忘記關掉,一直執行,佔用資源
可能會報錯:Tensorflow+InternalError: Blas GEMM launch failed
因為session用畢沒有及時close, 導致系統和GPU的很大部分被佔用過卻沒有歸還, 當前資源便不夠了; 或是多個session爭用GPU.