1. 程式人生 > >Tensorflow計算加速

Tensorflow計算加速

將深度學習應用到實際 問題中, 一個非常大的問題在於訓練深度 學習模型需要的計算量太大。但幸好我們可以通過 TensorFlow 利用 GPU 或/和分散式計算進行模型訓練。我們可以使用單個 GPU 進行計算加速,但是,在很多情況下,單個 GPU 的加速效率無法滿足訓練大型深度學習模型 的計算量需求,這時將需要利用更多的計算資源。這時,我們便可以採用並行方式使用多個GPU來訓練深度學習模型。

Tensor Flow 程式可以通過 tf.device 函式來指定執行每一 個操作的裝置,這個裝置可以是本地的 CPU 或者 GPU ,也可以是某一臺遠端的伺服器。TensorFlow會給每一個可用的裝置 一個名稱, tf.device 函式可以通過裝置的名稱來指定執行運算的裝置。比如 CPU 在 TensorFlow 中的名稱為 /cpu : 0...在預設情況下,即使機器有多個 CPU, Tensor Flow 也不會區分它們,所有的 CPU 都使用/cpu:0作為名稱。而 一 臺機器上不同 GPU 的名稱是不同的,第 n 個 GPU 在 TensorFlow 中的名稱為/gpu :n。TensorFlow 提供了 一個快捷的方式來檢視執行每一個運算的裝置。在生成會話時,可以通過設定 log_ device _placement 引數來列印執行每一個運算的裝置。

# coding=utf-8
import tensorflow as tf

a = tf.constant([1.0, 2.0, 3.0], shape=[3], name='a')
b = tf.constant([1.0, 2.0, 3.0], shape=[3], name='b')
c = a + b

# 通過log_device_placement引數來記錄執行每一個運算的裝置。
sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))
print sess.run(c)


# 通過tf.device將運算指定到特定的裝置上。
with tf.device('/cpu:0'):
	a = tf.constant([1.0, 2.0, 3.0], shape=[3], name='a')
	b = tf.constant([1.0, 2.0, 3.0], shape=[3], name='b')
with tf.device('/gpu:1'):
    c = a + b

sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))
print sess.run(c)


a_cpu = tf.Variable(0, name="a_cpu")
with tf.device('/gpu:0'):
	a_gpu = tf.Variable(0, name="a_gpu")
	# 通過allow_soft_placement引數自動將無法放在GPU上的操作放回CPU上。
sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=True))
sess.run(tf.global_variables_initializer())

在配置好 GPU 環境的 TensorF low 中, TensorFlow 會自動優先將運算放置在 GPU 上。 如果需要將某些運算放到不同的 GPU 或者 CPU 上,就需要通過 tf.device 來手工指定 。在 TensorFlow 中,不是所有的操作都可以被放在 GPU 上,如果強行將無法放在 GPU 上的操作指定到 GPU 上,那麼程式將會報錯。在 TensorFlow 的 kernel中定義了哪些操作可以跑在 GPU 上。TensorFlow 在生成會話時可以指定 allow_ soft _placement 引數。當 allow_soft_p l acement 引數設定為 True 時,如果運算無法由 GPU 執行,那麼 TensorF low 會自動將它放到 CPU 上執行。

雖然 GPU 可以加速 TensorF l ow 的計算,但一般來說不會把所有的操作全部放在 GPU上。 一個比較好的實踐是將計算密集型的運算放在 GPU 上,而把其他操作放到 CPU 上 。GPU 是機器中相對獨立的資源,將計算放入或者轉出 GPU 都需要額外的時間 。 而且 GPU需要將計算時用到的資料從記憶體複製到 GPU 裝置上,這也需要額外的時間。TensorFlow 預設會佔用裝置上的所有 GPU 以及每個 GPU 的所有視訊記憶體 。 如果在 一個Ten sorFlow 程式中只需要使用部分 GPU ,可以通過設定 CUDA VISIBLE DEVICES 環境變 量來控制。

如下:

只使用第一、二塊GPU

CUDA_VISIBLE_DEVICES=0,1 python demo_code.py

或者設定環境變數來設定

只用第三塊

import os


os.environ["CUDA_VISIBLE_DEVICES"]="2"

雖然 TensorF low 預設會一 次性佔用 一個 GPU 的所有視訊記憶體,但是 TensorFlow 也支援動態分配 GPU 的視訊記憶體,使得 一塊 GPU 上可以同時執行多個任務 。 

config = tf.ConfigProto()
config.gpu_options.allow_growth = True
session = tf.Session(config=config,...)

 深度學習模型的訓練是一個迭代的過程。在每一輪迭代中,前向傳播演算法會根據當前引數的取值計算出在一 小部分訓練資料上的預測值,然後反向傳播演算法再根據損失函式計算引數的梯度並更新引數。在並行化地訓練深度學習模型時,不同裝置 ( GPU 或 CPU )可以在不同訓練資料上執行這個迭代的過程,而不同並行模式的區別在於不同的引數更新方式。深度學習訓練流程圖如下:

 常用的並行化深度學習模型訓練方式有兩種,同步模式和非同步模式 。在非同步模式中,每一輪選代時,不同裝置會讀取引數最新的取值,但因為不同裝置讀取引數取值的時間不 一樣,所以得到的值也有可能不一樣。根據當前引數的取值和隨機獲取的 一 小部分訓練資料,不同裝置各自執行反向傳播的過程並獨立地更新引數。可以簡單地認為非同步模式就是單機模式複製了多份,每一份使用不同的訓練資料進行訓練 。 在非同步模式下,不同裝置之間是完全獨立的。非同步模式流程圖如下:

 然而使用非同步模式訓練的深度學習模型有可能無法達到較優的訓練結果。為了避免更新不同步的問題,可以使用同步模式 。 在同步模式下,所有的裝置同時讀取引數的取值,並且當反向傳播演算法完成之後同步更新引數的取值 。 單個裝置不會單獨對引數進行更新 ,而會等待所有裝置都完成反向傳播之後再統一更新引數。在每一輪法代時,不同裝置首先統一讀取當前引數的取值,並隨機獲取 一 小部分資料 。 然後在不同裝置上執行反向傳播過程得到在各自訓練資料上引數的梯度。注意雖然所有裝置使用的引數是 一致的,但是因為 訓練資料不同,所以得到引數的梯度就可能不一樣 。當 所有裝置完成反向傳播的計算之後,需要計算出不同裝置上引數梯度的平均值,最後再根據平均值對引數進行更新 。同步模式流程圖如下:

 同步模式解決了非同步模式中存在的引數更新問題,然而同步模式的效率卻低於非同步模式 。在 同步模式下,每一 輪迭代都需要裝置統一 開始、統一 結束。如果裝置的執行速度不一 致 ,那麼 每一輪訓練都需要等待最慢的裝置結束才能開始更新引數 , 於是很多時間將被花在等待上 。 雖然理論上非同步模式存在缺陷,但因為訓練深度 學習模型 時使用的隨機梯度下降本身就是梯度下降的 一個近似解法,而且即使是梯度下降也無法保證達到 全域性最優值,所以在實際應用中,在相同時間內,使用非同步模式訓練的模型不 一 定比同步模式差。所以這兩種訓練模式在實踐中都有非常廣泛的應用。

 一般來說一 臺機器上的多個 GPU 效能相似,所以在這種設定下會更多地採用同步模式訓練深度學習模型 。給出如下程式碼

# coding=utf-8
from datetime import datetime
import os
import time

import tensorflow as tf
import mnist_inference

# 定義訓練神經網路時需要用到的引數。
BATCH_SIZE = 100 
LEARNING_RATE_BASE = 0.001
LEARNING_RATE_DECAY = 0.99
REGULARAZTION_RATE = 0.0001
TRAINING_STEPS = 1000
MOVING_AVERAGE_DECAY = 0.99 
N_GPU = 2

# 定義日誌和模型輸出的路徑。
MODEL_SAVE_PATH = "logs_and_models/"
MODEL_NAME = "model.ckpt"

# 定義資料儲存的路徑。因為需要為不同的GPU提供不同的訓練資料,所以通過placerholder
# 的方式就需要手動準備多份資料。為了方便訓練資料的獲取過程,可以採用Dataset
# 的方式從TFRecord中讀取資料。於是在這裡提供的資料檔案路徑為將MNIST訓練資料
# 轉化為TFRecords格式之後的路徑。
DATA_PATH = "output.tfrecords" 

# 定義輸入佇列得到訓練資料
def get_input():
    dataset = tf.contrib.data.TFRecordDataset([DATA_PATH])

    # 定義資料解析格式。
    def parser(record):
        features = tf.parse_single_example(
            record,
            features={
                'image_raw': tf.FixedLenFeature([], tf.string),
                'pixels': tf.FixedLenFeature([], tf.int64),
                'label': tf.FixedLenFeature([], tf.int64),
            })

        # 解析圖片和標籤資訊。
        decoded_image = tf.decode_raw(features['image_raw'], tf.uint8)
        reshaped_image = tf.reshape(decoded_image, [784])
        retyped_image = tf.cast(reshaped_image, tf.float32)
        label = tf.cast(features['label'], tf.int32)

        return retyped_image, label

    # 定義輸入佇列。
    dataset = dataset.map(parser)
    dataset = dataset.shuffle(buffer_size=10000)
    dataset = dataset.repeat(10)
    dataset = dataset.batch(BATCH_SIZE)
    iterator = dataset.make_one_shot_iterator()

    features, labels = iterator.get_next()
    return features, labels

# 定義損失函式。對於給定的訓練資料、正則化損失計算規則和名稱空間,計算在這個名稱空間
# 下的總損失。之所以需要給定名稱空間是因為不同的GPU上計算得出的正則化損失都會加入名為
# loss的集合,如果不通過名稱空間就會將不同GPU上的正則化損失都加進來。
def get_loss(x, y_, regularizer, scope, reuse_variables=None):
    # 計算神經網路的前向傳播結果。
    with tf.variable_scope(tf.get_variable_scope(), reuse=reuse_variables):
        y = mnist_inference.inference(x, regularizer)
    # 計算交叉熵損失。
    cross_entropy = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(
        logits=y, labels=y_))
    # 計算當前GPU上計算得到的正則化損失。
    regularization_loss = tf.add_n(tf.get_collection('losses', scope))
    # 計算最終的總損失。
    loss = cross_entropy + regularization_loss
    return loss

# 計算每一個變數梯度的平均值。
def average_gradients(tower_grads):
    average_grads = []

    # 列舉所有的變數和變數在不同GPU上計算得出的梯度。
    for grad_and_vars in zip(*tower_grads):
        # 計算所有GPU上的梯度平均值。
        grads = []
        for g, _ in grad_and_vars:
            expanded_g = tf.expand_dims(g, 0)
            grads.append(expanded_g)
        grad = tf.concat(grads, 0)
        grad = tf.reduce_mean(grad, 0)

        v = grad_and_vars[0][1]
        grad_and_var = (grad, v)
        # 將變數和它的平均梯度對應起來。
        average_grads.append(grad_and_var)
    # 返回所有變數的平均梯度,這個將被用於變數的更新。
    return average_grads

# 主訓練過程。
def main(argv=None): 
    # 將簡單的運算放在CPU上,只有神經網路的訓練過程放在GPU上。
    with tf.Graph().as_default(), tf.device('/cpu:0'):
        # 定義基本的訓練過程
        x, y_ = get_input()
        regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)
        
        global_step = tf.get_variable('global_step', [], initializer=tf.constant_initializer(0), trainable=False)
        learning_rate = tf.train.exponential_decay(
            LEARNING_RATE_BASE, global_step, 60000 / BATCH_SIZE, LEARNING_RATE_DECAY)       
        
        opt = tf.train.GradientDescentOptimizer(learning_rate)
        
        tower_grads = []
        reuse_variables = False
        # 將神經網路的優化過程跑在不同的GPU上。
        for i in range(N_GPU):
            # 將優化過程指定在一個GPU上。
            with tf.device('/gpu:%d' % i):
                with tf.name_scope('GPU_%d' % i) as scope:
                    cur_loss = get_loss(x, y_, regularizer, scope, reuse_variables)
                    # 在第一次宣告變數之後,將控制變數重用的引數設定為True。這樣可以
                    # 讓不同的GPU更新同一組引數。
                    reuse_variables = True
                    grads = opt.compute_gradients(cur_loss)
                    tower_grads.append(grads)
        
        # 計算變數的平均梯度。
        grads = average_gradients(tower_grads)
        for grad, var in grads:
            if grad is not None:
            	tf.summary.histogram('gradients_on_average/%s' % var.op.name, grad)

        # 使用平均梯度更新引數。
        apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)
        for var in tf.trainable_variables():
            tf.summary.histogram(var.op.name, var)

        # 計算變數的滑動平均值。
        variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
        variables_to_average = (tf.trainable_variables() +tf.moving_average_variables())
        variables_averages_op = variable_averages.apply(variables_to_average)
        # 每一輪迭代需要更新變數的取值並更新變數的滑動平均值。
        train_op = tf.group(apply_gradient_op, variables_averages_op)

        saver = tf.train.Saver()
        summary_op = tf.summary.merge_all()        
        init = tf.global_variables_initializer()
        with tf.Session(config=tf.ConfigProto(
                allow_soft_placement=True, log_device_placement=True)) as sess:
            # 初始化所有變數並啟動佇列。
            init.run()
            summary_writer = tf.summary.FileWriter(MODEL_SAVE_PATH, sess.graph)

            for step in range(TRAINING_STEPS):
                # 執行神經網路訓練操作,並記錄訓練操作的執行時間。
                start_time = time.time()
                _, loss_value = sess.run([train_op, cur_loss])
                duration = time.time() - start_time
                
                # 每隔一段時間資料當前的訓練進度,並統計訓練速度。
                if step != 0 and step % 10 == 0:
                    # 計算使用過的訓練資料個數。因為在每一次執行訓練操作時,每一個GPU
                    # 都會使用一個batch的訓練資料,所以總共用到的訓練資料個數為
                    # batch大小 × GPU個數。
                    num_examples_per_step = BATCH_SIZE * N_GPU

                    # num_examples_per_step為本次迭代使用到的訓練資料個數,
                    # duration為運行當前訓練過程使用的時間,於是平均每秒可以處理的訓
                    # 練資料個數為num_examples_per_step / duration。
                    examples_per_sec = num_examples_per_step / duration

                    # duration為運行當前訓練過程使用的時間,因為在每一個訓練過程中,
                    # 每一個GPU都會使用一個batch的訓練資料,所以在單個batch上的訓
                    # 練所需要時間為duration / GPU個數。
                    sec_per_batch = duration / N_GPU
    
                    # 輸出訓練資訊。
                    format_str = ('%s: step %d, loss = %.2f (%.1f examples/sec; %.3f sec/batch)')
                    print (format_str % (datetime.now(), step, loss_value, examples_per_sec, sec_per_batch))
                    
                    # 通過TensorBoard視覺化訓練過程。
                    summary = sess.run(summary_op)
                    summary_writer.add_summary(summary, step)
    
                # 每隔一段時間儲存當前的模型。
                if step % 1000 == 0 or (step + 1) == TRAINING_STEPS:
                    checkpoint_path = os.path.join(MODEL_SAVE_PATH, MODEL_NAME)
                    saver.save(sess, checkpoint_path, global_step=step)
        
if __name__ == '__main__':
	tf.app.run()

一 臺 機器上能夠安裝的 GPU有限 , 要進一步提升深度學習模型 的訓練速度 ,就需要將 TensorFlow 分佈 式執行在多臺機器上 。

import tensorflow as tf

# 建立一個本地叢集。
c = tf.constant("Hello, distributed TensorFlow!")
server = tf.train.Server.create_local_server()
sess = tf.Session(server.target)
print sess.run(c)


# 建立兩個叢集
c = tf.constant("Hello from server1!")
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=0)
sess = tf.Session(server.target, config=tf.ConfigProto(log_device_placement=True)) 
print sess.run(c)
server.join()


import tensorflow as tf
c = tf.constant("Hello from server2!")
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)
sess = tf.Session(server.target, config=tf.ConfigProto(log_device_placement=True)) 
print sess.run(c)
server.join()

使用分散式 TensorFlow 訓練深度 學習模型一般有兩種方式 。一種方式叫做計算圖內分散式( in-graph replication 〉 。 使用這種分散式訓練方式時,所有的任務都會使用 一 個TensorFlow 計算圖中的變數(也就是深度學習模型中的引數),而只是將計算部分發布到不 同的計算伺服器上。 另外 一 種分散式 TensorFlow 訓練深度學習模型的方式叫計算圖之間分散式( between-graph replication )。使用這種分散式方式 時,在每一個計算伺服器上都會建立一個獨立 的 TensorFlow 計算圖,但不同計算圖中的相同引數需要以 一種固定的方式放到同 一個引數伺服器上 。

分散式非同步更新

# -*- coding: utf-8 -*-

import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

import mnist_inference

# 配置神經網路的引數。
BATCH_SIZE = 100
LEARNING_RATE_BASE = 0.01
LEARNING_RATE_DECAY = 0.99
REGULARAZTION_RATE = 0.0001
TRAINING_STEPS = 20000
MOVING_AVERAGE_DECAY = 0.99

# 模型儲存的路徑。
MODEL_SAVE_PATH = "logs/log_async"
# MNIST資料路徑。
DATA_PATH = "../../datasets/MNIST_data"

# 通過flags指定執行的引數。在12.4.1小節中對於不同的任務(task)給出了不同的程式,
# 但這不是一種可擴充套件的方式。在這一小節中將使用執行程式時給出的引數來配置在不同
# 任務中執行的程式。
FLAGS = tf.app.flags.FLAGS

# 指定當前執行的是引數伺服器還是計算伺服器。引數伺服器只負責TensorFlow中變數的維護
# 和管理,計算伺服器負責每一輪迭代時執行反向傳播過程。
tf.app.flags.DEFINE_string('job_name', 'worker', ' "ps" or "worker" ')
# 指定叢集中的引數伺服器地址。
tf.app.flags.DEFINE_string(
    'ps_hosts', ' tf-ps0:2222,tf-ps1:1111',
    'Comma-separated list of hostname:port for the parameter server jobs. e.g. "tf-ps0:2222,tf-ps1:1111" ')
# 指定叢集中的計算伺服器地址。
tf.app.flags.DEFINE_string(
    'worker_hosts', ' tf-worker0:2222,tf-worker1:1111',
    'Comma-separated list of hostname:port for the worker jobs. e.g. "tf-worker0:2222,tf-worker1:1111" ')
# 指定當前程式的任務ID。TensorFlow會自動根據引數伺服器/計算伺服器列表中的埠號
# 來啟動服務。注意引數伺服器和計算伺服器的編號都是從0開始的。
tf.app.flags.DEFINE_integer('task_id', 0, 'Task ID of the worker/replica running the training.')

# 定義TensorFlow的計算圖,並返回每一輪迭代時需要執行的操作。這個過程和5.5節中的主
# 函式基本一致,但為了使處理分散式計算的部分更加突出,本小節將此過程整理為一個函式。
def build_model(x, y_, is_chief):
    regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)
    # 通過和5.5節給出的mnist_inference.py程式碼計算神經網路前向傳播的結果。
    y = mnist_inference.inference(x, regularizer)
    global_step = tf.contrib.framework.get_or_create_global_step()

    # 計算損失函式並定義反向傳播過程。
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1))
    cross_entropy_mean = tf.reduce_mean(cross_entropy)
    loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses'))
    learning_rate = tf.train.exponential_decay(
        LEARNING_RATE_BASE,
        global_step,
        60000 / BATCH_SIZE,
        LEARNING_RATE_DECAY)
    
    train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(
        loss, global_step=global_step)

    # 定義每一輪迭代需要執行的操作。
    if is_chief:
        # 計算變數的滑動平均值。   
        variable_averages = tf.train.ExponentialMovingAverage(
            MOVING_AVERAGE_DECAY, global_step)
        variables_averages_op = variable_averages.apply(
            tf.trainable_variables())
        with tf.control_dependencies([variables_averages_op, train_op]):
            train_op = tf.no_op()
    return global_step, loss, train_op

