1. 程式人生 > 程式設計 >Tensorflow 多執行緒與多程序資料載入例項

Tensorflow 多執行緒與多程序資料載入例項

在專案中遇到需要處理超級大量的資料集,無法載入記憶體的問題就不用說了,單執行緒分批讀取和處理(雖然這個處理也只是特別簡單的首尾相連的操作)也會使瓶頸出現在CPU效能上,所以研究了一下多執行緒和多程序的資料讀取和預處理,都是通過呼叫dataset api實現

1. 多執行緒資料讀取

第一種方法是可以直接從csv裡讀取資料,但返回值是tensor,需要在sess裡run一下才能返回真實值,無法實現真正的並行處理,但如果直接用csv檔案或其他什麼檔案存了特徵值,可以直接讀取後進行訓練,可使用這種方法.

import tensorflow as tf

# Per-column defaults for tf.decode_csv; the defaults also fix each column's
# dtype (string columns default to "", the int column to 0).
# NOTE(review): the original comment described four columns (three strings +
# one int), but this list declares only three (two strings + one int) — the
# comment and the list disagree; confirm against the actual CSV schema.
record_defaults = [[""],[""],[0]]


def decode_csv(line):
    """Parse one CSV line into a (features, label) pair.

    The final column is the integer label; all preceding columns are
    stacked into a single feature tensor so forward prop can be
    vectorized later.
    """
    columns = tf.decode_csv(line, record_defaults)
    label = columns[-1]               # last column is the label
    features = tf.stack(columns[:-1])  # everything else becomes the feature vector
    return features, label

# tf.data pipeline: one TextLineDataset per input file; lines are parsed
# in parallel, shuffled, and batched.
filenames = tf.placeholder(tf.string,shape=[None])
dataset5 = tf.data.Dataset.from_tensor_slices(filenames)
# Set the number of parallel parsing threads here (num_parallel_calls).
dataset5 = dataset5.flat_map(lambda filename: tf.data.TextLineDataset(filename).skip(1).map(decode_csv,num_parallel_calls=15))
dataset5 = dataset5.shuffle(buffer_size=1000)
dataset5 = dataset5.batch(32) # batch size per training step
iterator5 = dataset5.make_initializable_iterator()
next_element5 = iterator5.get_next()

# File lists fed through the `filenames` placeholder at iterator init time.
training_filenames = ["train.csv"]
validation_filenames = ["vali.csv"]

with tf.Session() as sess:

    # Two full passes over the training files.
    for _ in range(2):
        # Re-initialize the iterator with the training file list.
        sess.run(iterator5.initializer, feed_dict={filenames: training_filenames})
        while True:
            try:
                # Pull one concrete (features, labels) batch out of the pipeline.
                features, labels = sess.run(next_element5)
                # Train...
            except tf.errors.OutOfRangeError:
                print("Out of range error triggered (looped through training set 1 time)")
                break

    # Validate (cost,accuracy) on train set
    print("\nDone with the first iterator\n")

    # Same iterator, re-initialized with the validation files.
    sess.run(iterator5.initializer, feed_dict={filenames: validation_filenames})
    while True:
        try:
            features, labels = sess.run(next_element5)
            # Validate (cost,accuracy) on dev set
        except tf.errors.OutOfRangeError:
            print("Out of range error triggered (looped through dev set 1 time only)")
            break

第二種方法,基於生成器,可以進行預處理操作了,sess裡run出來的結果可以直接進行輸入訓練,但需要自己寫一個生成器,我使用的測試程式碼如下:

import tensorflow as tf
import random
import threading
import numpy as np
from data import load_image,load_wave

