tensorflow讀取資料(csv格式)
採用tensorflow訓練深度學習的網路的時候,一般資料集載入都有現成的程式,但是,我們應該知道整個載入流程,在參考一些學習資料之後,對整個流程做個總結。tensorflow有三種資料讀取方式,如下進行闡述:
(一)preloaded data:
直接由tensorflow產生資料,然後執行:
import tensorflow as tf
sess = tf.Session()
#設計計算圖及產生資料
x1 = tf.constant([1,2,3])
x2 = tf.constant([4,5,6])
y = tf.add(x1,x2)
#session計算
print (sess.run (y))
(二)feeding:
由python產生資料,然後由placeholder餵給tensorflow:
import tensorflow as tf
sess = tf.Session()
#設計計算圖
x1 = tf.placeholder(tf.int16)
x2 = tf.placeholder(tf.int16)#佔位符x1,x2
y = tf.add(x1,x2)
#python產生資料
p1 = [1,2,3]
p2 = [4,5,6]
#session計算
print (sess.run(y,feed_dict={x1: p1, x2: p2}))
(三)reading from file:
比較可知,方法(一)直接在圖裡面由tensorflow產生資料並計算,當資料量大的時候會出現圖傳輸錯誤問題。方法(二)當採用佔位符的時候,資料量過大也會是很大的傳輸開銷。故可採用tensorflow直接在從檔案中讀取資料,並解碼成可使用的格式。
(1)在開始之前,先講解幾個函式進行對比區分:
(a)tf.train.batch和tf.train.shuffle_batch
tf.train.batch([example, label], batch_size=batch_size, capacity=capacity)
example, label: 表示樣本和樣本標籤,這個可以是一個樣本和一個樣本標籤
batch_size:返回的一個batch樣本集的樣本個數
capacity:佇列的長度
函式返回值都是一個按照輸入順序的batch的樣本和樣本標籤。
tf.train.shuffle_batch([example, label], batch_size=batch_size, capacity=capacity, min_after_dequeue)
min_after_dequeue:是出隊後,佇列至少剩下的資料個數,小於capacity引數的值,否則會出錯。也就是說這個函式的輸出結果是一個亂序的樣本排列的batch,不是按照順序排列的,返回值都是一個隨機的batch的樣本及其對應的樣本標籤。
(b)tf.train.batch_join和tf.train.shuffle_batch_join
tf.train.batch與tf.train.shuffle_batch函式是單個Reader讀取,但是可以多執行緒。tf.train.batch_join與tf.train.shuffle_batch_join可設定多Reader讀取,每個Reader使用一個執行緒。至於兩種方法的效率,單Reader時,2個執行緒就達到了速度的極限。多Reader時,2個Reader就達到了極限。所以並不是執行緒越多越快,甚至更多的執行緒反而會使效率下降。
(2)程式實現
(a)首先在終端直接構造資料:
##構造3個檔案,A.csv,B.csv,C.csv
echo -e "Alpha1,A1\nAlpha2,A2\nAlpha3,A3">A.csv
echo -e "Bee1,B1\nBee2,B2\nBee3,B3">B.csv
echo -e "Sea1,C1\nSea2,C2\nSea3,C3">C.csv
echo -e "1,1,1,1,1\n0,1,0,1,0\n0,0,1,0,1\n1,0,0,1,0\n0,0,0,0,0">D.csv
(b)在.py檔案中操作
(b11)單個讀取,單個樣本(tf.train.batch)
import tensorflow as tf
#生成一個先入先出佇列和一個Queuerunner,生成檔名佇列
filenames = ['A.csv', 'B.csv', 'C.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
#定義reader
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
#定義 decoder
example, label = tf.decode_csv(value, record_defaults=[['null'],['null']])#['null']解析為string型別 ,[1]為整型,[1.0]解析為浮點。
example_batch, label_batch = tf.train.batch([example, label], batch_size=1, capacity=200, num_threads=2)#保證樣本和標籤一一對應
#執行圖
with tf.Session() as sess:
coord = tf.train.Coordinator()#建立一個協調器,管理執行緒
threads = tf.train.start_queue_runners(coord=coord)#啟動QueueRunner,此時檔名佇列已經進隊
for i in range(10):
e_val, l_val = sess.run([example_batch, label_batch])
print (e_val, l_val)
coord.request_stop()
coord.join(threads)
結果:按順序輸出
[b'Alpha1'] [b'A1']
[b'Alpha2'] [b'A2']
[b'Alpha3'] [b'A3']
[b'Bee1'] [b'B1']
[b'Bee2'] [b'B2']
[b'Bee3'] [b'B3']
[b'Sea1'] [b'C1']
[b'Sea2'] [b'C2']
[b'Sea3'] [b'C3']
[b'Alpha1'] [b'A1']
(b12)單個讀取,單個樣本(tf.train.shuffle_batch)
import tensorflow as tf
#生成一個先入先出佇列和一個Queuerunner,生成檔名佇列
filenames = ['A.csv', 'B.csv', 'C.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
#定義reader
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
#定義 decoder
example, label = tf.decode_csv(value, record_defaults=[['null'],['null']])
example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=1, capacity=200,min_after_dequeue=100, num_threads=2)
#執行圖
with tf.Session() as sess:
coord = tf.train.Coordinator()#建立一個協調器,管理執行緒
threads = tf.train.start_queue_runners(coord=coord)#啟動QueueRunner,此時檔名佇列已經進隊
for i in range(10):
e_val, l_val = sess.run([example_batch, label_batch])
print (e_val, l_val)
coord.request_stop()
coord.join(threads)
結果:隨機順序輸出
[b'Sea1'] [b'C1']
[b'Sea3'] [b'C3']
[b'Alpha3'] [b'A3']
[b'Sea1'] [b'C1']
[b'Sea3'] [b'C3']
[b'Alpha1'] [b'A1']
[b'Bee1'] [b'B1']
[b'Bee3'] [b'B3']
[b'Sea2'] [b'C2']
[b'Alpha1'] [b'A1']
(b2)單個讀取,多個樣本:只需要修改上上述的 batch_size的值
(b31)多個讀取,多個樣本(tf.train.batch_join)
import tensorflow as tf
#生成一個先入先出佇列和一個Queuerunner,生成檔名佇列
filenames = ['A.csv', 'B.csv', 'C.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
#定義reader
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
#定義 decoder
record_defaults = [['null'], ['null']]
#定義了多種解碼器,每個解碼器跟一個reader相連
example_list = [tf.decode_csv(value, record_defaults=record_defaults) for _ in range(2)] # Reader設定為2
# 使用tf.train.batch_join(),可以使用多個reader,並行讀取資料。每個Reader使用一個執行緒。
example_batch, label_batch = tf.train.batch_join(example_list, batch_size=5)
#執行圖
with tf.Session() as sess:
coord = tf.train.Coordinator()#建立一個協調器,管理執行緒
threads = tf.train.start_queue_runners(coord=coord)#啟動QueueRunner,此時檔名佇列已經進隊
for i in range(10):
e_val, l_val = sess.run([example_batch, label_batch])
print (e_val, l_val)
coord.request_stop()
coord.join(threads)
結果:按順序輸出
[b'Alpha1' b'Alpha2' b'Alpha3' b'Bee1' b'Bee2'] [b'A1' b'A2' b'A3' b'B1' b'B2']
[b'Bee3' b'Sea1' b'Sea2' b'Sea3' b'Alpha1'] [b'B3' b'C1' b'C2' b'C3' b'A1']
[b'Alpha2' b'Alpha3' b'Bee1' b'Bee2' b'Bee3'] [b'A2' b'A3' b'B1' b'B2' b'B3']
[b'Sea1' b'Sea2' b'Sea3' b'Alpha1' b'Alpha2'] [b'C1' b'C2' b'C3' b'A1' b'A2']
[b'Alpha3' b'Bee1' b'Bee2' b'Bee3' b'Sea1'] [b'A3' b'B1' b'B2' b'B3' b'C1']
[b'Sea2' b'Sea3' b'Alpha1' b'Alpha2' b'Alpha3'] [b'C2' b'C3' b'A1' b'A2' b'A3']
[b'Bee1' b'Bee2' b'Bee3' b'Sea1' b'Sea2'] [b'B1' b'B2' b'B3' b'C1' b'C2']
[b'Sea3' b'Alpha1' b'Alpha2' b'Alpha3' b'Bee1'] [b'C3' b'A1' b'A2' b'A3' b'B1']
[b'Bee2' b'Bee3' b'Sea1' b'Sea2' b'Sea3'] [b'B2' b'B3' b'C1' b'C2' b'C3']
[b'Alpha1' b'Alpha2' b'Alpha3' b'Bee1' b'Bee2'] [b'A1' b'A2' b'A3' b'B1' b'B2']
(b32)多個讀取,多個樣本(tf.train.shuffle_batch_join)
import tensorflow as tf
#生成一個先入先出佇列和一個Queuerunner,生成檔名佇列
filenames = ['A.csv', 'B.csv', 'C.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
#定義reader
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
#定義 decoder
record_defaults = [['null'], ['null']]
#定義了多種解碼器,每個解碼器跟一個reader相連
example_list = [tf.decode_csv(value, record_defaults=record_defaults) for _ in range(3)] # Reader設定為3
# 使用tf.train.batch_join(),可以使用多個reader,並行讀取資料。每個Reader使用一個執行緒。
example_batch, label_batch = tf.train.shuffle_batch_join(example_list, batch_size=5,capacity=200,min_after_dequeue=100)
#執行圖
with tf.Session() as sess:
coord = tf.train.Coordinator()#建立一個協調器,管理執行緒
threads = tf.train.start_queue_runners(coord=coord)#啟動QueueRunner,此時檔名佇列已經進隊
for i in range(10):
e_val, l_val = sess.run([example_batch, label_batch])
print (e_val, l_val)
coord.request_stop()
coord.join(threads)
結果:隨機順序輸出
[b'Bee3' b'Alpha2' b'Bee2' b'Alpha2' b'Sea1'] [b'B3' b'A2' b'B2' b'A2' b'C1']
[b'Alpha1' b'Alpha3' b'Sea2' b'Alpha1' b'Alpha3'] [b'A1' b'A3' b'C2' b'A1' b'A3']
[b'Bee3' b'Alpha3' b'Alpha3' b'Bee2' b'Bee3'] [b'B3' b'A3' b'A3' b'B2' b'B3']
[b'Sea3' b'Sea2' b'Sea1' b'Bee2' b'Sea3'] [b'C3' b'C2' b'C1' b'B2' b'C3']
[b'Bee2' b'Bee2' b'Sea1' b'Alpha1' b'Alpha3'] [b'B2' b'B2' b'C1' b'A1' b'A3']
[b'Alpha3' b'Alpha1' b'Bee1' b'Bee1' b'Alpha2'] [b'A3' b'A1' b'B1' b'B1' b'A2']
[b'Bee3' b'Alpha2' b'Alpha1' b'Sea3' b'Sea1'] [b'B3' b'A2' b'A1' b'C3' b'C1']
[b'Bee1' b'Bee3' b'Sea1' b'Bee1' b'Bee3'] [b'B1' b'B3' b'C1' b'B1' b'B3']
[b'Sea2' b'Bee2' b'Alpha3' b'Alpha2' b'Alpha1'] [b'C2' b'B2' b'A3' b'A2' b'A1']
[b'Bee2' b'Bee3' b'Bee2' b'Bee3' b'Bee2'] [b'B2' b'B3' b'B2' b'B3' b'B2']
(b4)迭代控制,設定epoch引數,指定我們的樣本在訓練的時候只能被用多少輪
import tensorflow as tf
filenames = ['A.csv', 'B.csv', 'C.csv']
#num_epoch: 設定迭代數
filename_queue = tf.train.string_input_producer(filenames, shuffle=False,num_epochs=3)
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
record_defaults = [['null'], ['null']]
#定義了多種解碼器,每個解碼器跟一個reader相連
example_list = [tf.decode_csv(value, record_defaults=record_defaults) for _ in range(2)] # Reader設定為2
# 使用tf.train.batch_join(),可以使用多個reader,並行讀取資料。每個Reader使用一個執行緒。
example_batch, label_batch = tf.train.batch_join(example_list, batch_size=1)
#初始化本地變數
init_local_op = tf.initialize_local_variables()#如果不初始化,執行就會報錯
with tf.Session() as sess:
sess.run(init_local_op) #初始化
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
try:
while not coord.should_stop():
e_val, l_val = sess.run([example_batch, label_batch])
print (e_val, l_val)
except tf.errors.OutOfRangeError:
print('Epochs Complete!')
finally:
coord.request_stop()
coord.join(threads)
coord.request_stop()
coord.join(threads)
(b5)使用佇列讀取csv
import tensorflow as tf
# 生成一個先入先出佇列和一個QueueRunner,生成檔名佇列
filenames = ['D.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
# 定義Reader
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
# 定義Decoder
record_defaults = [[1], [1], [1], [1], [1]] #解析為整數
col1, col2, col3, col4, col5 = tf.decode_csv(value,record_defaults=record_defaults)
features = tf.stack([col1, col2, col3])#前3列資料,後2列標籤
label = tf.stack([col4,col5])
example_batch, label_batch = tf.train.shuffle_batch([features,label], batch_size=1, capacity=200, min_after_dequeue=100, num_threads=2)
# 執行Graph
with tf.Session() as sess:
coord = tf.train.Coordinator() #建立一個協調器,管理執行緒
threads = tf.train.start_queue_runners(coord=coord)
for i in range(10):
e_val,l_val = sess.run([example_batch, label_batch])
print (e_val,l_val)
coord.request_stop()
coord.join(threads)
結果:
[[0 1 0]] [[1 0]]
[[1 0 0]] [[1 0]]
[[1 1 1]] [[1 1]]
[[1 1 1]] [[1 1]]
[[0 0 1]] [[0 1]]
[[0 0 1]] [[0 1]]
[[1 1 1]] [[1 1]]
[[0 1 0]] [[1 0]]
[[0 1 0]] [[1 0]]
[[1 0 0]] [[1 0]]