tensorflow 佇列管理之非同步操作
阿新 • • 發佈:2018-11-07
# -*- coding: utf-8 -*- """ Created on Thu Oct 25 09:49:34 2018 @author: Grey """ import tensorflow as tf #模擬非同步子執行緒存入樣本,主執行緒讀取樣本 #1.定義一個佇列,100 Q = tf.FIFOQueue(100,tf.float32) #2.定義子執行緒做的操作,迴圈+1 放入佇列 var = tf.Variable(0.0)#變數op #實現自增tf.assign_add() data=tf.assign_add(var,tf.constant(1.0)) en_q=Q.enqueue(data) #3.定義佇列管理器op,指定子執行緒的數量和操作 qr=tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2) #定義會話,執行op with tf.Session() as sess: #初始化var變數op tf.global_variables_initializer().run() # 正式開啟子執行緒 threads=qr.create_threads(sess,start=True) # 主執行緒,不斷讀取訓練資料 for i in range(300): print(sess.run(Q.dequeue())) pass
輸出正確,但是還是會報錯
CancelledError (see above for traceback): Enqueue operation was cancelled
[[Node: fifo_queue_57_enqueue = QueueEnqueue[Tcomponents=[DT_FLOAT], _class=["loc:@fifo_queue_57"], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](fifo_queue_57, AssignAdd_31)]]
原因:主執行緒結束,意味著Session結束,資源釋放,但是子執行緒還在執行,所以出錯。
解決方案:執行緒協調器關閉子執行緒
非同步執行的關鍵是 建立子執行緒並及進行關閉
如下圖為新增關閉子執行緒的非同步執行的框架:
程式碼:
# -*- coding: utf-8 -*- """ Created on Thu Oct 25 09:49:34 2018 @author: Grey """ import tensorflow as tf Q = tf.FIFOQueue(1000,dtypes=tf.float32) var = tf.Variable(0.0) data=tf.assign_add(var,tf.constant(1.0)) en_q=Q.enqueue(data) qr=tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2) with tf.Session() as sess: tf.global_variables_initializer().run() coord=tf.train.Coordinator() threads=qr.create_threads(sess,coord=coord,start=True) # 主執行緒,不斷讀取訓練資料 for i in range(100): print(sess.run(Q.dequeue())) # 回收子執行緒 coord.request_stop() coord.join(threads) pass
輸出結果正確且無任何報錯: