1. 程式人生 > >TensorFlow中佇列與執行緒

TensorFlow中佇列與執行緒

1. FIFOQueue : 先入先出的佇列

import tensorflow as tf

# 在使用迴圈神經網路時,希望讀入的訓練樣本是有序的可使用FIFOQueue
# 先建立一個先入先出的佇列,初始化佇列插入0.1,0.2,0.3三個數字
q = tf.FIFOQueue(3,tf.float32)
init = q.enqueue_many(([0.1,0.2,0.3],))    # 此時資料填充並沒有完成,而是做出了一個預備工作,真正的工作要在會話中完成。
# 定義出隊,+1,入隊操作
x = q.dequeue()
y = x+1
q_add = q.enqueue(y)

with tf.Session() as sess:
    sess.run(init)
    quelen = sess.run(q.size())
    for i in range(quelen):
        sess.run(q_add)   # 執行兩次操作,佇列中的值變為0.3,1.1,1.2

    for j in range(quelen):
        print(sess.run(q.dequeue()))      # 輸出佇列的值

可以發現,佇列的操作是在主執行緒的對話中依次完成。

2. RandomShuffleQueue:隨機佇列

# 隨機佇列,在出佇列時以隨機的順序彈出元素
# 例如,我們在訓練一些影象樣本時,使用CNN的網路結構,希望可以無序的讀入訓練樣本
q = tf.RandomShuffleQueue(capacity=10, min_after_dequeue=2, dtypes=tf.float32)
# capacity:佇列最大長度,  min_after_dequeue:出隊後最小長度
# 官網推薦:capacity > min_after_dequeue + (num_threads + a smaller safety margin) * batch_size
with tf.Session() as sess :
    for i in range(0,10):    # 10次入隊
        sess.run(q.enqueue(i))

    for i in range(0,8):     # 8次出隊
        print(sess.run(q.dequeue()))

# 在佇列長度等於最小值時,執行出隊操作,會發生阻斷
# 在佇列長度等於最大值時,執行入隊操作,會發生阻斷

# 解除阻斷的一種方法---設定等待時間
# run_options = tf.RunOptions(time_out_in_ms = 100000) # 等待十秒
# try:
#     sess.run(q.dequeue(),options=run_options)
# except tf.errors.DeadlineExceededError:
#     print('out of range')

3. 佇列管理器

Tensorflow提供了QueueRunner函式用以解決非同步操作問題。其可建立一系列的執行緒同時進入主執行緒內進行操作,資料的讀取與操作是同步,即主執行緒在進行訓練模型的工作的同時將資料從硬碟讀入

q = tf.FIFOQueue(1000, tf.float32)
counter = tf.Variable(0.0)  # 計數器
add_op = tf.assign_add(counter, tf.constant(1.0))  # 操作給計數器加一
enquenceData_op = q.enqueue(counter)  # 操作: 讓計數器加入佇列

# 建立一個佇列管理器QueueRunner,用這兩個操作向佇列q中新增元素,目前我們只使用一個執行緒
qr = tf.train.QueueRunner(q, enqueue_ops=[add_op, enquenceData_op] * 1)

# 啟動一個會話,從佇列管理器qr中建立執行緒
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    enquence_threads = qr.create_threads(sess, start=True)  # 啟用入隊執行緒
    # 主執行緒
    for i in range(10):
        print(sess.run(q.dequeue()))

WTF???報錯了???讀者可以自行思考一會再看下文解釋!!!

正如TensorFlow中的其他元件一樣,佇列就是TensorFlow圖中一種有狀態的節點,像變數一樣,其他節點可以修改其內容。在TensorFlow中,佇列不僅僅是一種資料結構,更是“非同步張量取值”的一個重要機制。比如多個執行緒可以同時向一個佇列中寫元素,或者同時讀取一個佇列中的元素。TensorFlow提供了兩個類來幫助多執行緒的實現:tf.Coordinator與tf.QueueRunner。從設計上這兩個類必須被一起使用

看到這裡,你應該知道報錯的原因了!但為什麼呢?QueueRunner建立的執行緒沒有得到很好的協調,所以必須與Coordinator一起使用。

4. 執行緒協調器

Coordinator類用來幫助多個執行緒協同工作,多個執行緒同步終止。 其主要方法有:     should_stop():如果執行緒應該停止則返回True。     request_stop(<exception>):請求該執行緒停止。     join(<list of threads>):等待被指定的執行緒終止。 大概過程:首先建立一個 Coordinator 物件,然後建立一些使用Coordinator物件的執行緒。這些執行緒通常一直迴圈執行,一直到should_stop()返回True時停止。 任何執行緒都可以決定計算什麼時候應該停止,只需要呼叫request_stop()即可,同時其他執行緒的should_stop()將會返回True,最終停止。

import tensorflow as tf

q = tf.FIFOQueue(1000, tf.float32)
counter = tf.Variable(0.0)  # 計數器
add_op = tf.assign_add(counter, tf.constant(1.0))  # 操作給計數器加一
enquenceData_op = q.enqueue(counter)  # 操作: 讓計數器加入佇列

# 第一種情況,在關閉其他執行緒之後(除主執行緒之外的其它執行緒),調用出隊操作
print('第一種情況,在關閉其他執行緒之後(除主執行緒之外的其它執行緒),調用出隊操作')
# 建立一個佇列管理器QueueRunner,用這兩個操作向佇列q中新增元素,目前我們只使用一個執行緒
qr = tf.train.QueueRunner(q, enqueue_ops=[add_op, enquenceData_op] * 1)


# 主執行緒
sess = tf.Session()
sess.run(tf.global_variables_initializer())

# Coordinator: 協調器, 協調執行緒間的關係,可以視為一種訊號量,用來做同步
coord = tf.train.Coordinator()

# 啟動入隊執行緒,協調器是執行緒的引數
enqueue_threads = qr.create_threads(sess,coord=coord,start=True)

# 主執行緒
for i in range(0,10):
    print(sess.run(q.dequeue()))

coord.request_stop() # 通知其他執行緒關閉
coord.join(enqueue_threads) # join 操作等待其他執行緒結束,其他所有執行緒關閉之後,這一函式才能返回


# 第二種情況: 在佇列執行緒關閉之後,調用出隊操作
print('第二種情況: 在佇列執行緒關閉之後,調用出隊操作-->處理tf.errors.OutOfRange錯誤')
# q啟動入隊執行緒
enqueueData_threads = qr.create_threads(sess,coord=coord,start=True)

coord.request_stop()           # 通知其他執行緒關閉

# 主執行緒
for j in range(0,10):
    try:
        print(sess.run(q.dequeue()))
    except tf.errors.OutOfRangeError:
        break

coord.join(enqueueData_threads)     # join 操作等待其他執行緒結束,其他所有執行緒關閉之後,這一函式才能返回