class SequenceData():
    """Shuffled batch generator over the lines of a text/CSV file.

    Intended to be wrapped by ``tf.data.Dataset.from_generator``: ``gen``
    yields one preprocessed ``(features, labels)`` batch per step.
    """

    def __init__(self, path, batch_size=32):
        self.path = path
        self.batch_size = batch_size
        # Fix: the original opened the file and never closed it.
        with open(path) as f:
            self.datas = f.readlines()
        self.L = len(self.datas)
        # One random permutation of all line indices (a one-time shuffle).
        self.index = random.sample(range(self.L), self.L)

    def __len__(self):
        # Number of valid batch start positions.
        return self.L - self.batch_size

    def __getitem__(self, idx):
        # NOTE: consecutive idx values produce overlapping (stride-1)
        # windows over the shuffled index, not disjoint batches.
        batch_indexs = self.index[idx:(idx + self.batch_size)]
        batch_datas = [self.datas[k] for k in batch_indexs]
        img1s, img2s, audios, labels = self.data_generation(batch_datas)
        return img1s, labels

    def gen(self):
        # Generator handed to tf.data.
        # NOTE(review): 100000 is an arbitrary "large enough" count from the
        # original article; indices past len(self) yield short/empty batches.
        for i in range(100000):
            yield self.__getitem__(i)

    def data_generation(self, batch_datas):
        """Placeholder for user-specific preprocessing of the raw lines.

        Must return four values (the original returned two undefined names
        while ``__getitem__`` unpacked four — an explicit error is clearer
        than the NameError the stub would have raised).
        """
        raise NotImplementedError("implement data preprocessing here")

# output_types must match what the generator actually yields; if your own
# preprocessing already produces whole batches, keep batch(1) here.
# Fixes to the original snippet (which could not run as printed):
#  * from_generator is a @staticmethod of tf.data.Dataset — it cannot be
#    chained after tf.data.Dataset().batch(1).
#  * SequenceData.__getitem__ returns two values (features, labels), so two
#    dtypes are declared and the map lambda takes two arguments.
#  * sess.run([X, w]) returns exactly two arrays, so two names are unpacked.
dataset = tf.data.Dataset.from_generator(
    SequenceData('train.csv').gen,
    output_types=(tf.float32, tf.int64))
# Identity map kept only to demonstrate num_parallel_calls; put real
# per-batch transforms here.
dataset = dataset.map(lambda x, y: (x, y), num_parallel_calls=32).prefetch(buffer_size=1000)
dataset = dataset.batch(1)
X, w = dataset.make_one_shot_iterator().get_next()

with tf.Session() as sess:
    for _ in range(100000):
        a, b = sess.run([X, w])
        print(a.shape)

不過python的多執行緒並不是真正的多執行緒,雖然看起來我是啟動了32執行緒,但執行時的CPU佔用如下所示:

Tensorflow 多執行緒與多程序資料載入例項

還剩這麼多核心空著,然後就是第三個版本了,使用了queue來快取資料,訓練需要資料時直接從queue中進行讀取,是一個到多程序的過渡版本(vscode沒法debug多程序,坑啊,還以為程式碼寫錯了,在vscode裡多程序直接就沒法執行),在初始化時啟動多個執行緒進行資料的預處理:

import tensorflow as tf
import random
import threading
import numpy as np
from data import load_image,load_wave
from queue import Queue

class SequenceData():
    """Queue-buffered variant: worker threads pre-fill a bounded queue with
    preprocessed batches, and ``gen`` simply drains the queue.

    NOTE(review): several lines of this class were garbled in the source;
    ``__init__`` and ``__getitem__`` below are reconstructed from the other
    two SequenceData variants in this article — confirm against the
    original code.
    """

    def __init__(self, path, batch_size=32):
        self.path = path
        self.batch_size = batch_size
        with open(path) as f:
            self.datas = f.readlines()
        self.L = len(self.datas)
        self.index = random.sample(range(self.L), self.L)
        # Bounded buffer decouples preprocessing speed from training speed.
        self.queue = Queue(maxsize=20)
        # 32 preprocessing threads; because of the GIL these only overlap
        # I/O and C-level work, not pure-Python CPU work.
        for i in range(32):
            threading.Thread(target=self.f).start()

    def __len__(self):
        # Number of valid batch start positions.
        return self.L - self.batch_size

    def __getitem__(self, idx):
        batch_indexs = self.index[idx:(idx + self.batch_size)]
        batch_datas = [self.datas[k] for k in batch_indexs]
        img1s, img2s, audios, labels = self.data_generation(batch_datas)
        return img1s, labels

    def f(self):
        # Worker loop: preprocess batches and push them into the queue.
        for i in range(int(self.__len__() / self.batch_size)):
            t = self.__getitem__(i)
            self.queue.put(t)

    def gen(self):
        # Infinite generator for tf.data: block on the queue for batches.
        while 1:
            yield self.queue.get()

    def data_generation(self, batch_datas):
        """Placeholder for user-specific preprocessing of the raw lines."""
        raise NotImplementedError("implement data preprocessing here")
  return img1s,num_parallel_calls=1).prefetch(buffer_size=1000)
