1. 程式人生 > >####好好好#######讓Keras更酷一些!Keras模型雜談

####好好好#######讓Keras更酷一些!Keras模型雜談

Keras伴我走來 #

回想起進入機器學習領域的這兩三年來,Keras是一直陪伴在筆者的身邊。要不是當初剛掉進這個坑時碰到了Keras這個這麼易用的框架,能快速實現我的想法,我也不確定我是否能有毅力堅持下來,畢竟當初是theano、pylearn、caffe、torch等的天下,哪怕在今天它們對我來說仍然像天書一般。

後來為了拓展視野,我也去學習了一段時間的tensorflow,用純tensorflow寫過若干程式,但不管怎樣,仍然無法割捨Keras。隨著對Keras的瞭解的深入,尤其是花了一點時間研究過Keras的原始碼後,我發現Keras並沒有大家詬病的那樣“欠缺靈活性”。事實上,Keras那精巧的封裝,可以讓我們輕鬆實現很多複雜的功能。我越來越感覺,Keras像是一件非常精美的藝術品,充分體現了Keras的開發者們深厚的創作功力。

本文介紹Keras中自定義模型的一些內容,相對而言,這屬於Keras進階的內容,剛入門的朋友請暫時忽略。

層的自定義 #

這裡介紹Keras中自定義層及其一些運用技巧。

基本定義方法 #

在Keras中,自定義層的最簡單方法是通過Lambda層的方式:

from keras.layers import *
from keras import backend as K

x_in = Input(shape=(10,))
x = Lambda(lambda x: x+2)(x_in) # 對輸入加上2

有時候,我們希望區分訓練階段和測試階段,比如訓練階段給輸入加入一些噪聲,而測試階段則去掉噪聲,這需要用K.in_train_phase實現,比如

def add_noise_in_train(x):
    x_ = x + K.random_normal(shape=K.shape(x)) # 加上標準高斯噪聲
    return K.in_train_phase(x_, x)

x_in = Input(shape=(10,))
x = Lambda(add_noise_in_train)(x_in) # 訓練階段加入高斯噪聲,測試階段去掉

當然,Lambda層僅僅適用於不需要增加訓練引數的情形,如果想要實現的功能需要往模型新增引數,那麼就必須要用到自定義Layer了。其實這也不復雜,相比於Lambda層只不過程式碼多了幾行,官方文章已經寫得很清楚了:

https://keras.io/layers/writing-your-own-keras-layers/

這裡把它頁面上的例子搬過來:

class MyLayer(Layer):

    def __init__(self, output_dim, **kwargs):
        self.output_dim = output_dim # 可以自定義一些屬性,方便呼叫
        super(MyLayer, self).__init__(**kwargs) # 必須

    def build(self, input_shape):
        # 新增可訓練引數
        self.kernel = self.add_weight(name='kernel', 
                                      shape=(input_shape[1], self.output_dim),
                                      initializer='uniform',
                                      trainable=True)

    def call(self, x):
        # 定義功能,相當於Lambda層的功能函式
        return K.dot(x, self.kernel)

    def compute_output_shape(self, input_shape):
        # 計算輸出形狀,如果輸入和輸出形狀一致,那麼可以省略,否則最好加上
        return (input_shape[0], self.output_dim)

雙輸出的層 #

平時我們碰到的所有層,幾乎都是單輸出的,包括Keras中自帶的所有層,都是一個或者多個輸入,然後返回一個結果輸出的。那麼Keras可不可以定義雙輸出的層呢?答案是可以,但要明確定義好output_shape,比如下面這個層,簡單地將輸入切開分兩半,並且同時返回。

