Deep Learning: Denoising Autoencoders (dA)
The denoising autoencoder (dA) is an extension of the classical autoencoder that was introduced as a building block for deep networks in [Vincent08]. We will start by briefly discussing the autoencoder.
Autoencoders
An autoencoder takes an input x in [0,1]^d and first maps it (with an encoder) to a hidden representation y in [0,1]^{d'} through a deterministic mapping, e.g.:

    y = s(Wx + b)

where s is a non-linearity such as the sigmoid. The latent representation y, or code, is then mapped back (with a decoder) into a reconstruction z of the same shape as x, through a similar transformation, e.g.:

    z = s(W'y + b')

Here, the prime symbol does not indicate matrix transposition; z should be seen as a prediction of x, given the code y. Optionally, the weight matrix W' of the reverse mapping may be constrained to be the transpose of the forward mapping:

    W' = W^T

This is referred to as tied weights. The parameters of this model (W, b, b', and, if one does not use tied weights, also W') are optimized such that the average reconstruction error is minimized.

The reconstruction error can be measured in many ways, depending on the appropriate distributional assumptions on the input given the code. The traditional squared error can be used:

    L(x, z) = ||x - z||^2

If the input is interpreted as either bit vectors or vectors of bit probabilities, the cross-entropy of the reconstruction can be used:

    L_H(x, z) = -\sum_{k=1}^{d} [x_k \log z_k + (1 - x_k) \log(1 - z_k)]
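As a concrete illustration of these formulas, here is a minimal NumPy sketch of one encode/decode pass with tied weights and the cross-entropy error (the toy dimensions and the random input are our own choices, not part of the tutorial code):

import numpy

def sigmoid(a):
    return 1.0 / (1.0 + numpy.exp(-a))

rng = numpy.random.RandomState(0)
d, d_hidden = 6, 3
W = rng.uniform(-0.1, 0.1, size=(d, d_hidden))
b = numpy.zeros(d_hidden)
b_prime = numpy.zeros(d)

x = rng.uniform(0.05, 0.95, size=d)       # a toy input in [0, 1]^d
y = sigmoid(numpy.dot(x, W) + b)          # code: y = s(Wx + b)
z = sigmoid(numpy.dot(y, W.T) + b_prime)  # tied weights: z = s(W'y + b')

# cross-entropy reconstruction error L_H(x, z)
L = -numpy.sum(x * numpy.log(z) + (1 - x) * numpy.log(1 - z))
print(L)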
The hope is that the code y is a distributed representation that captures the coordinates along the main factors of variation in the data, much as the projection on principal components captures the main factors of variation in the data. Indeed, if there is a single linear hidden layer (the code) and the mean squared error criterion is used to train the network, then the k hidden units learn to project the input onto the span of the first k principal components of the data. If the hidden layer is non-linear, the autoencoder behaves differently from PCA and can capture multi-modal aspects of the input distribution. The departure from PCA becomes even more important when we consider stacking multiple encoders (and their corresponding decoders) to build a deep autoencoder.
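To make the PCA connection concrete, the following sketch (our own illustration; the toy data and the rank k are assumptions) computes the rank-k PCA reconstruction, which is the minimum-squared-error reconstruction a linear autoencoder with k hidden units can achieve:

import numpy

rng = numpy.random.RandomState(0)
data = rng.randn(100, 10)
data -= data.mean(axis=0)              # PCA assumes centered data

k = 3
U, S, Vt = numpy.linalg.svd(data, full_matrices=False)
codes = numpy.dot(data, Vt[:k].T)      # "encoder": project onto k components
recon = numpy.dot(codes, Vt[:k])       # "decoder": map back to input space
print(((data - recon) ** 2).mean())    # minimal rank-k squared error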
Because y is viewed as a lossy compression of x, it cannot be a good (small-loss) compression for all x. Optimization makes it a good compression for the training examples, and hopefully for other inputs as well, but not for arbitrary inputs. That is the sense in which an autoencoder generalizes: it gives low reconstruction error on test examples drawn from the same distribution as the training examples, but generally high reconstruction error on samples drawn at random from the input space.
We now implement an autoencoder using Theano, starting with the constructor of the dA class:
    def __init__(
        self,
        numpy_rng,
        theano_rng=None,
        input=None,
        n_visible=784,
        n_hidden=500,
        W=None,
        bhid=None,
        bvis=None
    ):
        """
        Initialize the dA class by specifying the number of visible units (the
        dimension d of the input), the number of hidden units (the dimension
        d' of the latent or hidden space) and the corruption level. The
        constructor also receives symbolic variables for the input, weights and
        bias. Such symbolic variables are useful when, for example, the input
        is the result of some computations, or when weights are shared between
        the dA and an MLP layer. When dealing with SdAs this always happens:
        the dA on layer 2 gets as input the output of the dA on layer 1,
        and the weights of the dA are used in the second stage of training
        to construct an MLP.

        :type numpy_rng: numpy.random.RandomState
        :param numpy_rng: numpy random number generator used to generate
                          weights

        :type theano_rng: theano.tensor.shared_randomstreams.RandomStreams
        :param theano_rng: Theano random generator; if None is given one is
                           generated based on a seed drawn from `rng`

        :type input: theano.tensor.TensorType
        :param input: a symbolic description of the input or None for
                      standalone dA

        :type n_visible: int
        :param n_visible: number of visible units

        :type n_hidden: int
        :param n_hidden: number of hidden units

        :type W: theano.tensor.TensorType
        :param W: Theano variable pointing to a set of weights that should be
                  shared between the dA and another architecture; if the dA
                  should be standalone set this to None

        :type bhid: theano.tensor.TensorType
        :param bhid: Theano variable pointing to a set of bias values (for
                     hidden units) that should be shared between the dA and
                     another architecture; if the dA should be standalone set
                     this to None

        :type bvis: theano.tensor.TensorType
        :param bvis: Theano variable pointing to a set of bias values (for
                     visible units) that should be shared between the dA and
                     another architecture; if the dA should be standalone set
                     this to None

        """
        self.n_visible = n_visible
        self.n_hidden = n_hidden

        # create a Theano random generator that gives symbolic random values
        if not theano_rng:
            theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))

        # note : W' was written as `W_prime` and b' as `b_prime`
        if not W:
            # W is initialized with `initial_W` which is uniformly sampled
            # from -4*sqrt(6./(n_visible+n_hidden)) and
            # 4*sqrt(6./(n_hidden+n_visible)); the output of uniform is
            # converted using asarray to dtype
            # theano.config.floatX so that the code is runnable on GPU
            initial_W = numpy.asarray(
                numpy_rng.uniform(
                    low=-4 * numpy.sqrt(6. / (n_hidden + n_visible)),
                    high=4 * numpy.sqrt(6. / (n_hidden + n_visible)),
                    size=(n_visible, n_hidden)
                ),
                dtype=theano.config.floatX
            )
            W = theano.shared(value=initial_W, name='W', borrow=True)

        if not bvis:
            bvis = theano.shared(
                value=numpy.zeros(
                    n_visible,
                    dtype=theano.config.floatX
                ),
                borrow=True
            )

        if not bhid:
            bhid = theano.shared(
                value=numpy.zeros(
                    n_hidden,
                    dtype=theano.config.floatX
                ),
                name='b',
                borrow=True
            )

        self.W = W
        # b corresponds to the bias of the hidden
        self.b = bhid
        # b_prime corresponds to the bias of the visible
        self.b_prime = bvis
        # tied weights, therefore W_prime is W transpose
        self.W_prime = self.W.T
        self.theano_rng = theano_rng
        # if no input is given, generate a variable representing the input
        if input is None:
            # we use a matrix because we expect a minibatch of several
            # examples, each example being a row
            self.x = T.dmatrix(name='input')
        else:
            self.x = input

        self.params = [self.W, self.b, self.b_prime]
Note that we pass the symbolic input to the autoencoder as a parameter, so that we can chain layers of autoencoders to build a deep network: the symbolic output of layer k becomes the symbolic input of layer k+1, as in the sketch below.
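A minimal sketch of such chaining (the layer sizes here are our own choices; dA and its get_hidden_values method are defined below):

import numpy
import theano.tensor as T

rng = numpy.random.RandomState(123)
x = T.matrix('x')

da1 = dA(numpy_rng=rng, input=x, n_visible=784, n_hidden=500)
# the symbolic code of layer 1 becomes the symbolic input of layer 2
da2 = dA(numpy_rng=rng, input=da1.get_hidden_values(x),
         n_visible=500, n_hidden=250)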
Now we can express the computation of the latent representation and of the reconstructed signal:
    def get_hidden_values(self, input):
        """ Computes the values of the hidden layer """
        return T.nnet.sigmoid(T.dot(input, self.W) + self.b)

    def get_reconstructed_input(self, hidden):
        """Computes the reconstructed input given the values of the
        hidden layer
        """
        return T.nnet.sigmoid(T.dot(hidden, self.W_prime) + self.b_prime)
Using these functions we can compute the cost and the updates for one step of stochastic gradient descent:
    def get_cost_updates(self, corruption_level, learning_rate):
        """ This function computes the cost and the updates for one training
        step of the dA """

        tilde_x = self.get_corrupted_input(self.x, corruption_level)
        y = self.get_hidden_values(tilde_x)
        z = self.get_reconstructed_input(y)
        # note : we sum over the size of a datapoint; if we are using
        #        minibatches, L will be a vector, with one entry per
        #        example in minibatch
        L = - T.sum(self.x * T.log(z) + (1 - self.x) * T.log(1 - z), axis=1)
        # note : L is now a vector, where each element is the
        #        cross-entropy cost of the reconstruction of the
        #        corresponding example of the minibatch. We need to
        #        compute the average of all these to get the cost of
        #        the minibatch
        cost = T.mean(L)

        # compute the gradients of the cost of the `dA` with respect
        # to its parameters
        gparams = T.grad(cost, self.params)
        # generate the list of updates
        updates = [
            (param, param - learning_rate * gparam)
            for param, gparam in zip(self.params, gparams)
        ]

        return (cost, updates)
We can now define a function that, applied iteratively, updates the parameters W, b and b_prime so that the reconstruction cost is approximately minimized.
rng = numpy.random.RandomState(123)
theano_rng = RandomStreams(rng.randint(2 ** 30))

da = dA(
    numpy_rng=rng,
    theano_rng=theano_rng,
    input=x,
    n_visible=28 * 28,
    n_hidden=500
)

cost, updates = da.get_cost_updates(
    corruption_level=0.,
    learning_rate=learning_rate
)

train_da = theano.function(
    [index],
    cost,
    updates=updates,
    givens={
        x: train_set_x[index * batch_size: (index + 1) * batch_size]
    }
)
start_time = timeit.default_timer()

############
# TRAINING #
############

# go through training epochs
for epoch in range(training_epochs):
    # go through the training set
    c = []
    for batch_index in range(n_train_batches):
        c.append(train_da(batch_index))

    print('Training epoch %d, cost ' % epoch, numpy.mean(c, dtype='float64'))

end_time = timeit.default_timer()

training_time = (end_time - start_time)

print(('The no corruption code for file ' +
       os.path.split(__file__)[1] +
       ' ran for %.2fm' % ((training_time) / 60.)), file=sys.stderr)

image = Image.fromarray(
    tile_raster_images(X=da.W.get_value(borrow=True).T,
                       img_shape=(28, 28), tile_shape=(10, 10),
                       tile_spacing=(1, 1)))
image.save('filters_corruption_0.png')
#####################################
# BUILDING THE MODEL CORRUPTION 30% #
#####################################
rng = numpy.random.RandomState(123)
theano_rng = RandomStreams(rng.randint(2 ** 30))

da = dA(
    numpy_rng=rng,
    theano_rng=theano_rng,
    input=x,
    n_visible=28 * 28,
    n_hidden=500
)

cost, updates = da.get_cost_updates(
    corruption_level=0.3,
    learning_rate=learning_rate
)

train_da = theano.function(
    [index],
    cost,
    updates=updates,
    givens={
        x: train_set_x[index * batch_size: (index + 1) * batch_size]
    }
)

start_time = timeit.default_timer()

############
# TRAINING #
############

# go through training epochs
for epoch in range(training_epochs):
    # go through the training set
    c = []
    for batch_index in range(n_train_batches):
        c.append(train_da(batch_index))

    print('Training epoch %d, cost ' % epoch, numpy.mean(c, dtype='float64'))

end_time = timeit.default_timer()

training_time = (end_time - start_time)

print(('The 30% corruption code for file ' +
       os.path.split(__file__)[1] +
       ' ran for %.2fm' % (training_time / 60.)), file=sys.stderr)
image = Image.fromarray(tile_raster_images(
    X=da.W.get_value(borrow=True).T,
    img_shape=(28, 28), tile_shape=(10, 10),
    tile_spacing=(1, 1)))
image.save('filters_corruption_30.png')
os.chdir('../')


if __name__ == '__main__':
    test_dA()
If there were no other constraint besides minimizing the reconstruction error, an autoencoder with n inputs and an encoding of dimension n (or more) could simply learn the identity function, mapping an input to its copy. Such an autoencoder would not differentiate test examples (from the training distribution) from other input configurations.
Surprisingly, experiments reported in [Bengio07] suggest that, in practice, when trained with stochastic gradient descent, non-linear autoencoders with more hidden units than inputs yield useful representations. Here, "useful" means that a network taking the encoding as input achieves low classification error.
A simple explanation is that stochastic gradient descent with early stopping acts similarly to an L2 regularization of the parameters. To achieve perfect reconstruction of continuous inputs, a one-hidden-layer autoencoder with non-linear hidden units needs very small weights in the first (encoding) layer, so as to keep the non-linearity of the hidden units in their linear regime, and very large weights in the second (decoding) layer. With binary inputs, very large weights are likewise needed to fully minimize the reconstruction error. Since implicit or explicit regularization makes it difficult to reach large-weight solutions, the optimization finds encodings that only work well for examples similar to those in the training set, which is what we want. It means that the representation exploits statistical regularities present in the training set, rather than merely copying the input.
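To see why small weights put a sigmoid unit in its linear regime, note that around zero the sigmoid is well approximated by its first-order Taylor expansion, s(a) ~ 1/2 + a/4. A quick NumPy check (our own illustration, not part of the tutorial code):

import numpy

a = numpy.linspace(-0.1, 0.1, 5)           # small pre-activations
sigmoid = 1.0 / (1.0 + numpy.exp(-a))
linear = 0.5 + a / 4.0                     # first-order Taylor expansion at 0
print(numpy.abs(sigmoid - linear).max())   # ~2e-5: effectively linear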
There are other ways by which an autoencoder with more hidden units than inputs can be prevented from learning the identity function while still capturing something useful about the input in its hidden representation. One is sparsity (forcing many of the hidden units to be zero or near-zero); sparsity has been exploited very successfully by [Ranzato07], [Lee08] and others. Another is to add randomness to the transformation from input to reconstruction; this technique is used in Restricted Boltzmann Machines and in denoising autoencoders.
Denoising Autoencoders
The idea behind denoising autoencoders is simple: to force the hidden layer to discover more robust features, rather than simply learning the identity, we train the autoencoder to reconstruct the input from a corrupted version of it.
The denoising autoencoder is a stochastic version of the autoencoder. Intuitively, it tries to do two things: encode the input (preserve the information about the input), and undo the effect of a corruption process stochastically applied to the input; the latter can only be done by capturing the statistical dependencies between the inputs. The denoising autoencoder can be understood from several perspectives (manifold learning, stochastic operators, bottom-up information theory, top-down generative models), all of which are explained in [Vincent08].
In [Vincent08], the stochastic corruption process randomly sets some of the inputs (sometimes as many as half of them) to zero. Hence the denoising autoencoder tries to predict the corrupted (i.e. missing) values from the uncorrupted (i.e. non-missing) ones, for randomly selected subsets of missing patterns. Note that being able to predict any subset of variables from the rest is a sufficient condition for completely capturing the joint distribution between a set of variables (this is also how Gibbs sampling works).
To convert the autoencoder into a denoising autoencoder, all we need to do is add a stochastic corruption step operating on the input. The input can be corrupted in many ways; here we use the most basic mechanism of randomly setting entries of the input to zero. The code is:
    def get_corrupted_input(self, input, corruption_level):
        """This function keeps ``1-corruption_level`` entries of the inputs
        the same and zeroes out a randomly selected subset of size
        ``corruption_level``
        Note : first argument of theano.rng.binomial is the shape(size) of
               random numbers that it should produce
               second argument is the number of trials
               third argument is the probability of success of any trial

               this will produce an array of 0s and 1s where 1 has a
               probability of 1 - ``corruption_level`` and 0 with
               ``corruption_level``

               The binomial function returns int64 data type by
               default. int64 multiplied by the input
               type (floatX) always returns float64. To keep all data
               in floatX when floatX is float32, we set the dtype of
               the binomial to floatX. As in our case the value of
               the binomial is always 0 or 1, this doesn't change the
               result. This is needed to allow the gpu to work
               correctly as it only supports float32 for now.

        """
        return self.theano_rng.binomial(size=input.shape, n=1,
                                        p=1 - corruption_level,
                                        dtype=theano.config.floatX) * input
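As a quick sanity check of this corruption step, the following standalone snippet (our own toy example, not part of dA.py) compiles just the masking expression and applies it to a matrix of ones:

import numpy
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams

x = T.matrix('x')
theano_rng = RandomStreams(123)
corruption_level = 0.3

corrupt = theano.function(
    [x],
    theano_rng.binomial(size=x.shape, n=1, p=1 - corruption_level,
                        dtype=theano.config.floatX) * x
)
print(corrupt(numpy.ones((2, 8), dtype=theano.config.floatX)))
# roughly 30% of the entries come out zeroed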
In a stacked autoencoder, the weights of the dA must be shared with those of a corresponding sigmoid layer; for this reason, the dA constructor accepts Theano variables pointing to shared parameters. If these parameters are left as None, fresh ones are constructed.
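A minimal sketch of that sharing mechanism (the shared variables below are stand-ins we allocate ourselves; in an actual stacked network they would come from the sigmoid layer):

import numpy
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams

rng = numpy.random.RandomState(123)
theano_rng = RandomStreams(rng.randint(2 ** 30))
x = T.matrix('x')

# stand-ins for the shared parameters of an existing sigmoid layer
shared_W = theano.shared(
    numpy.zeros((784, 500), dtype=theano.config.floatX), name='W')
shared_b = theano.shared(
    numpy.zeros(500, dtype=theano.config.floatX), name='b')

# the dA reuses the layer's W and hidden bias; only bvis is newly allocated
da = dA(numpy_rng=rng, theano_rng=theano_rng, input=x,
        n_visible=784, n_hidden=500,
        W=shared_W, bhid=shared_b)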
The final denoising autoencoder class is shown below:
class dA(object):
    """Denoising Auto-Encoder class (dA)

    A denoising autoencoder tries to reconstruct the input from a corrupted
    version of it by first projecting it into a latent space and then
    reprojecting it back into the input space. Please refer to Vincent et al.,
    2008 for more details. If x is the input then equation (1) computes a
    partially destroyed version of x by means of a stochastic mapping q_D.
    Equation (2) computes the projection of the input into the latent space.
    Equation (3) computes the reconstruction of the input, while equation (4)
    computes the reconstruction error.

    .. math::

        \tilde{x} ~ q_D(\tilde{x}|x)                                     (1)

        y = s(W \tilde{x} + b)                                           (2)

        z = s(W' y  + b')                                                (3)

        L(x,z) = -sum_{k=1}^d [x_k \log z_k + (1-x_k) \log( 1-z_k)]      (4)

    """

    def __init__(
        self,
        numpy_rng,
        theano_rng=None,
        input=None,
        n_visible=784,
        n_hidden=500,
        W=None,
        bhid=None,
        bvis=None
    ):
        """
        Initialize the dA class by specifying the number of visible units (the
        dimension d of the input), the number of hidden units (the dimension
        d' of the latent or hidden space) and the corruption level. The
        constructor also receives symbolic variables for the input, weights and
        bias. Such symbolic variables are useful when, for example, the input
        is the result of some computations, or when weights are shared between
        the dA and an MLP layer. When dealing with SdAs this always happens:
        the dA on layer 2 gets as input the output of the dA on layer 1,
        and the weights of the dA are used in the second stage of training
        to construct an MLP.

        :type numpy_rng: numpy.random.RandomState
        :param numpy_rng: numpy random number generator used to generate
                          weights

        :type theano_rng: theano.tensor.shared_randomstreams.RandomStreams
        :param theano_rng: Theano random generator; if None is given one is
                           generated based on a seed drawn from `rng`

        :type input: theano.tensor.TensorType
        :param input: a symbolic description of the input or None for
                      standalone dA

        :type n_visible: int
        :param n_visible: number of visible units

        :type n_hidden: int
        :param n_hidden: number of hidden units

        :type W: theano.tensor.TensorType
        :param W: Theano variable pointing to a set of weights that should be
                  shared between the dA and another architecture; if the dA
                  should be standalone set this to None

        :type bhid: theano.tensor.TensorType
        :param bhid: Theano variable pointing to a set of bias values (for
                     hidden units) that should be shared between the dA and
                     another architecture; if the dA should be standalone set
                     this to None

        :type bvis: theano.tensor.TensorType
        :param bvis: Theano variable pointing to a set of bias values (for
                     visible units) that should be shared between the dA and
                     another architecture; if the dA should be standalone set
                     this to None

        """
        self.n_visible = n_visible
        self.n_hidden = n_hidden

        # create a Theano random generator that gives symbolic random values
        if not theano_rng:
            theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))

        # note : W' was written as `W_prime` and b' as `b_prime`
        if not W:
            # W is initialized with `initial_W` which is uniformly sampled
            # from -4*sqrt(6./(n_visible+n_hidden)) and
            # 4*sqrt(6./(n_hidden+n_visible)); the output of uniform is
            # converted using asarray to dtype
            # theano.config.floatX so that the code is runnable on GPU
            initial_W = numpy.asarray(
                numpy_rng.uniform(
                    low=-4 * numpy.sqrt(6. / (n_hidden + n_visible)),
                    high=4 * numpy.sqrt(6. / (n_hidden + n_visible)),
                    size=(n_visible, n_hidden)
                ),
                dtype=theano.config.floatX
            )
            W = theano.shared(value=initial_W, name='W', borrow=True)

        if not bvis:
            bvis = theano.shared(
                value=numpy.zeros(
                    n_visible,
                    dtype=theano.config.floatX
                ),
                borrow=True
            )

        if not bhid:
            bhid = theano.shared(
                value=numpy.zeros(
                    n_hidden,
                    dtype=theano.config.floatX
                ),
                name='b',
                borrow=True
            )

        self.W = W
        # b corresponds to the bias of the hidden
        self.b = bhid
        # b_prime corresponds to the bias of the visible
        self.b_prime = bvis
        # tied weights, therefore W_prime is W transpose
        self.W_prime = self.W.T
        self.theano_rng = theano_rng
        # if no input is given, generate a variable representing the input
        if input is None:
            # we use a matrix because we expect a minibatch of several
            # examples, each example being a row
            self.x = T.dmatrix(name='input')
        else:
            self.x = input

        self.params = [self.W, self.b, self.b_prime]

    def get_corrupted_input(self, input, corruption_level):
        """This function keeps ``1-corruption_level`` entries of the inputs
        the same and zeroes out a randomly selected subset of size
        ``corruption_level``
        Note : first argument of theano.rng.binomial is the shape(size) of
               random numbers that it should produce
               second argument is the number of trials
               third argument is the probability of success of any trial

               this will produce an array of 0s and 1s where 1 has a
               probability of 1 - ``corruption_level`` and 0 with
               ``corruption_level``

               The binomial function returns int64 data type by
               default. int64 multiplied by the input
               type (floatX) always returns float64. To keep all data
               in floatX when floatX is float32, we set the dtype of
               the binomial to floatX. As in our case the value of
               the binomial is always 0 or 1, this doesn't change the
               result. This is needed to allow the gpu to work
               correctly as it only supports float32 for now.

        """
        return self.theano_rng.binomial(size=input.shape, n=1,
                                        p=1 - corruption_level,
                                        dtype=theano.config.floatX) * input

    def get_hidden_values(self, input):
        """ Computes the values of the hidden layer """
        return T.nnet.sigmoid(T.dot(input, self.W) + self.b)

    def get_reconstructed_input(self, hidden):
        """Computes the reconstructed input given the values of the
        hidden layer
        """
        return T.nnet.sigmoid(T.dot(hidden, self.W_prime) + self.b_prime)

    def get_cost_updates(self, corruption_level, learning_rate):
        """ This function computes the cost and the updates for one training
        step of the dA """

        tilde_x = self.get_corrupted_input(self.x, corruption_level)
        y = self.get_hidden_values(tilde_x)
        z = self.get_reconstructed_input(y)
        # note : we sum over the size of a datapoint; if we are using
        #        minibatches, L will be a vector, with one entry per
        #        example in minibatch
        L = - T.sum(self.x * T.log(z) + (1 - self.x) * T.log(1 - z), axis=1)
        # note : L is now a vector, where each element is the
        #        cross-entropy cost of the reconstruction of the
        #        corresponding example of the minibatch. We need to
        #        compute the average of all these to get the cost of
        #        the minibatch
        cost = T.mean(L)

        # compute the gradients of the cost of the `dA` with respect
        # to its parameters
        gparams = T.grad(cost, self.params)
        # generate the list of updates
        updates = [
            (param, param - learning_rate * gparam)
            for param, gparam in zip(self.params, gparams)
        ]

        return (cost, updates)
Putting it All Together
It is now straightforward to construct an instance of the dA class and train it:
# allocate symbolic variables for the data
index = T.lscalar()    # index to a [mini]batch
x = T.matrix('x')  # the data is presented as rasterized images

#####################################
# BUILDING THE MODEL CORRUPTION 30% #
#####################################

rng = numpy.random.RandomState(123)
theano_rng = RandomStreams(rng.randint(2 ** 30))

da = dA(
    numpy_rng=rng,
    theano_rng=theano_rng,
    input=x,
    n_visible=28 * 28,
    n_hidden=500
)

cost, updates = da.get_cost_updates(
    corruption_level=0.3,
    learning_rate=learning_rate
)

train_da = theano.function(
    [index],
    cost,
    updates=updates,
    givens={
        x: train_set_x[index * batch_size: (index + 1) * batch_size]
    }
)

start_time = timeit.default_timer()

############
# TRAINING #
############

# go through training epochs
for epoch in range(training_epochs):
    # go through the training set
    c = []
    for batch_index in range(n_train_batches):
        c.append(train_da(batch_index))

    print('Training epoch %d, cost ' % epoch, numpy.mean(c, dtype='float64'))

end_time = timeit.default_timer()

training_time = (end_time - start_time)

print(('The 30% corruption code for file ' +
       os.path.split(__file__)[1] +
       ' ran for %.2fm' % (training_time / 60.)), file=sys.stderr)
To get a feeling for what the network has learned, we plot the filters (defined by the weight matrix). Bear in mind that this does not tell the whole story: we neglect the biases, and the weights are multiplied by a constant so that they map to values between 0 and 1.
Using tile_raster_images together with PIL, the following code saves the filters as an image:
image = Image.fromarray(tile_raster_images(
    X=da.W.get_value(borrow=True).T,
    img_shape=(28, 28), tile_shape=(10, 10),
    tile_spacing=(1, 1)))
image.save('filters_corruption_30.png')
To run the code:
python dA.py
Filters learned with no corruption noise (saved as filters_corruption_0.png):
Filters learned with 30% corruption noise (saved as filters_corruption_30.png):