X,w])
  print(a.shape)

2. 多程序資料讀取

這裡的程式碼和多執行緒的第三個版本非常類似,修改為啟動程序和程序類裡的Queue即可,但千萬不要在vscode裡直接debug!在vscode裡直接f5執行程序並不能啟動.

from __future__ import unicode_literals

import argparse
import random
import warnings
from functools import reduce
from multiprocessing import Process,Queue

import numpy as np
import scipy.io.wavfile
import skimage
import skimage.io
import skimage.transform
import tensorflow as tf

class SequenceData():
    """Multi-process variant: worker processes pre-fill a multiprocessing
    Queue with preprocessed batches; ``gen`` drains it for tf.data.

    NOTE(review): the ``__init__``, ``__getitem__`` and ``data_generation``
    def lines were garbled in the source; they are reconstructed here to be
    consistent with the intact ``gen`` (which yields a 4-tuple) — confirm
    against the original code.
    """

    def __init__(self, path, batch_size=32):
        self.path = path
        self.batch_size = batch_size
        # Fix: close the file handle instead of leaking it.
        with open(path) as f:
            self.datas = f.readlines()
        self.L = len(self.datas)
        self.index = random.sample(range(self.L), self.L)
        # Bounded cross-process buffer of ready batches.
        self.queue = Queue(maxsize=30)

        # Split the batch-index range evenly across worker processes; each
        # process fills the shared queue with its slice of batches.
        self.Process_num = 32
        for i in range(self.Process_num):
            print(i, 'start')
            ii = int(self.__len__() / self.Process_num)
            t = Process(target=self.f, args=(i * ii, (i + 1) * ii))
            t.start()

    def __len__(self):
        # Number of valid batch start positions.
        return self.L - self.batch_size

    def __getitem__(self, idx):
        batch_indexs = self.index[idx:(idx + self.batch_size)]
        batch_datas = [self.datas[k] for k in batch_indexs]
        img1s, img2s, audios, labels = self.data_generation(batch_datas)
        return img1s, img2s, audios, labels

    def f(self, i_l, i_h):
        # Worker loop: preprocess the [i_l, i_h) batch range into the queue.
        for i in range(i_l, i_h):
            t = self.__getitem__(i)
            self.queue.put(t)

    def gen(self):
        # Infinite generator for tf.data: yields the 4 batch components.
        while 1:
            t = self.queue.get()
            yield t[0], t[1], t[2], t[3]

    def data_generation(self, batch_datas):
        """Placeholder for user-specific preprocessing of the raw lines.

        Must return four values, matching ``__getitem__`` and ``gen``.
        """
        raise NotImplementedError("implement data preprocessing here")

epochs = 2

# NOTE(review): the source lines here were garbled; output_types and the
# four-way unpack are reconstructed so they agree with gen(), which yields
# four values per batch. The original also chained from_generator after
# tf.data.Dataset().batch(1), which is invalid (it is a @staticmethod).
data_g = SequenceData('train_1.csv', batch_size=48)
dataset = tf.data.Dataset.from_generator(
    data_g.gen,
    output_types=(tf.float32, tf.float32, tf.float32, tf.float32))
# The generator already emits whole batches, so batch(1) wraps one per step.
dataset = dataset.batch(1)
X, Y, Z, w = dataset.make_one_shot_iterator().get_next()

with tf.Session() as sess:

    tf.global_variables_initializer().run()
    for i in range(epochs):
        # One epoch = number of whole batches available in the file.
        for j in range(int(len(data_g) / data_g.batch_size)):
            face1, face2, voice, labels = sess.run([X, Y, Z, w])
            print(face1.shape)

然後,最後實現的效果

Tensorflow 多執行緒與多程序資料載入例項

以上這篇Tensorflow 多執行緒與多程序資料載入例項就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。