class SplitVector(Layer):

    def __init__(self, **kwargs):
        super(SplitVector, self).__init__(**kwargs)

    def call(self, inputs):
        # 按第二個維度對tensor進行切片,返回一個list
        in_dim = K.int_shape(inputs)[-1]
        return [inputs[:, :in_dim//2], inputs[:, in_dim//2:]]

    def compute_output_shape(self, input_shape):
        # output_shape也要是對應的list
        in_dim = input_shape[-1]
        return [(None, in_dim//2), (None, in_dim-in_dim//2)]


x1, x2 = SplitVector()(x_in) # 使用方法

層與loss的結合 #

有了《Keras中自定義複雜的loss函式》一文經驗的讀者可以知道,Keras中對loss的基本定義是一個輸入為y_true和y_pred函式。但在比較複雜的情況下,它不僅僅是預測值和目標值的函式,還可以結合權重進行復雜的運算。

這裡再次以center loss為例,介紹一種基於自定義層的寫法。

class Dense_with_Center_loss(Layer):

    def __init__(self, output_dim, **kwargs):
        self.output_dim = output_dim
        super(Dense_with_Center_loss, self).__init__(**kwargs)

    def build(self, input_shape):
        # 新增可訓練引數
        self.kernel = self.add_weight(name='kernel',
                                      shape=(input_shape[1], self.output_dim),
                                      initializer='glorot_normal',
                                      trainable=True)
        self.bias = self.add_weight(name='bias',
                                    shape=(self.output_dim,),
                                    initializer='zeros',
                                    trainable=True)
        self.centers = self.add_weight(name='centers',
                                       shape=(self.output_dim, input_shape[1]),
                                       initializer='glorot_normal',
                                       trainable=True)

    def call(self, inputs):
        # 對於center loss來說,返回結果還是跟Dense的返回結果一致
        # 所以還是普通的矩陣乘法加上偏置
        self.inputs = inputs
        return K.dot(inputs, self.kernel) + self.bias

    def compute_output_shape(self, input_shape):
        return (input_shape[0], self.output_dim)

    def loss(self, y_true, y_pred, lamb=0.5):
        # 定義完整的loss
        y_true = K.cast(y_true, 'int32') # 保證y_true的dtype為int32
        crossentropy = K.sparse_categorical_crossentropy(y_true, y_pred, from_logits=True)
        centers = K.gather(self.centers, y_true[:, 0]) # 取出樣本中心
        center_loss = K.sum(K.square(centers - self.inputs), axis=1) # 計算center loss
        return crossentropy + lamb * center_loss


f_size = 2

x_in = Input(shape=(784,))
f = Dense(f_size)(x_in)

dense_center = Dense_with_Center_loss(10)
output = dense_center(f)

model = Model(x_in, output)
model.compile(loss=dense_center.loss,
              optimizer='adam',
              metrics=['sparse_categorical_accuracy'])

# 這裡是y_train是類別的整數id,不用轉為one hot
model.fit(x_train, y_train, epochs=10)

花式回撥器 #

除了修改模型,我們還可能在訓練過程中做很多事情,比如每個epoch結束後,算一下驗證集的指標,儲存最優模型,還有可能在多少個epoch後就降低學習率,或者修改正則項引數,等等,這些都可以通過回撥器來實現。

回撥器官方頁:https://keras.io/callbacks/

儲存最優模型 #

在Keras中,根據驗證集的指標來保留最優模型,最簡便的方法是通過自帶的ModelCheckpoint,比如

checkpoint = ModelCheckpoint(filepath='./best_model.weights',
                             monitor='val_acc',
                             verbose=1,
                             save_best_only=True)

model.fit(x_train,
          y_train,
          epochs=10,
          validation_data=(x_test, y_test),
          callbacks=[checkpoint])

然而,這種方法雖然簡單,但是有一個明顯的缺點,就是裡邊的指標是由compile的metrics來確定的,而Keres中自定義一個metric,需要寫成張量運算才行,也就是說如果你期望的指標並不能寫成張量運算(比如bleu等指標),那麼就沒法寫成一個metric函數了,也就不能用這個方案了。

於是,一個萬能的方案就出來了:自己寫回調器,愛算什麼就算什麼。比如:

from keras.callbacks import Callback

def evaluate(): # 評測函式
    pred = model.predict(x_test)
    return np.mean(pred.argmax(axis=1) == y_test) # 愛算啥就算啥


# 定義Callback器,計算驗證集的acc,並儲存最優模型
class Evaluate(Callback):

    def __init__(self):
        self.accs = []
        self.highest = 0.

    def on_epoch_end(self, epoch, logs=None):
        acc = evaluate()
        self.accs.append(acc)
        if acc >= self.highest: # 儲存最優模型權重
            self.highest = acc
            model.save_weights('best_model.weights')

        # 愛執行什麼就執行什麼
        print 'acc: %s, highest: %s' % (acc, self.highest)


evaluator = Evaluate()
model.fit(x_train,
          y_train,
          epochs=10,
          callbacks=[evaluator])

修改超引數 #

訓練過程中還有可能對超引數進行微調,比如最常見的一個需求是根據epoch來調整學習率,這可以簡單地通過LearningRateScheduler來實現,它也屬於回撥器之一。

from keras.callbacks import LearningRateScheduler


def lr_schedule(epoch):
    # 根據epoch返回不同的學習率
    if epoch < 50:
        lr = 1e-2
    elif epoch < 80:
        lr = 1e-3
    else:
        lr = 1e-4
    return lr


lr_scheduler = LearningRateScheduler(lr_schedule)

model.fit(x_train,
          y_train,
          epochs=10,
          callbacks=[evaluator, lr_scheduler])

如果是其他超引數呢?比如前面center loss的lamb,或者是類似的正則項。這種情況下,我們需要將lamb設為一個Variable,然後自定義一個回撥器來動態賦值。比如當初我定義的一個loss:

def mycrossentropy(y_true, y_pred, e=0.1):
    loss1 = K.categorical_crossentropy(y_true, y_pred)
    loss2 = K.categorical_crossentropy(K.ones_like(y_pred)/nb_classes, y_pred)
    return (1-e)*loss1 + e*loss2

如果要動態改變引數e,那麼可以改為

e = K.variable(0.1)

def mycrossentropy(y_true, y_pred):
    loss1 = K.categorical_crossentropy(y_true, y_pred)
    loss2 = K.categorical_crossentropy(K.ones_like(y_pred)/nb_classes, y_pred)
    return (1-e)*loss1 + e*loss2


model.compile(loss=mycrossentropy,
              optimizer='adam')


class callback4e(Callback):
    def __init__(self, e):
        self.e = e
    def on_epoch_end(self, epoch, logs={}):
        if epoch > 100: # 100個epoch之後設為0.01 
            K.set_value(self.e, 0.01)

model.fit(x_train,
          y_train,
          epochs=10,
          callbacks=[callback4e(e)])

注意Callback類共支援六種在不同階段的執行函式:on_epoch_begin、on_epoch_end、on_batch_begin、on_batch_end、on_train_begin、on_train_end,每個函式所執行的階段不一樣(根據名字很容易判斷),可以組合起來實現很複雜的功能。比如warmup,就是指設定了預設學習率後,並不是一開始就用這個學習率訓練,而是在前幾個epoch中,從零慢慢增加到預設的學習率,這個過程可以理解為在為模型調整更好的初始化。參考程式碼:

class Evaluate(Callback):
    def __init__(self):
        self.num_passed_batchs = 0
        self.warmup_epochs = 10
    def on_batch_begin(self, batch, logs=None):
        # params是模型自動傳遞給Callback的一些引數
        if self.params['steps'] == None:
            self.steps_per_epoch = np.ceil(1. * self.params['samples'] / self.params['batch_size'])
        else:
            self.steps_per_epoch = self.params['steps']
        if self.num_passed_batchs < self.steps_per_epoch * self.warmup_epochs:
            # 前10個epoch中,學習率線性地從零增加到0.001
            K.set_value(self.model.optimizer.lr,
                        0.001 * (self.num_passed_batchs + 1) / self.steps_per_epoch / self.warmup_epochs)
            self.num_passed_batchs += 1

Keras無限可能 #

Keras還有很多可圈可點的技巧,比如可以直接利用model.add_loss來靈活地增加loss,還有模型巢狀呼叫、純粹作為tensorflow的簡單上層api,等等,就不一一整理了,歡迎有疑問、有興趣的讀者留言討論。

通常我們認為Keras這樣的高度封裝的庫,靈活性是比較欠缺的,但事實上不然。要知道,Keras並不是簡單地呼叫tensorflow或者theano中現成的上層函式,而僅僅是通過backend來封裝了一些基本的函式,然後把所有的東西(各種層、優化器等)用自己的backend重寫了一遍!也正是如此,它才能支援切換不同的後段。

能做到這個程度,Keras的靈活性是不容置喙的,但是這種靈活性在幫助文件和普通的案例中比較難體現,很多時候要閱讀原始碼,才能感覺到Keras那樣的寫法已經無可挑剔了。我感覺,用Keras實現複雜的模型,既是一種挑戰,又像是一種藝術創作,當你成功時,你就會陶醉於你創造出來的藝術品了。

轉載到請包括本文地址:https://kexue.fm/archives/5765