1. 程式人生 > >tensorflow 佇列管理之非同步操作

tensorflow 佇列管理之非同步操作

# -*- coding: utf-8 -*-
"""
Created on Thu Oct 25 09:49:34 2018

@author: Grey
"""
import tensorflow as tf
#模擬非同步子執行緒存入樣本,主執行緒讀取樣本
#1.定義一個佇列,100
Q = tf.FIFOQueue(100,tf.float32)

#2.定義子執行緒做的操作,迴圈+1 放入佇列
var = tf.Variable(0.0)#變數op
#實現自增tf.assign_add()
data=tf.assign_add(var,tf.constant(1.0))
en_q=Q.enqueue(data)
#3.定義佇列管理器op,指定子執行緒的數量和操作
qr=tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2)



#定義會話,執行op
with tf.Session() as sess:
#初始化var變數op
    tf.global_variables_initializer().run()
#    正式開啟子執行緒
    threads=qr.create_threads(sess,start=True)
#    主執行緒,不斷讀取訓練資料
    for i in range(300):
        print(sess.run(Q.dequeue()))
    pass

輸出正確,但是還是會報錯
CancelledError (see above for traceback): Enqueue operation was cancelled
         [[Node: fifo_queue_57_enqueue = QueueEnqueue[Tcomponents=[DT_FLOAT], _class=["loc:@fifo_queue_57"], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](fifo_queue_57, AssignAdd_31)]]

原因:主執行緒結束,意味著Session結束,資源釋放,但是子執行緒還在執行,所以出錯。

解決方案:執行緒協調器關閉子執行緒

非同步執行的關鍵是 建立子執行緒並及進行關閉

如下圖為新增關閉子執行緒的非同步執行的框架:

程式碼:

# -*- coding: utf-8 -*-
"""
Created on Thu Oct 25 09:49:34 2018

@author: Grey
"""
import tensorflow as tf

Q = tf.FIFOQueue(1000,dtypes=tf.float32)

var = tf.Variable(0.0)
data=tf.assign_add(var,tf.constant(1.0))
en_q=Q.enqueue(data)

qr=tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2)

with tf.Session() as sess:

    tf.global_variables_initializer().run()

    coord=tf.train.Coordinator() 
    threads=qr.create_threads(sess,coord=coord,start=True)

#    主執行緒,不斷讀取訓練資料
    for i in range(100):
        print(sess.run(Q.dequeue()))
        
#        回收子執行緒
    coord.request_stop()
    coord.join(threads)
    pass

輸出結果正確且無任何報錯: