1. 程式人生 > >TensorFlow(三)之多執行緒

TensorFlow(三)之多執行緒

本博文參考TensorFlow實戰Google深度學習框架(鄭澤宇,顧思宇),僅用作學習

一、TFRecord輸入資料格式

TFRecord是tensorflow中儲存資料的統一格式。可以統一不同的原始資料格式,並更加有效地管理不同的屬性。TFRecord檔案中的資料都是通過tf.train.Example Protocol Buffer的格式儲存的。是一種可將影象的資料和標籤放在一起的二進位制檔案,能節省記憶體,在TensorFlow中快速讀取儲存。

tf.train.Example的定義如下:

message Example{
    Features Features=1;
};

message Features{
    map<string,Feature> feature=1;
};

message Feature{
    oneof kind{
       ByteList bytes_list=1;
       FloatList float_list=2;
       Int64List int64_list=3;
    }
};

tf.train.Example中包含了一個從屬性名稱到取值的字典。屬性名稱為字串,屬性的取值可以為字串(ByteList),實數列表(FloatList)或整數列表(Int64List)。

從檔案中讀取資料一般分為:把樣本資料寫入TFRecords二進位制檔案,再從佇列中讀取。

1、生成TFRecord檔案

需要將資料填到tf.train.Example的協議快取區(Protocol Buffer)中,將協議快取區序列化為一個字串,通過tf.python_io.TFRecordWriter寫入TFRecord檔案中。

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import numpy as np

mnist=input_data.read_data_sets("/path/to/mnist/data",dtype=tf.unit8,one_hot=True)
#訓練資料
image=mnist.train.images
#訓練資料所對應的的正確答案,可以作為一個屬性儲存在TFRecorde中
labels=mnist.train.labels
#訓練資料的影象解析度,可以作為Example中的一個屬性
pixels=image.shape[1]
num_examples=mnist.train.num_examples

#生成整數型的屬性
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
#生成字串型的屬性
def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

filename="/path/to/outpput.tfrecords"
writer=tf.python_io.TFRecordWriter(filename)
for i in range(num_examples):
    #將影象矩陣轉化為一個字串
    image_raw=image[i].tostring()
    #將一個樣例轉化為ExamplecProtocol Buffer,並將所有資訊寫入這個資料結構
    example=tf.train.Example(features=tf.train.Features(feature={
        'pixels': _int64_feature(pixels),
        'labels': _int64_feature(np.argmax(labels[i])),
        'image_raw':_bytes_feature(image_raw)
    }))
writer.write(example.SerializeToString()) writer.close()
2、從佇列中讀取

首先建立張量,從二進位制檔案中讀取一個樣本

建立張量,從二進位制檔案中隨機讀取一個mini-batch

把每一批張量傳入網路作為輸入節點

import tensorflow as tf

# 讀取檔案。
files = tf.train.match_filenames_once("/path/to/output.tfrecords")
filename_queue = tf.train.string_input_producer(files, shuffle=False)
reader = tf.TFRecordReader()
#從檔案中讀出一個樣例(也可以永別的函式讀取多個樣例)
_,serialized_example = reader.read(filename_queue)

# 解析讀取的樣例(也可以用別的函式解析多個樣例)
features = tf.parse_single_example(
      serialized_example,
      features={
        'image_raw':FixedLenFeature([],tf.string),
        'labels': FixedLenFeature([],tf.int64),
        'pixels': FixedLenFeature([],tf.int64)
      })
images=tf.decode_raw(features['image_raw'],tf.uint8) labels=tf.cast(features['labels'],tf.int32) pixels=tf.cast(features['pixels'],tf.int32) sess=tf.Session() #啟動多執行緒處理資料 coord=tf.train.Coordinator() threads=tf.train.start_queue_runners(sess=sess,coord=coord) for i in range(10): image,label,pixel=sess.run([images,labels,pixels])

二、佇列

