1. 程式人生 > >自編碼器及其tensorflow實現

自編碼器及其tensorflow實現

自編碼器理論

自編碼器(AutoEncoder)顧名思義,就是可以用自身的高階特徵編碼自己。自編碼器實際上也是一種神經網路,它的輸入和輸入的維度是一樣的。藉助稀疏編碼的思想,目標是使用稀疏的一些高階特徵重新組合來重構自己。
早年在關於稀疏編碼(Sparse Coding)的研究中,通過對大量黑白風景照片提取16*16的影象碎片分析,研究發現幾乎所有的影象碎片都可以由64種正交的邊組合得到,並且組合出一張影象碎片需要的邊的數量是很少的,也就是稀疏的。聲音也有同樣的情況,大量未標註的音訊中可以得到20種基本結構,絕大多數聲音都可以由這些基本的結構線性組合得到。這就是特徵的稀疏表達,通過少量的基本特徵組合、拼裝得到更高層抽象的特徵。
說到自編碼器,就必須要提到Hinton在science上發表的文章Reducing the dimensionality of data with neural networks,這篇文章系統講解了使用自編碼器對資料進行降維的方法,隨後發展出了DBN(Deep Belief Networks, DBN),為訓練很深的網路提供了一個可行的方案。也就是用無監督的逐層訓練提取特徵,將網路的權重初始化到一個比較好的狀態這種無監督的逐層訓練,和自編碼器是非常相似的。
基本的自編碼器模型是一個簡單的三層神經網路結構:一個輸入層、一個隱藏層和一個輸出層。其中輸出層和輸入層具有相同的維數。

1240
image.png


自編碼器包含兩部分:編碼器(encoder)和解碼器(decoder)。從輸入層到隱藏層是編碼的過程,從隱藏層到輸出是解碼的過程。

1240
image.png


如果僅僅是簡單地將輸入複製到輸出,這樣的建模是沒有意義的。因此,自編碼器的網路在設計上,經常採用欠完備(undercomplete)的形式,限制隱藏層h的維度比輸入x小。學習過程可以定義為最小化損失函式:

png.latex?L(%5Ctextbf%7Bx%7D,g(f(%5Ctextbf%7Bx%7D)))


損失函式可以定義為:

1240
image.png


當損失函式取均方誤差是,欠完備的自編碼器會學習出PCA相同的子空間。
此外,還有一類自編碼器,叫做正則自編碼器,正則自編碼器通過定義合適的損失函式使得模型具有對輸入特性的學習能力,並不限制隱層維度和編碼器自身的深度。
正則自編碼器有很多形式,例如稀疏自編碼器,去噪編碼器等。稀疏自編碼器可以認為是帶有潛變數的生成模型的近似最大似然訓練,去噪自編碼器將輸入中添加了噪聲,訓練網路隱式地去學習資料的分佈特徵。

自編碼器的tensorflow實現

下面將給出自編碼器的tensorflow實現。
首先匯入庫和資料。

import numpy as np
import sklearn.preprocessing as prep
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

自編碼器中用到了xavier initialization初始化引數:

# define xavier initialization function, mean distribution or gauss distribution
# mean = 0, var = 2/(n_in+n_out) 
def xavier_init(fan_in, fan_out, constant=1):
    """xavier initialization function
    fan_in: input node number
    fan_out: output node number"""
    low = -constant*np.sqrt(6/(fan_in+fan_out))
    high = constant*np.sqrt(6/(fan_in+fan_out))
    return tf.random_uniform((fan_in, fan_out), minval=low, maxval=high, dtype=tf.float32)

定義一個去噪自編碼器的類,包括一個構建函式init(),完成一些成員變數輸出化和網路構建,定義權重初始化函式_initialize_weights(),定義損失函式cost和單步訓練函式partial_fit().