def main(argv=None):
    # 解析flags並通過tf.train.ClusterSpec配置TensorFlow叢集。
    ps_hosts = FLAGS.ps_hosts.split(',')
    worker_hosts = FLAGS.worker_hosts.split(',')
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
    # 通過tf.train.ClusterSpec以及當前任務建立tf.train.Server。
    server = tf.train.Server(cluster,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_id)

    # 引數伺服器只需要管理TensorFlow中的變數,不需要執行訓練的過程。server.join()會
    # 一致停在這條語句上。
    if FLAGS.job_name == 'ps':
        with tf.device("/cpu:0"):
            server.join()

    # 定義計算伺服器需要執行的操作。
    is_chief = (FLAGS.task_id == 0)
    mnist = input_data.read_data_sets(DATA_PATH, one_hot=True)

    # 通過tf.train.replica_device_setter函式來指定執行每一個運算的裝置。
    # tf.train.replica_device_setter函式會自動將所有的引數分配到引數伺服器上,而
    # 計算分配到當前的計算伺服器上。圖12-9展示了通過TensorBoard視覺化得到的第一個計
    # 算伺服器上運算分配的結果。
    device_setter = tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_id,
        cluster=cluster)
    
    with tf.device(device_setter):
        # 定義輸入並得到每一輪迭代需要執行的操作。
        x = tf.placeholder(tf.float32, [None, mnist_inference.INPUT_NODE], name='x-input')
        y_ = tf.placeholder(tf.float32, [None, mnist_inference.OUTPUT_NODE], name='y-input')
        global_step, loss, train_op = build_model(x, y_, is_chief)

        hooks=[tf.train.StopAtStepHook(last_step=TRAINING_STEPS)]
        sess_config = tf.ConfigProto(allow_soft_placement=True,
                                     log_device_placement=False)

        # 通過tf.train.MonitoredTrainingSession管理訓練深度學習模型的通用功能。
        with tf.train.MonitoredTrainingSession(master=server.target,
                                               is_chief=is_chief,
                                               checkpoint_dir=MODEL_SAVE_PATH,
                                               hooks=hooks,
                                               save_checkpoint_secs=60,
                                               config=sess_config) as mon_sess:
            print "session started."
            step = 0
            start_time = time.time()

            # 執行迭代過程。在迭代過程中tf.train.MonitoredTrainingSession會幫助完成初始
            # 化、從checkpoint中載入訓練過的模型、輸出日誌並儲存模型, 所以下面的程式中不需要
            # 在呼叫這些過程。tf.train.StopAtStepHook會幫忙判斷是否需要退出。
            while not mon_sess.should_stop():                
                xs, ys = mnist.train.next_batch(BATCH_SIZE)
                _, loss_value, global_step_value = mon_sess.run(
                    [train_op, loss, global_step], feed_dict={x: xs, y_: ys})

                # 每隔一段時間輸出訓練資訊。不同的計算伺服器都會更新全域性的訓練輪數,所以這裡使用
                # global_step_value得到在訓練中使用過的batch的總數。
                if step > 0 and step % 100 == 0:
                    duration = time.time() - start_time
                    sec_per_batch = duration / global_step_value
                    format_str = "After %d training steps (%d global steps), " +\
                                 "loss on training batch is %g. (%.3f sec/batch)"
                    print format_str % (step, global_step_value, loss_value, sec_per_batch)
                step += 1

if __name__ == "__main__":
    tf.app.run()

分散式同步更新

# -*- coding: utf-8 -*-

import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

import mnist_inference

# 配置神經網路的引數。
BATCH_SIZE = 100
LEARNING_RATE_BASE = 0.01
LEARNING_RATE_DECAY = 0.99
REGULARAZTION_RATE = 0.0001
TRAINING_STEPS = 20000
MOVING_AVERAGE_DECAY = 0.99

MODEL_SAVE_PATH = "logs/log_sync"
DATA_PATH = "../../datasets/MNIST_data"

# 和非同步模式類似的設定flags。
FLAGS = tf.app.flags.FLAGS

tf.app.flags.DEFINE_string('job_name', 'worker', ' "ps" or "worker" ')
tf.app.flags.DEFINE_string(
    'ps_hosts', ' tf-ps0:2222,tf-ps1:1111',
    'Comma-separated list of hostname:port for the parameter server jobs. e.g. "tf-ps0:2222,tf-ps1:1111" ')
tf.app.flags.DEFINE_string(
    'worker_hosts', ' tf-worker0:2222,tf-worker1:1111',
    'Comma-separated list of hostname:port for the worker jobs. e.g. "tf-worker0:2222,tf-worker1:1111" ')
tf.app.flags.DEFINE_integer('task_id', 0, 'Task ID of the worker/replica running the training.')

