1. 程式人生 > 其它 >深入淺出解讀卷積神經網路

深入淺出解讀卷積神經網路

作者:石文華

編輯:田 旭

卷積神經網路

圖1 全連線神經網路結構圖

圖2 卷積神經網路結構圖

卷積神經網路和全連線的神經網路結構上的差異還是比較大的,全連線的網路,相鄰兩層的節點都有邊相連,而卷積神經網路,相鄰節點只有部分節點相連。

全連線神經網路處理影象的最大問題在於全連線層的引數太多,引數太多的話容易發生過擬合而且會導致計算速度減慢,卷積神經網路可以減小引數個數的目的。

假設輸入是一張圖片大小為28*28*3,第一層隱藏層有500個節點,那麼第一層的引數就有28*28*3*500+500=1176000個引數,當圖片更大時,引數就更多了,而且這只是第一層。

那麼為什麼卷積神經網路可以達到減小引數的目的呢?

卷積神經網路最為關鍵的有卷積層,池化層,全連線層。

卷積層

卷積層中每個節點的輸入只是上一層神經網路的一小塊,通常由卷積核來實現,卷積核是一個過濾器,可以想象成一個掃描視窗,扣到每張圖片上,然後根據設定好的大小步長等等掃描圖片,計算規則是被扣的影象畫素矩陣跟卷積核的權重對應位置相乘然後求和,每掃描一次得到一個輸出。卷積層所做的工作可以理解為對影象畫素的每一小塊進行特徵抽象。可以通過多個不同的卷積核對同一張圖片進行卷積,卷積核的個數,其實就是卷積之後輸出矩陣的深度。卷積神經網路的引數個數與圖片大小無關,只跟過濾器的尺寸、深度以及卷積核的個數(輸出矩陣的深度)有關。假設是還是28*28*3的圖片,卷積核的大小設為3*3*3,輸出矩陣的深度為500,那麼引數個數為3*3*3*500+500=14000個引數,對比全連線層,引數減少了很多。

圖3 形象的卷積層示例

池化層

池化層可以認為是將一張高解析度的圖片轉化為低解析度的圖片。可以非常有效的縮小矩陣的尺寸,從而減小全連線層的引數個數,這樣可以加快計算速率同時又防止過擬合,池化,可以減小模型,提高速度,同時提高所提取特徵的魯棒性。

使用2*2的過濾器步長為2,最大池化如下圖所示:

圖4 2*2過濾器最大池化示例圖

我們可以將卷積層和池化層看成是自動特徵提取就可以了。

通過上面直觀的介紹,現在我們就知道為什麼卷積神經網路可以達到減小引數的目的了?

和全連線神經網路相比,卷積神經網路的優勢在於共享權重和稀疏連線。共享權重在於引數只與過濾器有關。卷積神經網路減少引數的另外一個原因是稀疏連線。輸出節點至於輸入圖片矩陣的部分畫素矩陣有關,也就是跟卷積核扣上去的那一小塊矩陣有關。這就是稀疏連線的概念。

卷積神經網路通過權重共享和稀疏連線來減少引數的。從而防止過度擬合。

訓練過程

卷積神經網路的訓練過程大致可分為如下幾步:

第一步:匯入相關庫、載入引數

(左右滑動,檢視完整程式碼)

import math
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import h5py
from tensorflow.python.framework import ops
from tf_utils import *
np.random.seed(1)
X_train_orig,Y_train_orig,X_test_orig,Y_test_orig,classes=load_dataset()
index=0
plt.imshow(X_train_orig[index])
print("y="+str(np.squeeze(Y_train_orig[:,index])))
plt.show()

第二步:歸一化,有利於加快梯度下降

(左右滑動,檢視完整程式碼)

X_train=X_train_orig/255.0
X_test=X_test_orig/255.0
Y_train=convert_to_one_hot(Y_train_orig,6)
Y_test=convert_to_one_hot(Y_test_orig,6)

第三步:定義引數及卷積神經網路結構

(左右滑動,檢視完整程式碼)

def create_placeholder(num_px,channel,n_y):
    X=tf.placeholder(tf.float32,shape=(None,num_px,num_px,channel),name='X')
    Y=tf.placeholder(tf.float32,shape=(None,n_y),name='Y')
    return X,Y
X,Y=create_placeholder(64,3,6)
print("X="+str(X))
print("Y="+str(Y))

def weight_variable(shape):
    return tf.Variable(tf.truncated_normal(shape,stddev=0.1))
def bias_variable(shape):
    return tf.Variable(tf.constant(0.1,shape=shape))
def conv2d(x,W):
    return tf.nn.conv2d(x,W,strides=[1,1,1,1],padding='SAME')
def max_pool_2x2(x):
    return tf.nn.max_pool(x,ksize=[1,2,2,1],strides=[1,2,2,1],padding='SAME')

def initialize_parameters():
    w_conv1=weight_variable([5,5,3,32])
    b_conv1=bias_variable([32])
    
    w_conv2=weight_variable([5,5,32,64])
    b_conv2=bias_variable([64])
    
    w_fc1=weight_variable([16*16*64,512])
    b_fc1=bias_variable([512])
    
    w_fc2=weight_variable([512,6])
    b_fc2=bias_variable([6])
    
    parameters={
        "w_conv1":w_conv1,
        "b_conv1":b_conv1,
        "w_conv2":w_conv2,
        "b_conv2":b_conv2,
        "w_fc1":w_fc1,
        "b_fc1":b_fc1,
        "w_fc2":w_fc2,
        "b_fc2":b_fc2
    }
    return parameters

