1. 程式人生 > >keras實現多種分類網路的實現

keras實現多種分類網路的實現

Keras應該是最簡單的一種深度學習框架了,入門非常的簡單.

簡單記錄一下keras實現多種分類網路:如AlexNet、Vgg、ResNet

採用kaggle貓狗大戰的資料作為資料集.

由於AlexNet採用的是LRN標準化,Keras沒有內建函式實現,這裡用batchNormalization代替

首先建立一個model.py的檔案,裡面存放著alexnet,vgg兩種模型,直接匯入就可以了

#coding=utf-8
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D, ZeroPadding2D, BatchNormalization
from keras.layers import *
from keras.layers.advanced_activations import LeakyReLU,PReLU
from keras.models import Model

def keras_batchnormalization_relu(layer):
    """Apply batch normalization followed by a PReLU activation to `layer`.

    Used as a drop-in substitute for AlexNet's LRN, which Keras does not
    provide out of the box.
    """
    normalized = BatchNormalization()(layer)
    return PReLU()(normalized)


def AlexNet(resize=227, classes=2):
    """Build an AlexNet-style Sequential classifier.

    The paper's LRN layers are replaced by BatchNormalization, and an extra
    Dense(1000) layer precedes the softmax output.

    :param resize: input image side length (input shape is resize x resize x 3)
    :param classes: number of output classes
    :return: an uncompiled keras Sequential model
    """
    layers = [
        # Stage 1: large-stride 11x11 conv + BN + overlapping 3x3 max-pool
        Conv2D(filters=96, kernel_size=(11, 11), strides=(4, 4),
               padding='valid', input_shape=(resize, resize, 3),
               activation='relu'),
        BatchNormalization(),
        MaxPooling2D(pool_size=(3, 3), strides=(2, 2), padding='valid'),
        # Stage 2: 5x5 conv + BN + 3x3 max-pool
        Conv2D(filters=256, kernel_size=(5, 5), strides=(1, 1),
               padding='same', activation='relu'),
        BatchNormalization(),
        MaxPooling2D(pool_size=(3, 3), strides=(2, 2), padding='valid'),
        # Stage 3: three stacked 3x3 convs, then a final max-pool
        Conv2D(filters=384, kernel_size=(3, 3), strides=(1, 1),
               padding='same', activation='relu'),
        Conv2D(filters=384, kernel_size=(3, 3), strides=(1, 1),
               padding='same', activation='relu'),
        Conv2D(filters=256, kernel_size=(3, 3), strides=(1, 1),
               padding='same', activation='relu'),
        MaxPooling2D(pool_size=(3, 3), strides=(2, 2), padding='valid'),
        # Stage 4: fully-connected classifier head
        Flatten(),
        Dense(4096, activation='relu'),
        Dropout(0.5),
        Dense(4096, activation='relu'),
        Dropout(0.5),
        Dense(1000, activation='relu'),
        Dropout(0.5),
        # Output layer
        Dense(classes, activation='softmax'),
    ]

    model = Sequential()
    for layer in layers:
        model.add(layer)
    return model


def AlexNet2(inputs, classes=2, prob=0.5):
    """Build AlexNet with the keras functional API.

    Fixes vs. the original version:
    - debug prints use ``print("...".format(...))`` with a single argument,
      which behaves identically under Python 2 and Python 3 (the module
      mixes both print styles);
    - conv4, conv5 and the first dense layer now log their ``.shape``
      instead of the raw tensor object, consistent with conv1-conv3.

    :param inputs: input tensor, e.g. Input(shape=(227, 227, 3))
    :param classes: number of output classes
    :param prob: dropout probability for the dense layers
    :return: a keras Model mapping `inputs` to softmax predictions
    """
    print("input shape: {}".format(inputs.shape))

    # Stage 1: 11x11/4 conv + BN + PReLU, then 3x3/2 max-pool
    conv1 = Conv2D(filters=96, kernel_size=(11, 11), strides=(4, 4), padding='valid')(inputs)
    conv1 = keras_batchnormalization_relu(conv1)
    print("conv1 shape: {}".format(conv1.shape))
    pool1 = MaxPool2D(pool_size=(3, 3), strides=(2, 2))(conv1)
    print("pool1 shape: {}".format(pool1.shape))

    # Stage 2: 5x5 conv + BN + PReLU, then 3x3/2 max-pool
    conv2 = Conv2D(filters=256, kernel_size=(5, 5), padding='same')(pool1)
    conv2 = keras_batchnormalization_relu(conv2)
    print("conv2 shape: {}".format(conv2.shape))
    pool2 = MaxPool2D(pool_size=(3, 3), strides=(2, 2))(conv2)
    print("pool2 shape: {}".format(pool2.shape))

    # Stage 3: three stacked 3x3 convs (PReLU, no BN), then max-pool
    conv3 = Conv2D(filters=384, kernel_size=(3, 3), padding='same')(pool2)
    conv3 = PReLU()(conv3)
    print("conv3 shape: {}".format(conv3.shape))

    conv4 = Conv2D(filters=384, kernel_size=(3, 3), padding='same')(conv3)
    conv4 = PReLU()(conv4)
    print("conv4 shape: {}".format(conv4.shape))

    conv5 = Conv2D(filters=256, kernel_size=(3, 3), padding='same')(conv4)
    conv5 = PReLU()(conv5)
    print("conv5 shape: {}".format(conv5.shape))

    pool3 = MaxPool2D(pool_size=(3, 3), strides=(2, 2))(conv5)
    print("pool3 shape: {}".format(pool3.shape))

    # Classifier head: two 4096-unit dense layers with dropout
    dense1 = Flatten()(pool3)
    dense1 = Dense(4096, activation='relu')(dense1)
    print("dense1 shape: {}".format(dense1.shape))
    dense1 = Dropout(prob)(dense1)

    dense2 = Dense(4096, activation='relu')(dense1)
    print("dense2 shape: {}".format(dense2.shape))
    dense2 = Dropout(prob)(dense2)

    predict = Dense(classes, activation='softmax')(dense2)

    model = Model(inputs=inputs, outputs=predict)
    return model


def vgg13(resize=224, classes=2, prob=0.5):
    """Build a VGG-13 classifier: five 2-conv blocks (64, 128, 256, 512, 512).

    Bug fix: the first 128-filter conv used a (3, 2) kernel; it is now the
    standard (3, 3) used everywhere else in the network.

    :param resize: input image side length (input shape is resize x resize x 3)
    :param classes: number of output classes
    :param prob: dropout probability for the dense layers
    :return: an uncompiled keras Sequential model
    """
    model = Sequential()
    # First block carries input_shape, so it is written out explicitly.
    model.add(Conv2D(64, (3, 3), strides=(1, 1), input_shape=(resize, resize, 3),
                     padding='same', activation='relu', kernel_initializer='uniform'))
    model.add(Conv2D(64, (3, 3), strides=(1, 1), padding='same', activation='relu',
                     kernel_initializer='uniform'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    # Remaining blocks: two 3x3 convs each, followed by a 2x2 max-pool.
    for filters in (128, 256, 512, 512):
        for _ in range(2):
            model.add(Conv2D(filters, (3, 3), strides=(1, 1), padding='same',
                             activation='relu', kernel_initializer='uniform'))
        model.add(MaxPooling2D(pool_size=(2, 2)))
    # Fully-connected classifier head.
    model.add(Flatten())
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(prob))
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(prob))
    model.add(Dense(classes, activation='softmax'))
    return model

