TensorFlow中佇列與執行緒
1. FIFOQueue : 先入先出的佇列
import tensorflow as tf # 在使用迴圈神經網路時,希望讀入的訓練樣本是有序的可使用FIFOQueue # 先建立一個先入先出的佇列,初始化佇列插入0.1,0.2,0.3三個數字 q = tf.FIFOQueue(3,tf.float32) init = q.enqueue_many(([0.1,0.2,0.3],)) # 此時資料填充並沒有完成,而是做出了一個預備工作,真正的工作要在會話中完成。 # 定義出隊,+1,入隊操作 x = q.dequeue() y = x+1 q_add = q.enqueue(y) with tf.Session() as sess: sess.run(init) quelen = sess.run(q.size()) for i in range(quelen): sess.run(q_add) # 執行兩次操作,佇列中的值變為0.3,1.1,1.2 for j in range(quelen): print(sess.run(q.dequeue())) # 輸出佇列的值
可以發現,佇列的操作是在主執行緒的對話中依次完成。
2. RandomShuffleQueue:隨機佇列
# 隨機佇列,在出佇列時以隨機的順序彈出元素 # 例如,我們在訓練一些影象樣本時,使用CNN的網路結構,希望可以無序的讀入訓練樣本 q = tf.RandomShuffleQueue(capacity=10, min_after_dequeue=2, dtypes=tf.float32) # capacity:佇列最大長度, min_after_dequeue:出隊後最小長度 # 官網推薦:capacity > min_after_dequeue + (num_threads + a smaller safety margin) * batch_size with tf.Session() as sess : for i in range(0,10): # 10次入隊 sess.run(q.enqueue(i)) for i in range(0,8): # 8次出隊 print(sess.run(q.dequeue())) # 在佇列長度等於最小值時,執行出隊操作,會發生阻斷 # 在佇列長度等於最大值時,執行入隊操作,會發生阻斷 # 解除阻斷的一種方法---設定等待時間 # run_options = tf.RunOptions(time_out_in_ms = 100000) # 等待十秒 # try: # sess.run(q.dequeue(),options=run_options) # except tf.errors.DeadlineExceededError: # print('out of range')
3. 佇列管理器
Tensorflow提供了QueueRunner函式用以解決非同步操作問題。其可建立一系列的執行緒同時進入主執行緒內進行操作,資料的讀取與操作是同步,即主執行緒在進行訓練模型的工作的同時將資料從硬碟讀入。
q = tf.FIFOQueue(1000, tf.float32) counter = tf.Variable(0.0) # 計數器 add_op = tf.assign_add(counter, tf.constant(1.0)) # 操作給計數器加一 enquenceData_op = q.enqueue(counter) # 操作: 讓計數器加入佇列 # 建立一個佇列管理器QueueRunner,用這兩個操作向佇列q中新增元素,目前我們只使用一個執行緒 qr = tf.train.QueueRunner(q, enqueue_ops=[add_op, enquenceData_op] * 1) # 啟動一個會話,從佇列管理器qr中建立執行緒 with tf.Session() as sess: sess.run(tf.global_variables_initializer()) enquence_threads = qr.create_threads(sess, start=True) # 啟用入隊執行緒 # 主執行緒 for i in range(10): print(sess.run(q.dequeue()))
WTF???報錯了???讀者可以自行思考一會再看下文解釋!!!
正如TensorFlow中的其他元件一樣,佇列就是TensorFlow圖中一種有狀態的節點,像變數一樣,其他節點可以修改其內容。在TensorFlow中,佇列不僅僅是一種資料結構,更是“非同步張量取值”的一個重要機制。比如多個執行緒可以同時向一個佇列中寫元素,或者同時讀取一個佇列中的元素。TensorFlow提供了兩個類來幫助多執行緒的實現:tf.Coordinator與tf.QueueRunner。從設計上這兩個類必須被一起使用。
看到這裡,你應該知道報錯的原因了!但為什麼呢?QueueRunner建立的執行緒沒有得到很好的協調,所以必須與Coordinator一起使用。
4. 執行緒協調器
Coordinator類用來幫助多個執行緒協同工作,多個執行緒同步終止。 其主要方法有: should_stop():如果執行緒應該停止則返回True。 request_stop(<exception>):請求該執行緒停止。 join(<list of threads>):等待被指定的執行緒終止。 大概過程:首先建立一個 Coordinator 物件,然後建立一些使用Coordinator物件的執行緒。這些執行緒通常一直迴圈執行,一直到should_stop()返回True時停止。 任何執行緒都可以決定計算什麼時候應該停止,只需要呼叫request_stop()即可,同時其他執行緒的should_stop()將會返回True,最終停止。
import tensorflow as tf
q = tf.FIFOQueue(1000, tf.float32)
counter = tf.Variable(0.0) # 計數器
add_op = tf.assign_add(counter, tf.constant(1.0)) # 操作給計數器加一
enquenceData_op = q.enqueue(counter) # 操作: 讓計數器加入佇列
# 第一種情況,在關閉其他執行緒之後(除主執行緒之外的其它執行緒),調用出隊操作
print('第一種情況,在關閉其他執行緒之後(除主執行緒之外的其它執行緒),調用出隊操作')
# 建立一個佇列管理器QueueRunner,用這兩個操作向佇列q中新增元素,目前我們只使用一個執行緒
qr = tf.train.QueueRunner(q, enqueue_ops=[add_op, enquenceData_op] * 1)
# 主執行緒
sess = tf.Session()
sess.run(tf.global_variables_initializer())
# Coordinator: 協調器, 協調執行緒間的關係,可以視為一種訊號量,用來做同步
coord = tf.train.Coordinator()
# 啟動入隊執行緒,協調器是執行緒的引數
enqueue_threads = qr.create_threads(sess,coord=coord,start=True)
# 主執行緒
for i in range(0,10):
print(sess.run(q.dequeue()))
coord.request_stop() # 通知其他執行緒關閉
coord.join(enqueue_threads) # join 操作等待其他執行緒結束,其他所有執行緒關閉之後,這一函式才能返回
# 第二種情況: 在佇列執行緒關閉之後,調用出隊操作
print('第二種情況: 在佇列執行緒關閉之後,調用出隊操作-->處理tf.errors.OutOfRange錯誤')
# q啟動入隊執行緒
enqueueData_threads = qr.create_threads(sess,coord=coord,start=True)
coord.request_stop() # 通知其他執行緒關閉
# 主執行緒
for j in range(0,10):
try:
print(sess.run(q.dequeue()))
except tf.errors.OutOfRangeError:
break
coord.join(enqueueData_threads) # join 操作等待其他執行緒結束,其他所有執行緒關閉之後,這一函式才能返回