# 和非同步模式類似的定義TensorFlow的計算圖。唯一的區別在於使用
# tf.train.SyncReplicasOptimizer函式處理同步更新。
def build_model(x, y_, n_workers, is_chief):
    regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)
    y = mnist_inference.inference(x, regularizer)
    global_step = tf.contrib.framework.get_or_create_global_step()

    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1))
    cross_entropy_mean = tf.reduce_mean(cross_entropy)
    loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses'))
    learning_rate = tf.train.exponential_decay(
        LEARNING_RATE_BASE,
        global_step,
        60000 / BATCH_SIZE,
        LEARNING_RATE_DECAY)
    
    # 通過tf.train.SyncReplicasOptimizer函式實現同步更新。
    opt = tf.train.SyncReplicasOptimizer(
        tf.train.GradientDescentOptimizer(learning_rate),
        replicas_to_aggregate=n_workers,
        total_num_replicas=n_workers)
    sync_replicas_hook = opt.make_session_run_hook(is_chief)
    train_op = opt.minimize(loss, global_step=global_step)
    
    if is_chief:
        variable_averages = tf.train.ExponentialMovingAverage(
            MOVING_AVERAGE_DECAY, global_step)
        variables_averages_op = variable_averages.apply(
            tf.trainable_variables())
        with tf.control_dependencies([variables_averages_op, train_op]):
            train_op = tf.no_op()
            
    return global_step, loss, train_op, sync_replicas_hook

def main(argv=None):
    # 和非同步模式類似的建立TensorFlow叢集。
    ps_hosts = FLAGS.ps_hosts.split(',')
    worker_hosts = FLAGS.worker_hosts.split(',')
    n_workers = len(worker_hosts)
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

    server = tf.train.Server(cluster,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_id)

    if FLAGS.job_name == 'ps':
        with tf.device("/cpu:0"):
            server.join()

    is_chief = (FLAGS.task_id == 0)
    mnist = input_data.read_data_sets(DATA_PATH, one_hot=True)

    device_setter = tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_id,
        cluster=cluster)
    
    with tf.device(device_setter):
        x = tf.placeholder(tf.float32, [None, mnist_inference.INPUT_NODE], name='x-input')
        y_ = tf.placeholder(tf.float32, [None, mnist_inference.OUTPUT_NODE], name='y-input')
        global_step, loss, train_op, sync_replicas_hook = build_model(x, y_, n_workers, is_chief)

        # 把處理同步更新的hook也加進來。
        hooks=[sync_replicas_hook, tf.train.StopAtStepHook(last_step=TRAINING_STEPS)]
        sess_config = tf.ConfigProto(allow_soft_placement=True,
                                     log_device_placement=False)

        # 訓練過程和非同步一致。
        with tf.train.MonitoredTrainingSession(master=server.target,
                                               is_chief=is_chief,
                                               checkpoint_dir=MODEL_SAVE_PATH,
                                               hooks=hooks,
                                               save_checkpoint_secs=60,
                                               config=sess_config) as mon_sess:
            print "session started."
            step = 0
            start_time = time.time()

            while not mon_sess.should_stop():                
                xs, ys = mnist.train.next_batch(BATCH_SIZE)
                _, loss_value, global_step_value = mon_sess.run(
                    [train_op, loss, global_step], feed_dict={x: xs, y_: ys})

                if step > 0 and step % 100 == 0:
                    duration = time.time() - start_time
                    sec_per_batch = duration / global_step_value
                    format_str = "After %d training steps (%d global steps), " +\
                                 "loss on training batch is %g. (%.3f sec/batch)"
                    print format_str % (step, global_step_value, loss_value, sec_per_batch)
                step += 1

if __name__ == "__main__":
    tf.app.run()

import的程式碼(mnist_inference)

import tensorflow as tf

INPUT_NODE = 784
OUTPUT_NODE = 10
LAYER1_NODE = 500

def get_weight_variable(shape, regularizer):
    weights = tf.get_variable("weights", shape, initializer=tf.truncated_normal_initializer(stddev=0.1))
    if regularizer != None: tf.add_to_collection('losses', regularizer(weights))
    return weights


def inference(input_tensor, regularizer):
    with tf.variable_scope('layer1'):

        weights = get_weight_variable([INPUT_NODE, LAYER1_NODE], regularizer)
        biases = tf.get_variable("biases", [LAYER1_NODE], initializer=tf.constant_initializer(0.0))
        layer1 = tf.nn.relu(tf.matmul(input_tensor, weights) + biases)

    with tf.variable_scope('layer2'):
        weights = get_weight_variable([LAYER1_NODE, OUTPUT_NODE], regularizer)
        biases = tf.get_variable("biases", [OUTPUT_NODE], initializer=tf.constant_initializer(0.0))
        layer2 = tf.matmul(layer1, weights) + biases

    return layer2