def vgg16(resize=224, classes=2, prob=0.5):
    """Build a VGG-16 classifier: blocks of (64 x2, 128 x2, 256 x3, 512 x3, 512 x3).

    Bug fix: the first 128-filter conv used a (3, 2) kernel; it is now the
    standard (3, 3) used everywhere else in the network.

    :param resize: input image side length (input shape is resize x resize x 3)
    :param classes: number of output classes
    :param prob: dropout probability for the dense layers
    :return: an uncompiled keras Sequential model
    """
    model = Sequential()
    # First block carries input_shape, so it is written out explicitly.
    model.add(Conv2D(64, (3, 3), strides=(1, 1), input_shape=(resize, resize, 3),
                     padding='same', activation='relu', kernel_initializer='uniform'))
    model.add(Conv2D(64, (3, 3), strides=(1, 1), padding='same', activation='relu',
                     kernel_initializer='uniform'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    # Remaining blocks: (filters, conv count) pairs, each ending in a max-pool.
    for filters, depth in ((128, 2), (256, 3), (512, 3), (512, 3)):
        for _ in range(depth):
            model.add(Conv2D(filters, (3, 3), strides=(1, 1), padding='same',
                             activation='relu', kernel_initializer='uniform'))
        model.add(MaxPooling2D(pool_size=(2, 2)))
    # Fully-connected classifier head.
    model.add(Flatten())
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(prob))
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(prob))
    model.add(Dense(classes, activation='softmax'))
    return model

然後建立一個train.py檔案,用於讀取資料和訓練資料的.

#coding=utf-8
import keras
import cv2
import os
import numpy as np
import model
import modelResNet
import tensorflow as tf
from keras.layers import Input, Dense
from keras.preprocessing.image import ImageDataGenerator

resize = 224  # square side length images are resized to before training
batch_size = 128  # samples per gradient update in main()
path = "/home/hjxu/PycharmProjects/01_cats_vs_dogs/data"  # dataset root; load_data() reads <path>/train/

# Directory consumed by flow_from_directory() in main2(); assumed to contain
# one subfolder per class (cat/, dog/) — TODO confirm on-disk layout.
trainDirectory = '/home/hjxu/PycharmProjects/01_cats_vs_dogs/data/train/'
def load_data(sample_count=5000):
    """Load cat/dog images from <path>/train into fixed-size numpy arrays.

    Files are named dog.<i>.jpg / cat.<i>.jpg: odd indices load a dog
    (label 1), even indices load a cat (label 0). Indices
    [0, sample_count) form the training split and
    [sample_count, 2*sample_count) form the test split.

    Changes vs. original: the hard-coded 5000 is now a backward-compatible
    parameter, and the unused ``imgs``/``num`` locals (a dead os.listdir
    call) were removed.

    :param sample_count: images per split (default 5000, the original value)
    :return: (train_data, train_label, test_data, test_label)
    """
    train_data = np.empty((sample_count, resize, resize, 3), dtype="int32")
    train_label = np.empty((sample_count, ), dtype="int32")
    test_data = np.empty((sample_count, resize, resize, 3), dtype="int32")
    test_label = np.empty((sample_count, ), dtype="int32")

    def _read(index):
        # Odd index -> dog (label 1); even index -> cat (label 0).
        label = index % 2
        species = 'dog' if label else 'cat'
        img = cv2.imread(path + '/train/' + species + '.' + str(index) + '.jpg')
        return cv2.resize(img, (resize, resize)), label

    for i in range(sample_count):
        train_data[i], train_label[i] = _read(i)
    for i in range(sample_count, 2 * sample_count):
        test_data[i - sample_count], test_label[i - sample_count] = _read(i)
    return train_data, train_label, test_data, test_label