佇列也是圖中的一個節點

佇列主要有FIFOQueue和RandomShuffleQueue。tensorflow可以利用佇列來實現多執行緒輸入資料處理。

FIFOQueue建立一個先入先出佇列。RandomShuffleQueue建立一個隨機佇列,在非同步訓練中很重要

API:tf.FIFOQueue   tf.RandomShuffleQueue

import tensorflow as tf
#建立先進先出佇列,指定佇列最多可以儲存兩個元素
q=tf.FIFOQueue(2,"int32")
#初始化佇列的元素
init=q.enqueue_many(([0,10],))
#將佇列的第一個元素出列,儲存在x裡面
x=q.dequeue()
y=x+1
#將y加入佇列
q_inc=q.enqueue([y])

with tf.Session() as sess:
    #執行初始化佇列的操作
    init.run()
    #執行出佇列,將佇列的元素加1,加1後的元素進隊的迴圈操作5次
    for _ in range(5):
       v,_=sess.run([x,q_inc])
       print(v)

三、佇列管理器&執行緒和協調器

tensorflow提供了tf.Cooradinator和tf.QueueRunner兩個類來完成多執行緒協同的功能。

QueueRunner:佇列管理器

coordinator:協調器,協調執行緒間的關係可以視為一種訊號量,用來做同步。主要用於協同多個執行緒一起停止,並提供了should_stop、request_stop和join三個函式

啟動執行緒之前,需要先宣告一個tf.Cooradinator類,並將這個類傳入每一個建立的執行緒中。

啟動的執行緒需要一直查詢tf.Cooradinator類中提供的should_stop函式,返回值為True時,執行緒也退出。

每個執行緒都可以通過request_stop函式來通知其他執行緒退出。當執行緒呼叫request_stop時,should_stop的返回值將被設定為True,這樣其他的執行緒就可以同時終止了。

import tensorflow as tf
import numpy as np
import threading
import time

#執行緒中執行的程式,每隔一秒判斷是否需要停止並列印自己的ID
def MyLoop(coord,worker_id):
    #使用tf.Coordinator類提供的協同工具判斷當前執行緒是否需要停止
    while not coord.should_stop():
        #隨機停止所有的執行緒
        if np.random.rand()<0.1:
            print("stoping from id: %d\n" % worker_id)
            #呼叫coord.request_stop()函式來通知其他執行緒停止
            coord.request_stop()
        else:
            #列印當前執行緒ID
            print("working on id: %d\n" % worker_id)
        #暫停1秒
        time.sleep(1)
#宣告一個協調器來協同多個執行緒
coord=tf.train.Coordinator()

#建立5個執行緒
threads=[
    threading.Thread(target=MyLoop,args=(coord,i,)) for  i in range(5)]
#啟動所有執行緒
for t in threads:
    t.start()
#join操作等待所有執行緒關閉,這一函式才能返回
coord.join(threads)

四、輸入資料處理框架

                                       
import tensorflow as tf
files=tf.train.match_filenames_once("/path/to/output.tfrecords")
filename_queue = tf.train.string_input_producer(files, shuffle=False)
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
      serialized_example,
      features={
          'image': tf.FixedLenFeature([], tf.string),
          'label': tf.FixedLenFeature([], tf.int64),
          'height': tf.FixedLenFeature([], tf.int64),
          'width': tf.FixedLenFeature([], tf.int64),
          'channels': tf.FixedLenFeature([], tf.int64),
      })
image,label=features['image'],features['label']
height,width=features['height'],features['width']
channels=features['channels']

decoded_image=tf.decode_raw(image,tf.uint8)
decoded_image.set_shape([height,width,channels])

image_size=299
distorted_image=preprocess_for_train(decoded_image,image_size,image_size,None)