第四步:前行傳播過程

(左右滑動,檢視完整程式碼)

def forward_propagation(X,parameters):
    w_conv1=parameters["w_conv1"]
    b_conv1=parameters["b_conv1"]
    h_conv1=tf.nn.relu(conv2d(X,w_conv1)+b_conv1)
    h_pool1=max_pool_2x2(h_conv1)
    
    w_conv2=parameters["w_conv2"]
    b_conv2=parameters["b_conv2"]
    h_conv2=tf.nn.relu(conv2d(h_pool1,w_conv2)+b_conv2)
    h_pool2=max_pool_2x2(h_conv2)
    
    w_fc1=parameters["w_fc1"]
    b_fc1=parameters["b_fc1"]
    h_pool2_flat=tf.reshape(h_pool2,[-1,16*16*64])
    h_fc1=tf.nn.relu(tf.matmul(h_pool2_flat,w_fc1)+b_fc1)
    
    #keep_prob=tf.placeholder(tf.float32)
    #h_fc1_drop=tf.nn.dropout(h_fc1,keep_prob)
    
    w_fc2=parameters["w_fc2"]
    b_fc2=parameters["b_fc2"]
    y_conv=tf.matmul(h_fc1,w_fc2)+b_fc2
    return y_conv

第五步:成本函式

(左右滑動,檢視完整程式碼)

def compute_cost(y_conv,Y):    cost=tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=y_conv,labels=Y))
    return cost

第六步:梯度下降更新引數

(左右滑動,檢視完整程式碼)

def random_mini_batches1(X, Y, mini_batch_size = 64, seed = 0):
    m = X.shape[0]                  # number of training examples
    mini_batches = []
    np.random.seed(seed)
    Y=Y.T         #(1080,6)
    # Step 1: Shuffle (X, Y)
    permutation = list(np.random.permutation(m))
    shuffled_X = X[permutation,:,:,:]
    shuffled_Y = Y[permutation,:].reshape((m,Y.shape[1]))

    # Step 2: Partition (shuffled_X, shuffled_Y). Minus the end case.
    num_complete_minibatches = math.floor(m/mini_batch_size) # number of mini batches of size mini_batch_size in your partitionning
    for k in range(0, num_complete_minibatches):
        mini_batch_X = shuffled_X[k * mini_batch_size : k * mini_batch_size + mini_batch_size,:,:,:]
        mini_batch_Y = shuffled_Y[k * mini_batch_size : k * mini_batch_size + mini_batch_size,:]
        mini_batch = (mini_batch_X, mini_batch_Y)
        mini_batches.append(mini_batch)
    
    # Handling the end case (last mini-batch < mini_batch_size)
    if m % mini_batch_size != 0:
        mini_batch_X = shuffled_X[num_complete_minibatches * mini_batch_size : m,:,:,:]
        mini_batch_Y = shuffled_Y[num_complete_minibatches * mini_batch_size : m,:]
        mini_batch = (mini_batch_X, mini_batch_Y)
        mini_batches.append(mini_batch)
    
    return mini_batches

第七步:訓練模型

(左右滑動,檢視完整程式碼)

def model(X_train,Y_train,X_test,Y_test,learning_rate=0.001,num_epochs=20,minibatch_size=32,print_cost=True):
    ops.reset_default_graph()  #(1080, 64, 64, 3)
    tf.set_random_seed(1)      #Y_train(6, 1080)
    seed=3
    (m,num_px1,num_px2,c)=X_train.shape
    n_y=Y_train.shape[0]
    costs=[]
    X,Y=create_placeholder(64,3,6)
    parameters=initialize_parameters()
    
    Z3=forward_propagation(X,parameters)
    cost=compute_cost(Z3,Y)
    optm=tf.train.AdamOptimizer(learning_rate).minimize(cost)
    
    correct_prediction=tf.equal(tf.argmax(Z3,1),tf.argmax(Y,1))#居然忘記1了,所以一直出現損失越來越小了,但是準確率卻一直是0
    accuracy=tf.reduce_mean(tf.cast(correct_prediction,tf.float32))
    with tf.Session() as sess:
        tf.global_variables_initializer().run()
        for epoch in range(num_epochs):
            epoch_cost=0
            num_minibatches=int(m/minibatch_size)
            seed+=1
            #下面輸入要求(6,,1080)格式,所以要加個轉置
            minibatches=random_mini_batches1(X_train,Y_train,minibatch_size,seed)
            
            for minibatch in minibatches:
                (minibatch_X,minibatch_Y)=minibatch
                _,minibatch_cost=sess.run([optm,cost],feed_dict={X:minibatch_X,Y:minibatch_Y})
                epoch_cost+=minibatch_cost/num_minibatches
            if(print_cost==True and epoch % 2==0):
                #print("Epoch",'%04d' % (epoch+1),"cost={:.9f}".format(epoch_cost))
                print("Cost after epoch %i:%f" % (epoch,epoch_cost))
            if(print_cost==True and epoch %1==0):
                costs.append(epoch_cost)
                
        print("Train Accuracy:",accuracy.eval({X:X_train,Y:Y_train.T}))
        print("Test Accuracy:",accuracy.eval({X:X_test,Y:Y_test.T}))
        plt.plot(np.squeeze(costs))
        plt.ylabel('cost')
        plt.xlabel('iterations(per tens)')
        plt.title("learning rate="+str(learning_rate))
        plt.show()
        
        parameters=sess.run(parameters)
        return parameters
parameters=model(X_train,Y_train,X_test,Y_test)