def main():
    """Train AlexNet2 on in-memory arrays from load_data() and save the model.

    Use this entry point when all images live in a single folder
    (the raw kaggle cats-vs-dogs layout).
    """
    train_data, train_label, test_data, test_label = load_data()

    # Scale pixel values into [0, 1].
    train_data = train_data.astype('float32') / 255
    test_data = test_data.astype('float32') / 255

    # One-hot encode the integer labels; required when training with
    # categorical_crossentropy.
    train_label = keras.utils.to_categorical(train_label, 2)
    test_label = keras.utils.to_categorical(test_label, 2)

    # Build the functional-API AlexNet on a 224x224x3 input.
    inputs = Input(shape=(224, 224, 3))
    modelAlex = model.AlexNet2(inputs, classes=2)

    modelAlex.compile(loss='categorical_crossentropy',
                      optimizer='sgd',
                      metrics=['accuracy'])

    # Print a layer-by-layer summary of the network.
    modelAlex.summary()

    # Hold out 20% of the training data as a validation split;
    # shuffle the samples each epoch.
    modelAlex.fit(train_data, train_label,
                  batch_size=batch_size,
                  epochs=50,
                  validation_split=0.2,
                  shuffle=True)

    # Report [loss, accuracy] on both splits, then persist the model.
    print(modelAlex.evaluate(train_data, train_label, verbose=1))
    print(modelAlex.evaluate(test_data, test_label, verbose=1))
    modelAlex.save('my_model_weights2.h5')

def main2():
    """Train vgg13 with directory-based generators and save the model.

    Use this entry point when the images are split into one subfolder per
    class (cat/ and dog/) under ``trainDirectory``.

    Fix vs. original: removed the unused ``inputs = Input(...)`` local,
    which was only referenced by commented-out code.
    """
    # Augment training images; validation images are only rescaled.
    train_datagen = ImageDataGenerator(rescale=1. / 255,
                                       shear_range=0.2,
                                       zoom_range=0.2,
                                       horizontal_flip=True)
    test_datagen = ImageDataGenerator(rescale=1. / 255)

    # class_mode='binary' yields integer labels, which matches the
    # sparse_categorical_crossentropy loss used below.
    # NOTE(review): validation reads from the same directory as training —
    # there is no held-out split here; confirm this is intentional.
    train_generator = train_datagen.flow_from_directory(trainDirectory,
                                                        target_size=(224, 224),
                                                        batch_size=32,
                                                        class_mode='binary')
    validation_generator = test_datagen.flow_from_directory(trainDirectory,
                                                            target_size=(224, 224),
                                                            batch_size=32,
                                                            class_mode='binary')

    modelAlex = model.vgg13(resize=224, classes=2, prob=0.5)
    modelAlex.compile(loss='sparse_categorical_crossentropy',
                      optimizer='sgd',
                      metrics=['accuracy'])
    modelAlex.summary()

    modelAlex.fit_generator(train_generator,
                            steps_per_epoch=1000,
                            epochs=60,
                            validation_data=validation_generator,
                            validation_steps=200)

    modelAlex.save('model32.hdf5')
    #



if __name__ == "__main__":
    # main():  all images in one folder (raw kaggle cats-vs-dogs layout).
    # main2(): images split into per-class subfolders (generator layout).
    main2()

得到模型後該怎麼測試一張影象呢?

建立一個testOneImg.py指令碼,程式碼如下

#coding=utf-8
from keras.preprocessing.image import load_img#load_image作用是載入圖片
from keras.preprocessing.image import img_to_array
from keras.applications.vgg16 import preprocess_input
from keras.applications.vgg16 import decode_predictions
import numpy as np
import cv2
import model
from keras.models import Sequential

pats = '/home/hjxu/tf_study/catVsDogsWithKeras/my_model_weights.h5'

# Rebuild the architecture, then load the trained weights into it.
modelAlex = model.AlexNet(resize=224, classes=2)
modelAlex.load_weights(pats)

img = cv2.imread('/home/hjxu/tf_study/catVsDogsWithKeras/111.jpg')
img = cv2.resize(img, (224, 224))

# BUG FIX: the original did img_to_array(img / 255) on a uint8 array, which
# is integer division and zeroes out almost every pixel. Convert to a float
# array first, then scale into [0, 1].
x = img_to_array(img) / 255.0   # shape (224, 224, 3)
x = np.expand_dims(x, axis=0)   # shape (1, 224, 224, 3): keras expects a batch axis
print(x.shape)

y_pred = modelAlex.predict(x)   # per-class probabilities, shape (1, 2)
# FIX: print() call instead of the Python 2 print statement, consistent
# with print(x.shape) above.
print(y_pred)

不得不說,Keras真心簡單方便.