min_after_dequeue=10000
batch_size=100
capacity=min_after_dequeue+3*batch_size
image_batch,label_batch=tf.train.shuffle_batch(
    [distorted_image,label],batch_size=batch_size,
    capacity=capacity,min_after_dequeue=min_after_dequeue)

logit=inference(image_batch)
loss=calc_loss(logit,label_batch)
train_step=tf.train.GradientDescentOptimizer(learning_rate)\
    .minimize(loss)

with tf.Session() as sess:
        tf.global_variables_initializer().run()
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        for i in range(TRAINING_ROUDNS):
            sess.run(train_step)
        coord.request_stop()
        coord.join(threads)

相關推薦

TensorFlow執行

本博文參考TensorFlow實戰Google深度學習框架(鄭澤宇,顧思宇),僅用作學習 一、TFRecord輸入資料格式 TFRecord是tensorflow中儲存資料的統一格式。可以統一不同的原始資料格式,並更加有效地管理不同的屬性。TFRecord檔案中的資料都是通

【Java】執行系列阻塞執行的多種方法

前言: 在某些應用場景下,我們可能需要等待某個執行緒執行完畢,然後才能進行後續的操作。也就是說,主執行緒需要等待子執行緒都執行完畢才能執行後續的任務。 例如,當你在計算利用多執行緒執行幾個比較耗時的任務的時候,主執行緒需要利用這幾個執行緒計算的結果,才能進行後

2018-08-28微服務筆記執行

1.多執行緒 1.1 程序與執行緒 (1)程序:正在執行的程式,是執行緒的集合。主執行緒決定程式碼的執行順序。 (2)執行緒:正在獨立執行的一條執行路徑。 (3)多執行緒:為了提高程式的效率。 1.2 四種方式建立執行緒 (1)繼承Thread類 (2)實現Runnable介面

Python 執行程序 執行、同步、通訊

Python 多執行緒、多程序 (一)之 原始碼執行流程、GIL Python 多執行緒、多程序 (二)之 多執行緒、同步、通訊 Python 多執行緒、多程序 (三)之 執行緒程序對比、多執行緒 一、python多執行緒 對於I/O操作的時候,程序與執行緒的效能差別不大,甚至由於執行緒更輕量級,效能更高

java基礎執行

1.程序、執行緒 程序:正在執行的程式 執行緒:程序中負責程式執行的執行單元 即:程序的範圍>執行緒的範圍。 且:一個程序可以有多個執行緒。 2.多執行緒的意義:多部分程式碼同時執行,提高CPU使用效率 3.多執行緒的特點:CPU的隨機性 4.建立執行緒的兩種方法

Python 執行程序 執行、同步、通訊

一、python多執行緒 對於I/O操作的時候,程序與執行緒的效能差別不大,甚至由於執行緒更輕量級,效能更高。這裡的I/O包括網路I/O和檔案I/O 1、例項 假如利用socket傳送http請求,也就是網路I/O。爬取列表網頁中的寫href連結,然後獲取href連結之後,在爬去連結的網頁詳情。 如果不適用

Java執行知識點總結——進階篇執行下的單例模式