class AdditiveGaussianNoiseAutoencoder(object):   
    # define construct function
    def __init__(self, n_input, n_hidden, transfer_function=tf.nn.softplus, 
                 optimizer=tf.train.AdamOptimizer(), scale = 0.1):
        self.n_input = n_input
        self.n_hidden = n_hidden
        self.transfer = transfer_function
        self.scale = tf.placeholder(tf.float32)
        self.training_scale = scale
        network_weights = self._initialize_weights()
        self.weights = network_weights

       # define auto-encoder net structure
        with tf.name_scope('input'):
            self.x = tf.placeholder(tf.float32, [None, self.n_input])

        with tf.name_scope('hidden_layr'):
            self.hidden = self.transfer(tf.add(tf.matmul(self.x+scale*tf.random_normal((n_input,)),
                                                     self.weights['w1']), self.weights['b1']))
            tf.summary.histogram('hidden',self.hidden)
           # tf.summary.image('hidden_image',self.hidden)
        with tf.name_scope('output_layr'):
            self.reconstruction = tf.add(tf.matmul(self.hidden,self.weights['w2']), self.weights['b2'])
        # define loss function
        with tf.name_scope('loss_func'):
            self.cost = 0.5*tf.reduce_mean(tf.pow(tf.subtract(self.reconstruction,self.x),2.0))

        self.optimizer = optimizer.minimize(self.cost)
        # initialize all variables
        init = tf.global_variables_initializer()
        self.sess = tf.Session()
        self.sess.run(init)
        self.merged = tf.summary.merge_all()

        # parameter initialize function
    def _initialize_weights(self):
        all_weights = dict()
        all_weights['w1'] = tf.Variable(xavier_init(self.n_input,self.n_hidden))
        all_weights['b1'] = tf.Variable(tf.zeros([self.n_hidden],dtype=tf.float32))
        all_weights['w2'] = tf.Variable(tf.zeros([self.n_hidden, self.n_input],dtype=tf.float32))
        all_weights['b2'] = tf.Variable(tf.zeros([self.n_input],dtype=tf.float32))
        return all_weights

    # 1 step train function
    def partial_fit(self, X):
        cost, opt, merged = self.sess.run((self.cost, self.optimizer, self.merged),feed_dict={self.x:X, self.scale:self.training_scale})

        return cost, merged

    # loss function
    def calc_total_cost(self,X):
        return self.sess.run(self.cost, feed_dict={self.x:X, self.scale:self.training_scale})

之後在主函式中對訓練資料進行預處理,訓練網路:

if __name__  == '__main__':
    mnist = input_data.read_data_sets('MNIST_DATA', one_hot=True)
    logdir = './auto_encoder_logdir'
    summary_writer = tf.summary.FileWriter(logdir)

    with tf.Graph().as_default():
        # define standard scale fucntion
        def standard_scale(X_train, X_test):
            preprocessor = prep.StandardScaler().fit(X_train)
            X_train = preprocessor.transform(X_train)
            X_test = preprocessor.transform(X_test)
            return X_train, X_test

        # define get random block function
        def get_random_block_from_data(data, batch_size):
            start_index = np.random.randint(0, len(data)-batch_size)
            return data[start_index:(start_index+batch_size)]

        X_train, X_test = standard_scale(mnist.train.images, mnist.test.images)

        n_samples = int(mnist.train.num_examples)        
        training_epochs = 20
        batch_size = 128
        display_step = 2

        autoencoder = AdditiveGaussianNoiseAutoencoder(n_input = 784, 
                                                       n_hidden = 200,
                                                       transfer_function=tf.nn.softplus,
                                                       optimizer = tf.train.AdamOptimizer(learning_rate=0.001),
                                                       scale = 0.01
                                                       )

        # training process

        for epoch in range(training_epochs):
            avg_cost = 0
            total_batch = int(n_samples/batch_size)
            for i in range(total_batch):

                batch_xs = get_random_block_from_data(X_train, batch_size)
                #cost = autoencoder.partial_fit(batch_xs)
                cost, merged = autoencoder.partial_fit(batch_xs)
                summary_writer.add_summary(merged, i)
                avg_cost += cost/n_samples*batch_size 
                if epoch%display_step == 0:
                    print('Epoch:','%04d'%(epoch+1), 'cost=','{:.9f}'.format(avg_cost))

            print('Total cost:'+str(autoencoder.calc_total_cost(X_test)))
        summary_writer.close()

網路訓練完成後,但是tensorboard視覺化的效果並不好,無法顯示網路結構。待修改。另外還有一個問題就是,如何視覺化特徵提取的結果?
未完待續……