####好好好#######讓Keras更酷一些!Keras模型雜談
Keras伴我走來 #
回想起進入機器學習領域的這兩三年來,Keras是一直陪伴在筆者的身邊。要不是當初剛掉進這個坑時碰到了Keras這個這麼易用的框架,能快速實現我的想法,我也不確定我是否能有毅力堅持下來,畢竟當初是theano、pylearn、caffe、torch等的天下,哪怕在今天它們對我來說仍然像天書一般。
後來為了拓展視野,我也去學習了一段時間的tensorflow,用純tensorflow寫過若干程式,但不管怎樣,仍然無法割捨Keras。隨著對Keras的瞭解的深入,尤其是花了一點時間研究過Keras的原始碼後,我發現Keras並沒有大家詬病的那樣“欠缺靈活性”。事實上,Keras那精巧的封裝,可以讓我們輕鬆實現很多複雜的功能。我越來越感覺,Keras像是一件非常精美的藝術品,充分體現了Keras的開發者們深厚的創作功力。
本文介紹Keras中自定義模型的一些內容,相對而言,這屬於Keras進階的內容,剛入門的朋友請暫時忽略。
層的自定義 #
這裡介紹Keras中自定義層及其一些運用技巧。
基本定義方法 #
在Keras中,自定義層的最簡單方法是通過Lambda層的方式:
from keras.layers import *
from keras import backend as K
x_in = Input(shape=(10,))
x = Lambda(lambda x: x+2)(x_in) # 對輸入加上2
有時候,我們希望區分訓練階段和測試階段,比如訓練階段給輸入加入一些噪聲,而測試階段則去掉噪聲,這需要用K.in_train_phase實現,比如
def add_noise_in_train(x):
x_ = x + K.random_normal(shape=K.shape(x)) # 加上標準高斯噪聲
return K.in_train_phase(x_, x)
x_in = Input(shape=(10,))
x = Lambda(add_noise_in_train)(x_in) # 訓練階段加入高斯噪聲,測試階段去掉
當然,Lambda層僅僅適用於不需要增加訓練引數的情形,如果想要實現的功能需要往模型新增引數,那麼就必須要用到自定義Layer了。其實這也不復雜,相比於Lambda層只不過程式碼多了幾行,官方文章已經寫得很清楚了:
這裡把它頁面上的例子搬過來:
class MyLayer(Layer):
def __init__(self, output_dim, **kwargs):
self.output_dim = output_dim # 可以自定義一些屬性,方便呼叫
super(MyLayer, self).__init__(**kwargs) # 必須
def build(self, input_shape):
# 新增可訓練引數
self.kernel = self.add_weight(name='kernel',
shape=(input_shape[1], self.output_dim),
initializer='uniform',
trainable=True)
def call(self, x):
# 定義功能,相當於Lambda層的功能函式
return K.dot(x, self.kernel)
def compute_output_shape(self, input_shape):
# 計算輸出形狀,如果輸入和輸出形狀一致,那麼可以省略,否則最好加上
return (input_shape[0], self.output_dim)
雙輸出的層 #
平時我們碰到的所有層,幾乎都是單輸出的,包括Keras中自帶的所有層,都是一個或者多個輸入,然後返回一個結果輸出的。那麼Keras可不可以定義雙輸出的層呢?答案是可以,但要明確定義好output_shape,比如下面這個層,簡單地將輸入切開分兩半,並且同時返回。
class SplitVector(Layer):
def __init__(self, **kwargs):
super(SplitVector, self).__init__(**kwargs)
def call(self, inputs):
# 按第二個維度對tensor進行切片,返回一個list
in_dim = K.int_shape(inputs)[-1]
return [inputs[:, :in_dim//2], inputs[:, in_dim//2:]]
def compute_output_shape(self, input_shape):
# output_shape也要是對應的list
in_dim = input_shape[-1]
return [(None, in_dim//2), (None, in_dim-in_dim//2)]
x1, x2 = SplitVector()(x_in) # 使用方法
層與loss的結合 #
有了《Keras中自定義複雜的loss函式》一文經驗的讀者可以知道,Keras中對loss的基本定義是一個輸入為y_true和y_pred函式。但在比較複雜的情況下,它不僅僅是預測值和目標值的函式,還可以結合權重進行復雜的運算。
這裡再次以center loss為例,介紹一種基於自定義層的寫法。
class Dense_with_Center_loss(Layer):
def __init__(self, output_dim, **kwargs):
self.output_dim = output_dim
super(Dense_with_Center_loss, self).__init__(**kwargs)
def build(self, input_shape):
# 新增可訓練引數
self.kernel = self.add_weight(name='kernel',
shape=(input_shape[1], self.output_dim),
initializer='glorot_normal',
trainable=True)
self.bias = self.add_weight(name='bias',
shape=(self.output_dim,),
initializer='zeros',
trainable=True)
self.centers = self.add_weight(name='centers',
shape=(self.output_dim, input_shape[1]),
initializer='glorot_normal',
trainable=True)
def call(self, inputs):
# 對於center loss來說,返回結果還是跟Dense的返回結果一致
# 所以還是普通的矩陣乘法加上偏置
self.inputs = inputs
return K.dot(inputs, self.kernel) + self.bias
def compute_output_shape(self, input_shape):
return (input_shape[0], self.output_dim)
def loss(self, y_true, y_pred, lamb=0.5):
# 定義完整的loss
y_true = K.cast(y_true, 'int32') # 保證y_true的dtype為int32
crossentropy = K.sparse_categorical_crossentropy(y_true, y_pred, from_logits=True)
centers = K.gather(self.centers, y_true[:, 0]) # 取出樣本中心
center_loss = K.sum(K.square(centers - self.inputs), axis=1) # 計算center loss
return crossentropy + lamb * center_loss
f_size = 2
x_in = Input(shape=(784,))
f = Dense(f_size)(x_in)
dense_center = Dense_with_Center_loss(10)
output = dense_center(f)
model = Model(x_in, output)
model.compile(loss=dense_center.loss,
optimizer='adam',
metrics=['sparse_categorical_accuracy'])
# 這裡是y_train是類別的整數id,不用轉為one hot
model.fit(x_train, y_train, epochs=10)
花式回撥器 #
除了修改模型,我們還可能在訓練過程中做很多事情,比如每個epoch結束後,算一下驗證集的指標,儲存最優模型,還有可能在多少個epoch後就降低學習率,或者修改正則項引數,等等,這些都可以通過回撥器來實現。
回撥器官方頁:https://keras.io/callbacks/
儲存最優模型 #
在Keras中,根據驗證集的指標來保留最優模型,最簡便的方法是通過自帶的ModelCheckpoint,比如
checkpoint = ModelCheckpoint(filepath='./best_model.weights',
monitor='val_acc',
verbose=1,
save_best_only=True)
model.fit(x_train,
y_train,
epochs=10,
validation_data=(x_test, y_test),
callbacks=[checkpoint])
然而,這種方法雖然簡單,但是有一個明顯的缺點,就是裡邊的指標是由compile的metrics來確定的,而Keres中自定義一個metric,需要寫成張量運算才行,也就是說如果你期望的指標並不能寫成張量運算(比如bleu等指標),那麼就沒法寫成一個metric函數了,也就不能用這個方案了。
於是,一個萬能的方案就出來了:自己寫回調器,愛算什麼就算什麼。比如:
from keras.callbacks import Callback
def evaluate(): # 評測函式
pred = model.predict(x_test)
return np.mean(pred.argmax(axis=1) == y_test) # 愛算啥就算啥
# 定義Callback器,計算驗證集的acc,並儲存最優模型
class Evaluate(Callback):
def __init__(self):
self.accs = []
self.highest = 0.
def on_epoch_end(self, epoch, logs=None):
acc = evaluate()
self.accs.append(acc)
if acc >= self.highest: # 儲存最優模型權重
self.highest = acc
model.save_weights('best_model.weights')
# 愛執行什麼就執行什麼
print 'acc: %s, highest: %s' % (acc, self.highest)
evaluator = Evaluate()
model.fit(x_train,
y_train,
epochs=10,
callbacks=[evaluator])
修改超引數 #
訓練過程中還有可能對超引數進行微調,比如最常見的一個需求是根據epoch來調整學習率,這可以簡單地通過LearningRateScheduler來實現,它也屬於回撥器之一。
from keras.callbacks import LearningRateScheduler
def lr_schedule(epoch):
# 根據epoch返回不同的學習率
if epoch < 50:
lr = 1e-2
elif epoch < 80:
lr = 1e-3
else:
lr = 1e-4
return lr
lr_scheduler = LearningRateScheduler(lr_schedule)
model.fit(x_train,
y_train,
epochs=10,
callbacks=[evaluator, lr_scheduler])
如果是其他超引數呢?比如前面center loss的lamb,或者是類似的正則項。這種情況下,我們需要將lamb設為一個Variable,然後自定義一個回撥器來動態賦值。比如當初我定義的一個loss:
def mycrossentropy(y_true, y_pred, e=0.1):
loss1 = K.categorical_crossentropy(y_true, y_pred)
loss2 = K.categorical_crossentropy(K.ones_like(y_pred)/nb_classes, y_pred)
return (1-e)*loss1 + e*loss2
如果要動態改變引數e,那麼可以改為
e = K.variable(0.1)
def mycrossentropy(y_true, y_pred):
loss1 = K.categorical_crossentropy(y_true, y_pred)
loss2 = K.categorical_crossentropy(K.ones_like(y_pred)/nb_classes, y_pred)
return (1-e)*loss1 + e*loss2
model.compile(loss=mycrossentropy,
optimizer='adam')
class callback4e(Callback):
def __init__(self, e):
self.e = e
def on_epoch_end(self, epoch, logs={}):
if epoch > 100: # 100個epoch之後設為0.01
K.set_value(self.e, 0.01)
model.fit(x_train,
y_train,
epochs=10,
callbacks=[callback4e(e)])
注意Callback類共支援六種在不同階段的執行函式:on_epoch_begin、on_epoch_end、on_batch_begin、on_batch_end、on_train_begin、on_train_end,每個函式所執行的階段不一樣(根據名字很容易判斷),可以組合起來實現很複雜的功能。比如warmup,就是指設定了預設學習率後,並不是一開始就用這個學習率訓練,而是在前幾個epoch中,從零慢慢增加到預設的學習率,這個過程可以理解為在為模型調整更好的初始化。參考程式碼:
class Evaluate(Callback):
def __init__(self):
self.num_passed_batchs = 0
self.warmup_epochs = 10
def on_batch_begin(self, batch, logs=None):
# params是模型自動傳遞給Callback的一些引數
if self.params['steps'] == None:
self.steps_per_epoch = np.ceil(1. * self.params['samples'] / self.params['batch_size'])
else:
self.steps_per_epoch = self.params['steps']
if self.num_passed_batchs < self.steps_per_epoch * self.warmup_epochs:
# 前10個epoch中,學習率線性地從零增加到0.001
K.set_value(self.model.optimizer.lr,
0.001 * (self.num_passed_batchs + 1) / self.steps_per_epoch / self.warmup_epochs)
self.num_passed_batchs += 1
Keras無限可能 #
Keras還有很多可圈可點的技巧,比如可以直接利用model.add_loss來靈活地增加loss,還有模型巢狀呼叫、純粹作為tensorflow的簡單上層api,等等,就不一一整理了,歡迎有疑問、有興趣的讀者留言討論。
通常我們認為Keras這樣的高度封裝的庫,靈活性是比較欠缺的,但事實上不然。要知道,Keras並不是簡單地呼叫tensorflow或者theano中現成的上層函式,而僅僅是通過backend來封裝了一些基本的函式,然後把所有的東西(各種層、優化器等)用自己的backend重寫了一遍!也正是如此,它才能支援切換不同的後段。
能做到這個程度,Keras的靈活性是不容置喙的,但是這種靈活性在幫助文件和普通的案例中比較難體現,很多時候要閱讀原始碼,才能感覺到Keras那樣的寫法已經無可挑剔了。我感覺,用Keras實現複雜的模型,既是一種挑戰,又像是一種藝術創作,當你成功時,你就會陶醉於你創造出來的藝術品了。
轉載到請包括本文地址:https://kexue.fm/archives/5765