餓漢式 餓漢式多執行緒和單執行緒的程式碼是一樣的,如下: class Single { private static final Single s = new Single(); p

將MySQL去重操作優化到極致彈連發執行並行執行

        上一篇已經將單條查重語句調整到最優,但該語句是以單執行緒方式執行。能否利用多處理器,讓去重操作多執行緒並行執行,從而進一步提高速度呢?比如我的實驗環境是4處理器,如果使用4個執行緒同時執行查重sql,理論上應該接近4倍的效能提升。一、資料分片        我

Python基礎--- Python執行介紹,開啟執行種方式,time模組,join,Daemon,Lock、Rlock,事件機制,Timer

一、多執行緒介紹 --------------------------------------------------------- 1.threading用於提供執行緒相關的操作,執行緒是應用程式中工作的最小單元。 2.python當前版本的多執行緒庫沒有實現優先順序、執行緒組,執

胡八一Java執行

多執行緒的優勢:多程序執行需要獨立的記憶體空間,而多執行緒可以共享記憶體,從而提高了執行緒的執行效率。 建立執行緒一般使用兩種方式: 1、繼承Thread類: import java.io.IOException; public class Test extends

8Java執行ThreadLocal

1.引言       在前面幾篇部落格中主要是介紹了多個執行緒之間互動,如果在多執行緒中操作同一個變數,那麼如果想要實現每一個執行緒都有著自己的變數,那麼應該如何實現呢?JDK為我們提供一個類ThreadLocal,使用ThreadLocal類我們實

輕鬆學會執行——如何確定執行

一般情況下,在網際網路程式設計中,我們會使用多執行緒來搶奪網路資源。那麼,執行緒數量我們如何來確定呢? 我們都知道,執行緒數量和CPU核數有關。所以有人建議說:執行緒數為核數的兩倍最好。 其實只要這些執行緒不頻繁切換、競爭資源的話。想要最優效能,還是根據具體專案慢慢除錯。

Java併發程式設計8執行環境中安全使用集合API含程式碼

Java併發程式設計(8):多執行緒環境中安全使用集合API(含程式碼)JAVA大資料中高階架構 2018-11-09 14:44:47在集合API中,最初設計的Vector和Hashtable是多執行緒安全的。例如:對於Vector來說,用來新增和刪除元素的方法是同步的。如果只有一個執行緒與Vector的例

Windows網路程式設計執行訊息處理

對於服務端來說,呼叫accept()函式同意客戶端連線的請求後,需要處理完與這個客戶端的通訊後回到accept()繼續等待下一個客戶端的連線,如果一個客戶端請求連線時服務端並沒有在accept()處等待,客戶端是無法成功連上服務端的,因此併發客戶端連線的服務端必然是多執行緒的。 服務

演算法小題:兩執行交替輸出1~99

一、回顧Thread的常用法方法 1.start():啟動執行緒並執行相應的run()方法 2.run():子執行緒要執行的程式碼放入run()方法中 3.currentThread():靜態的,調取當前的執行緒 4.getName():獲取此執行緒的名字 5.se

執行Java執行,啟動四個執行,兩個執行加一,另外兩個執行減一

  public class Test { public static void main(String[] args) { final ShareData data = new ShareData(); for (int i = 0; i < 2; i++) {

VS中的執行/MT執行除錯/MTd執行DLL/MD執行除錯DLL/MDd的區別

一種語言的開發環境往往會附帶有語言庫,這些庫就是對作業系統的API的包裝,我們也稱這些語言庫為執行庫 對於MSVC的執行庫(CRT),按照靜態/動態連結,可以分為靜態版和動態版;按照除錯/釋出,可以分為除錯版本和釋出版本;按照單執行緒/多執行緒,可以分為單執行緒版本和多執行

JAVA基礎22-執行執行的概念,執行狀態及其轉換】

多執行緒 一.概念          1. 程序:程序是表示資源分配的的基本概念,又是排程執行的基本單位,是系統中的併發執行的單位。           2. 執行緒:單個程序中執行中

併發工具類等待執行完成的CountDownLatch

簡介 CountDownLatch 允許一個或多個執行緒等待其他執行緒完成操作。 應用場景 假如有這樣一個需求,當我們需要解析一個Excel裡多個sheet的資料時,可以考慮使用多執行緒,每個執行緒解析一個sheet裡的資料,等到所有的sheet都解析完之後,程式需要提示解析完成。在這個需求中

併發集合使用阻塞執行安全的列表

宣告:本文是《 Java 7 Concurrency Cookbook 》的第六章,作者: Javier Fernández González  譯者:許巧輝 使用阻塞執行緒安全的列表 列表(list)是最基本的集合。一個列表中的元素數量是不確定的,並且你可以新增、讀取和刪除任意位置上的元素。