1. 程式人生 > >TensorFlow使用QueueRunner和Coordinator來管理多執行緒佇列操作

TensorFlow使用QueueRunner和Coordinator來管理多執行緒佇列操作

TensorFlow中,佇列不僅是一種資料結構,還是非同步計算張量取值的一個重要機制。

TensorFlow提供了tf.Coordinatortf.QueueRunner兩個類來完成多執行緒協同的功能。以下程式碼用來簡單的展示兩者一起使用的示例。



import tensorflow as tf

# 先申明佇列
queue = tf.FIFOQueue(100, "float")
# 定義佇列的入隊操作
enqueue_op = queue.enqueue([tf.random_normal([1])])

# 建立多個執行緒來執行入隊操作
# def __init__(self, queue=None, enqueue_ops=None, close_op=None,
#                cancel_op=None, queue_closed_exception_types=None,
#                queue_runner_def=None, import_scope=None):
# 表示建立了5個執行緒,每個執行緒中執行的是enqueue_op操作
qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)

# 加入到TensorFlow的計算圖中  使用TensorFlow的預設計算圖
# def add_queue_runner(qr, collection=ops.GraphKeys.QUEUE_RUNNERS):
tf.train.add_queue_runner(qr)
# 定義出隊操作
out_tensor = queue.dequeue()

with tf.Session() as sess:
    # 使用tf.train.Coordinator來協同啟動執行緒
    coord = tf.train.Coordinator()
    # 使用tf.trainQueueRunner 時,需要明確呼叫tf.train.start_queue_runners來啟動所有執行緒。否則會因為沒有執行緒執行入隊操作。
    # 當調用出隊操作時,程式會一直等待入隊操作被執行。(這裡是理解的重點)
    # tf.train.start_queue_runners 函式會預設啟動 tf.GraphKeys.QUEUE_RUNNERS集合總所有的QueueRunner。
    # 該函式只支援啟動指定集合中的QueueRunner,所以在使用tf.train.add_queue_runner()和tf.train.start_queue_runners
    # 時會指定同一個集合。
    # #def start_queue_runners(sess=None, coord=None, daemon=True, start=True,
    #                         collection=ops.GraphKeys.QUEUE_RUNNERS):
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # 獲取佇列中的取值
    for _ in range(4):
        print(sess.run(out_tensor)[0])

    # 使用tf.train.Coordinator來停止所有執行緒
    coord.request_stop()
    coord.join()

上述程式將啟動五個執行緒來執行佇列入隊的操作,其中每一個執行緒都將是隨機數寫入佇列。
在每一次執行出隊操作時,可以得到一個隨機數。以下是其中一次執行得到的結果:
1.2411593
0.45116103
0.36767596
-0.15366153