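Why a TensorFlow program hangs: it raises no error but stops making progress
I. Overview: While using TensorFlow, I ran into a program that raised no error yet never continued executing. After some analysis, the cause turned out to be that TensorFlow's data-reading thread had never been started, so the dataflow graph could not be evaluated and the whole program was stuck at that point.
The deeper reason is that computation and data loading are asynchronous in TensorFlow. The sensible arrangement is for the main thread to train the model while a separate data-reading thread loads data asynchronously. TensorFlow maintains a queue in memory, and the data thread asynchronously pushes samples from disk into that queue. Because training and reading are asynchronous, TensorFlow cannot raise an error even when no data has arrived yet, since data might still be enqueued later. So TensorFlow simply keeps waiting.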
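The waiting behaviour described above can be reproduced without TensorFlow. The following is a minimal sketch using only the Python 3 standard library; `consume` and `produce` are hypothetical helper names, and the timeout exists only so the demonstration terminates. A dequeue with no timeout would block forever in the first case, which is the hang.

```python
import queue
import threading


def consume(q, n, timeout):
    """Take n items from q; return None if q stays empty for `timeout` seconds."""
    items = []
    for _ in range(n):
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            # Without the timeout, q.get() would block here indefinitely.
            return None
    return items


def produce(q, n):
    """Push the integers 0..n-1 into q."""
    for i in range(n):
        q.put(i)


# Case 1: no producer thread was started, so the consumer finds nothing.
empty_q = queue.Queue(maxsize=4)
stalled = consume(empty_q, 3, timeout=0.2)

# Case 2: a producer thread fills the queue asynchronously.
fed_q = queue.Queue(maxsize=4)
producer = threading.Thread(target=produce, args=(fed_q, 3))
producer.daemon = True
producer.start()
received = consume(fed_q, 3, timeout=2.0)
producer.join()
```

An empty queue is not an error from the consumer's point of view, because a producer could still deliver data later. That is why the program waits instead of failing.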
Note: I ran into this problem while modifying ptb_word_lm.py from the TensorFlow source. The explanation below is based on that source code:
TensorFlow's reader.py file:
# -*- coding: utf-8 -*-
"""Utilities for parsing PTB text files."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import os

import tensorflow as tf


# Collect all the words in the file.
def _read_words(filename):
  with tf.gfile.GFile(filename, "r") as f:
    return f.read().decode("utf-8").replace("\n", "<eos>").split()


# Map the collected words to ids.
def _build_vocab(filename):
  data = _read_words(filename)

  counter = collections.Counter(data)
  count_pairs = sorted(counter.items(), key=lambda x: (-x[1], x[0]))

  words, _ = list(zip(*count_pairs))
  word_to_id = dict(zip(words, range(len(words))))

  return word_to_id


# Convert a file to word ids using the vocabulary built from the training set.
def _file_to_word_ids(filename, word_to_id):
  data = _read_words(filename)
  return [word_to_id[word] for word in data if word in word_to_id]


def ptb_raw_data(data_path=None):
  """Load PTB raw data from data directory "data_path".

  Reads PTB text files, converts strings to integer ids,
  and performs mini-batching of the inputs.

  The PTB dataset comes from Tomas Mikolov's webpage:
  http://www.fit.vutbr.cz/~imikolov/rnnlm/simple-examples.tgz

  Args:
    data_path: string path to the directory where simple-examples.tgz has
      been extracted.

  Returns:
    tuple (train_data, valid_data, test_data, vocabulary)
    where each of the data objects can be passed to PTBIterator.
  """
  train_path = os.path.join(data_path, "ptb.train.txt")
  valid_path = os.path.join(data_path, "ptb.valid.txt")
  test_path = os.path.join(data_path, "ptb.test.txt")

  word_to_id = _build_vocab(train_path)
  train_data = _file_to_word_ids(train_path, word_to_id)
  valid_data = _file_to_word_ids(valid_path, word_to_id)
  test_data = _file_to_word_ids(test_path, word_to_id)
  vocabulary = len(word_to_id)
  return train_data, valid_data, test_data, vocabulary


def ptb_producer(raw_data, batch_size, num_steps, name=None):
  """Iterate on the raw PTB data.

  This chunks up raw_data into batches of examples and returns Tensors that
  are drawn from these batches.

  Args:
    raw_data: one of the raw data outputs from ptb_raw_data.
    batch_size: int, the batch size.
    num_steps: int, the number of unrolls.
    name: the name of this operation (optional).

  Returns:
    A pair of Tensors, each shaped [batch_size, num_steps]. The second element
    of the tuple is the same data time-shifted to the right by one.

  Raises:
    tf.errors.InvalidArgumentError: if batch_size or num_steps are too high.
  """
  with tf.name_scope(name, "PTBProducer", [raw_data, batch_size, num_steps]):
    raw_data = tf.convert_to_tensor(raw_data, name="raw_data", dtype=tf.int32)

    data_len = tf.size(raw_data)
    batch_len = data_len // batch_size
    data = tf.reshape(raw_data[0 : batch_size * batch_len],
                      [batch_size, batch_len])

    epoch_size = (batch_len - 1) // num_steps
    assertion = tf.assert_positive(
        epoch_size,
        message="epoch_size == 0, decrease batch_size or num_steps")
    with tf.control_dependencies([assertion]):
      epoch_size = tf.identity(epoch_size, name="epoch_size")

    i = tf.train.range_input_producer(epoch_size, shuffle=False).dequeue()
    x = tf.slice(data, [0, i * num_steps], [batch_size, num_steps])
    y = tf.slice(data, [0, i * num_steps + 1], [batch_size, num_steps])
    return x, y
Note: a walkthrough of this reader.py file:
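1. It creates a queue containing the integers 0 to epoch_size-1 and then defines a dequeue operation, which shows that a queue is also a node in the dataflow graph. Calling range_input_producer automatically creates a QueueRunner. As the documentation puts it: "A QueueRunner for the Queue is added to the current Graph's QUEUE_RUNNER collection."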
i = tf.train.range_input_producer(epoch_size, shuffle=False).dequeue()
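2. It defines the slice operations that return the training samples x and y:
x = tf.slice(data, [0, i * num_steps], [batch_size, num_steps])
y = tf.slice(data, [0, i * num_steps + 1], [batch_size, num_steps])
3. Usage notes: on each iteration we only need to fetch x and y. Doing so triggers the operations that x and y depend on, namely the dequeue and the two slices, which produce the data for us. However, reading data through a queue is always a multi-threaded way of reading, so the thread must be started within the session; otherwise the program hangs.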
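To make the slicing concrete, here is a rough pure-Python sketch of the same arithmetic on plain lists, with no TensorFlow and no queue. `ptb_batches` is a hypothetical name used only for illustration; in the real reader.py the index i comes from the dequeue operation rather than from a for loop.

```python
def ptb_batches(raw_data, batch_size, num_steps):
    """Yield (x, y) pairs sliced the way ptb_producer slices them, using lists."""
    batch_len = len(raw_data) // batch_size
    # Reshape the flat id list into batch_size rows of batch_len ids each.
    data = [raw_data[r * batch_len:(r + 1) * batch_len]
            for r in range(batch_size)]
    # The -1 leaves room for y, which is x shifted right by one position.
    epoch_size = (batch_len - 1) // num_steps
    if epoch_size <= 0:
        raise ValueError("epoch_size == 0, decrease batch_size or num_steps")
    for i in range(epoch_size):
        start = i * num_steps
        x = [row[start:start + num_steps] for row in data]
        y = [row[start + 1:start + 1 + num_steps] for row in data]
        yield x, y


# Toy input: the ids 0..19 split into 2 rows, unrolled 3 steps at a time.
batches = list(ptb_batches(list(range(20)), batch_size=2, num_steps=3))
first_x, first_y = batches[0]
```

With this toy input each row holds 10 ids, so there are (10 - 1) // 3 = 3 batches per epoch, and every y is its x shifted by one position.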
II. The failing case and the corresponding fixes
1. The failing case
# -*- coding: utf-8 -*-
import numpy as np
import tensorflow as tf
from tensorflow.models.rnn.ptb import reader


class PTBInput(object):
    """The input data."""

    def __init__(self, config, data, name=None):
        self.batch_size = batch_size = config.batch_size
        self.num_steps = num_steps = config.num_steps
        # Subtract 1 because the targets are the inputs shifted by one position.
        self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
        self.input_data, self.targets = reader.ptb_producer(
            data, batch_size, num_steps, name=name)


class SmallConfig(object):
    """Small config."""
    init_scale = 0.1
    learning_rate = 1.0
    max_grad_norm = 5
    num_layers = 2
    num_steps = 20
    hidden_size = 200
    max_epoch = 4
    max_max_epoch = 13
    keep_prob = 1.0
    lr_decay = 0.5
    batch_size = 20
    vocab_size = 10000


if __name__ == '__main__':
    config = SmallConfig()
    data_path = '/home/jdlu/jdluTensor/data/simple-examples/data'
    raw_data = reader.ptb_raw_data(data_path)
    train_data, valid_data, test_data, _ = raw_data
    train_input = PTBInput(config=config, data=train_data, name="TrainInput")
    print("end--------------------------------")
    # Wrong: with a plain Session no data can be read. Without data the
    # dataflow graph cannot be evaluated, so the whole program hangs.
    with tf.Session() as sess:
        for step in range(1):
            print(sess.run(train_input.input_data))
Note: the data-reading thread is never started in this Session, so sess.run(train_input.input_data) has no data to fetch and the program hangs.
2. Solution
# -*- coding: utf-8 -*-
import numpy as np
import tensorflow as tf
from tensorflow.models.rnn.ptb import reader


class PTBInput(object):
    """The input data."""

    def __init__(self, config, data, name=None):
        self.batch_size = batch_size = config.batch_size
        self.num_steps = num_steps = config.num_steps
        # Subtract 1 because the targets are the inputs shifted by one position.
        self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
        self.input_data, self.targets = reader.ptb_producer(
            data, batch_size, num_steps, name=name)


class SmallConfig(object):
    """Small config."""
    init_scale = 0.1
    learning_rate = 1.0
    max_grad_norm = 5
    num_layers = 2
    num_steps = 20
    hidden_size = 200
    max_epoch = 4
    max_max_epoch = 13
    keep_prob = 1.0
    lr_decay = 0.5
    batch_size = 20
    vocab_size = 10000


if __name__ == '__main__':
    config = SmallConfig()
    data_path = '/home/jdlu/jdluTensor/data/simple-examples/data'
    raw_data = reader.ptb_raw_data(data_path)
    train_data, valid_data, test_data, _ = raw_data
    train_input = PTBInput(config=config, data=train_data, name="TrainInput")
    print("end--------------------------------")
    # Right: use Supervisor()
    # sv = tf.train.Supervisor()
    # with sv.managed_session() as sess:
    #     for step in range(1):
    #         print(sess.run(train_input.input_data))

    # Right: start the queue runners explicitly.
    # Create a session for running operations in the Graph.
    sess = tf.Session()
    # Start input enqueue threads.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # Run training steps or whatever
    try:
        for step in range(2):
            print(sess.run(train_input.input_data))
    except Exception as e:
        # Report exceptions to the coordinator
        coord.request_stop(e)
    coord.request_stop()
    # Terminate as usual. It is innocuous to request stop twice.
    coord.join(threads)
    sess.close()
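Note: tf.train.range_input_producer(epoch_size, shuffle=False) adds a QueueRunner to the current (default) graph, so we must call tf.train.start_queue_runners(sess=sess) to start that thread. coord = tf.train.Coordinator() is then used to coordinate the threads: it signals them to stop and waits for them to finish.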
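The coordination role can be pictured with a small standard-library analogue. This is only a sketch of the idea, not TensorFlow's implementation: `MiniCoordinator` and `enqueue_loop` are hypothetical names, and the code assumes Python 3. The coordinator is essentially a shared stop flag that the enqueue thread polls, plus a join that waits for the threads to exit.

```python
import queue
import threading


class MiniCoordinator(object):
    """Hypothetical stand-in for a coordinator: a shared stop flag plus join."""

    def __init__(self):
        self._stop_event = threading.Event()
        self.error = None

    def request_stop(self, error=None):
        # Remember the first reported error, then raise the stop flag.
        if error is not None and self.error is None:
            self.error = error
        self._stop_event.set()

    def should_stop(self):
        return self._stop_event.is_set()

    def join(self, threads):
        for t in threads:
            t.join()


def enqueue_loop(q, coord):
    """Keep pushing 0, 1, 2, ... into q until the coordinator says stop."""
    i = 0
    while not coord.should_stop():
        try:
            q.put(i, timeout=0.05)
            i += 1
        except queue.Full:
            pass  # Queue is full; loop around and check the stop flag again.


coord = MiniCoordinator()
q = queue.Queue(maxsize=2)
threads = [threading.Thread(target=enqueue_loop, args=(q, coord))]
for t in threads:
    t.start()

first_two = [q.get(timeout=2.0) for _ in range(2)]
coord.request_stop()
coord.request_stop()  # Requesting stop twice is harmless.
coord.join(threads)
all_stopped = not any(t.is_alive() for t in threads)
```

The put call uses a short timeout so that a thread blocked on a full queue still notices the stop request. Without it, join could wait forever.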
3. Alternative solution, using Supervisor:
# -*- coding: utf-8 -*-
import numpy as np
import tensorflow as tf
from tensorflow.models.rnn.ptb import reader


class PTBInput(object):
    """The input data."""

    def __init__(self, config, data, name=None):
        self.batch_size = batch_size = config.batch_size
        self.num_steps = num_steps = config.num_steps
        # Subtract 1 because the targets are the inputs shifted by one position.
        self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
        self.input_data, self.targets = reader.ptb_producer(
            data, batch_size, num_steps, name=name)


class SmallConfig(object):
    """Small config."""
    init_scale = 0.1
    learning_rate = 1.0
    max_grad_norm = 5
    num_layers = 2
    num_steps = 20
    hidden_size = 200
    max_epoch = 4
    max_max_epoch = 13
    keep_prob = 1.0
    lr_decay = 0.5
    batch_size = 20
    vocab_size = 10000


if __name__ == '__main__':
    config = SmallConfig()
    data_path = '/home/jdlu/jdluTensor/data/simple-examples/data'
    raw_data = reader.ptb_raw_data(data_path)
    train_data, valid_data, test_data, _ = raw_data
    train_input = PTBInput(config=config, data=train_data, name="TrainInput")
    print("end--------------------------------")
    # Right: use Supervisor(); managed_session() starts the queue runners.
    sv = tf.train.Supervisor()
    with sv.managed_session() as sess:
        for step in range(1):
            print(sess.run(train_input.input_data))
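Note: sv = tf.train.Supervisor() is more convenient. According to the documentation, "The Supervisor is a small wrapper around a Coordinator, a Saver, and a SessionManager." In other words, once we use Supervisor(), we no longer have to handle saving the model or synchronizing threads ourselves.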
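The convenience comes from bundling the start and stop steps behind a context manager. As a loose analogy only, and not a description of how Supervisor is actually implemented, the sketch below uses the Python 3 standard library to start a feeder thread on entry and to stop and join it on exit. `managed_queue` is a hypothetical helper name.

```python
import contextlib
import queue
import threading


@contextlib.contextmanager
def managed_queue(maxsize=2):
    """Hypothetical helper: start a feeder thread on entry, stop it on exit."""
    q = queue.Queue(maxsize=maxsize)
    stop = threading.Event()

    def feed():
        i = 0
        while not stop.is_set():
            try:
                q.put(i, timeout=0.05)
                i += 1
            except queue.Full:
                pass  # Queue is full; check the stop flag again.

    worker = threading.Thread(target=feed)
    worker.start()
    try:
        yield q, worker
    finally:
        # Runs whether the body finished normally or raised.
        stop.set()
        worker.join()


with managed_queue() as (q, worker):
    values = [q.get(timeout=2.0) for _ in range(3)]
worker_alive_after = worker.is_alive()
```

The caller never touches the thread lifecycle directly, which mirrors why the managed_session() version of the script is shorter than the explicit Coordinator version.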