1. 程式人生 > >深度有趣 | 25 影象標題生成

深度有趣 | 25 影象標題生成

簡介

介紹基於注意力機制的影象標題生成模型的原理和實現

原理

輸入是一張圖片,輸出是一句對圖片進行描述的文字,這就是影象標題生成

基本思路是先通過預訓練的影象分類模型,從某一個卷積層得到原始圖片的表示,或者稱為上下文contexts

例如從VGG19的conv5_3拿到原始圖片的表示,shape為14*14*512,即512張14*14的小圖

這樣一來,可以理解為將原始圖片分成14*14共196個小塊,每個小塊對應一個512維的特徵

影象標題生成模型結構圖

根據contexts使用LSTM逐步生成單詞,即可產生原始圖片對應的描述文字

在生成每一個單詞時,應該對196個塊有不同的偏重,即所謂的注意力機制

就像我們人一樣,考慮下一個詞時,對圖片的不同區域會有不同的關注度,相關性更強的區域會獲得更多的注意力即更高的權重

根據注意力權重對196個512維的特徵進行加權求和,即可得到基於注意力機制的上下文context

和之前介紹過的Seq2Seq Learning聯絡起來,影象標題生成便屬於one to many這種情況

資料

使用COCO2014資料集(http://cocodataset.org/#download),訓練集包括8萬多張圖片,驗證集包括4萬多張圖片,並且提供了每張圖片對應的標題

每張圖片的標題不止一個,因此訓練集一共411593個標題,而驗證集一共201489個標題,平均一張圖片五個標題

實現

訓練

首先是訓練部分程式碼

載入庫

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.utils import shuffle
from imageio import imread
import scipy.io
import cv2
import os
import json
from tqdm import tqdm
import pickle

載入資料,因為一張圖片可能對應多個標題,因此以一個圖片id和一個標題為一條資料。對於圖片內容,保留中心正方形區域並縮放;對於標題,長度超過20個詞則去除

# Mini-batch size, maximum caption length in words, and the side length (in
# pixels) every image is resized to.
batch_size, maxlen, image_size = 128, 20, 224

# Per-channel constants subtracted from the image batch before it is fed to
# VGG19; shaped (1, 1, 3) so they broadcast over height and width.
MEAN_VALUES = np.array([[[123.68, 116.779, 103.939]]])

def load_data(image_dir, annotation_path):
    """Read caption annotations together with the images they describe.

    Args:
        image_dir: Path prefix; appending the zero-padded 12-digit image id
            and ``.jpg`` gives the image file name.
        annotation_path: JSON file whose ``annotations`` list holds items
            with ``image_id`` and ``caption`` fields.

    Returns:
        A tuple ``(ids, captions, image_dict)``. ``ids[k]`` is the image id of
        the k-th kept caption, ``captions[k]`` is that caption as a list of
        lower-cased words, and ``image_dict`` maps each image id to its
        center-cropped, resized, 3-channel pixel array. Captions longer than
        ``maxlen`` words are dropped.
    """
    # One-pass equivalent of the punctuation clean-up: delete . , ' " ( ),
    # spell out '&' and turn hyphens into spaces.
    cleanup = str.maketrans({
        '.': None, ',': None, "'": None, '"': None,
        '(': None, ')': None, '&': 'and', '-': ' ',
    })

    with open(annotation_path, 'r') as fr:
        records = json.load(fr)['annotations']

    ids, captions, image_dict = [], [], {}
    for record in tqdm(records):
        words = record['caption'].strip().lower().translate(cleanup).split()
        if len(words) > maxlen:
            continue

        image_id = record['image_id']
        if image_id not in image_dict:
            pixels = imread(image_dir + '{:012d}.jpg'.format(image_id))
            height, width = pixels.shape[0], pixels.shape[1]

            # Keep the central square: trim the longer side symmetrically.
            if height > width:
                center, half = height // 2, width // 2
                pixels = pixels[center - half: center + half, :]
            else:
                center, half = width // 2, height // 2
                pixels = pixels[:, center - half: center + half]
            pixels = cv2.resize(pixels, (image_size, image_size))

            # Single-channel images are replicated into three channels.
            if pixels.ndim < 3:
                pixels = np.stack([pixels, pixels, pixels], axis=-1)

            image_dict[image_id] = pixels

        ids.append(image_id)
        captions.append(words)

    return ids, captions, image_dict

# Load every usable training caption together with its preprocessed image.
train_json = 'data/train/captions_train2014.json'
train_ids, train_captions, train_dict = load_data(
    image_dir='data/train/images/COCO_train2014_',
    annotation_path=train_json,
)
print(len(train_ids))

檢視一下標題標註

# Show N randomly chosen caption/image pairs, one per row.
N = 4
data_index = np.arange(len(train_ids))
np.random.shuffle(data_index)
data_index = data_index[:N]

plt.figure(figsize=(12, 20))
for position in range(N):
    sample = data_index[position]
    plt.subplot(4, 1, position + 1)
    plt.imshow(train_dict[train_ids[sample]])
    plt.title(' '.join(train_captions[sample]))
    plt.axis('off')

整理詞典,一共23728個詞,建立詞和id之間的對映,並使用到<pad>、<start>、<end>三個特殊詞

# Count how often every word occurs across all training captions.
word_counts = {}
for tokens in train_captions:
    for token in tokens:
        if token in word_counts:
            word_counts[token] += 1
        else:
            word_counts[token] = 1

# Most frequent words first; the sort is stable, so words with equal counts
# keep the order in which they were first seen.
vocabulary = sorted(word_counts, key=word_counts.get, reverse=True)

# Ids 0-2 are reserved for the special tokens; real words start at 3.
word2id = {'<pad>': 0, '<start>': 1, '<end>': 2}
word2id.update((token, index) for index, token in enumerate(vocabulary, start=3))
id2word = dict((index, token) for token, index in word2id.items())

print(len(vocabulary), vocabulary[:20])

# Persist the vocabulary and both lookup tables for the later scripts.
with open('dictionary.pkl', 'wb') as fw:
    pickle.dump([vocabulary, word2id, id2word], fw)

def translate(ids):
    """Turn a sequence of word ids into a sentence.

    Ids below 3 (the ``<pad>``, ``<start>`` and ``<end>`` tokens) are skipped;
    the remaining words are joined with spaces and a period is appended.
    """
    words = []
    for token_id in ids:
        if token_id >= 3:
            words.append(id2word[token_id])
    return ' '.join(words) + '.'

將標題轉換為id序列

def convert_captions(data):
    """Encode tokenized captions as a zero-padded matrix of word ids.

    Each caption becomes ``<start>``, its in-vocabulary words, then ``<end>``;
    words missing from ``word2id`` are silently skipped.

    Args:
        data: Sequence of captions, each a list of word strings.

    Returns:
        An ``int32`` array of shape ``(len(data), maxlen + 2)``; unused
        trailing positions stay 0, the id of ``<pad>``.
    """
    sequences = [
        [word2id['<start>']]
        + [word2id[word] for word in caption if word in word2id]
        + [word2id['<end>']]
        for caption in data
    ]

    array = np.zeros((len(data), maxlen + 2), np.int32)
    for row in tqdm(range(len(sequences))):
        sequence = sequences[row]
        array[row, :len(sequence)] = sequence
    return array

# Replace the tokenized captions with their padded id matrix, then check the
# first row by printing it and decoding it back to text.
train_captions = convert_captions(train_captions)
print(train_captions.shape)
first_caption = train_captions[0]
print(first_caption)
print(translate(first_caption))

使用影象風格遷移中用過的imagenet-vgg-verydeep-19.mat來提取影象特徵,載入vgg19模型並定義一個函式,對於給定的輸入,返回vgg19各個層的輸出值,通過variable_scope實現網路的重用,將conv5_3的輸出作為原始圖片的表示

# Load the VGG19 .mat file and keep its 'layers' entry, which vgg_endpoints
# indexes to fetch each layer's weights, biases and name.
vgg = scipy.io.loadmat('imagenet-vgg-verydeep-19.mat')
vgg_layers = vgg['layers']

def vgg_endpoints(inputs, reuse=None):
    """Build the VGG19 convolutional stack on ``inputs`` from stored weights.

    Weights and biases are read from the module-level ``vgg_layers`` array and
    embedded in the graph as constants. Pooling is 2x2 average pooling with
    stride 2.

    Args:
        inputs: Image tensor fed to the first convolution.
        reuse: Forwarded to ``tf.variable_scope('endpoints', ...)``.

    Returns:
        Dict mapping each layer name (``conv1_1`` ... ``avgpool5``) to its
        output tensor.
    """
    # (layer name, index into vgg_layers); None marks a pooling layer.
    layout = [
        ('conv1_1', 0), ('conv1_2', 2), ('avgpool1', None),
        ('conv2_1', 5), ('conv2_2', 7), ('avgpool2', None),
        ('conv3_1', 10), ('conv3_2', 12), ('conv3_3', 14), ('conv3_4', 16),
        ('avgpool3', None),
        ('conv4_1', 19), ('conv4_2', 21), ('conv4_3', 23), ('conv4_4', 25),
        ('avgpool4', None),
        ('conv5_1', 28), ('conv5_2', 30), ('conv5_3', 32), ('conv5_4', 34),
        ('avgpool5', None),
    ]

    with tf.variable_scope('endpoints', reuse=reuse):
        def _load_params(index, expected_name):
            # Fetch (kernel, bias) and make sure the index hits the right layer.
            entry = vgg_layers[0][index][0][0]
            kernel = entry[2][0][0]
            bias = entry[2][0][1]
            assert entry[0][0] == expected_name
            return kernel, bias

        def _conv_relu(tensor, index, name):
            kernel, bias = _load_params(index, name)
            kernel = tf.constant(kernel)
            bias = tf.constant(np.reshape(bias, (bias.size)))
            conv = tf.nn.conv2d(tensor, filter=kernel, strides=[1, 1, 1, 1], padding='SAME')
            return tf.nn.relu(conv + bias)

        def _pool(tensor):
            return tf.nn.avg_pool(tensor, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

        endpoints = {}
        current = inputs
        for name, index in layout:
            current = _pool(current) if index is None else _conv_relu(current, index, name)
            endpoints[name] = current

        return endpoints

# Image batch placeholder of shape (batch, image_size, image_size, 3).
X = tf.placeholder(tf.float32, [None, image_size, image_size, 3])
# Subtract the per-channel constants and use the conv5_3 activations as the
# image encoding.
encoded = vgg_endpoints(X - MEAN_VALUES)['conv5_3']
print(encoded)

基於以上contexts,實現初始化、詞嵌入、特徵對映等部分

# Shared initializers: Xavier for dense kernels, zeros for biases, and
# uniform [-1, 1] for the word-embedding matrix.
k_initializer = tf.contrib.layers.xavier_initializer()
b_initializer = tf.constant_initializer(0.0)
e_initializer = tf.random_uniform_initializer(-1.0, 1.0)

def dense(inputs, units, activation=tf.nn.tanh, use_bias=True, name=None):
    """Fully connected layer using the module's shared initializers.

    Args:
        inputs: Input tensor.
        units: Output dimensionality.
        activation: Activation function, or ``None`` for a linear layer.
        use_bias: Whether a bias term is added.
        name: Layer name, which also names its variables.

    Returns:
        The output tensor of ``tf.layers.dense``.
    """
    return tf.layers.dense(
        inputs=inputs,
        units=units,
        activation=activation,
        use_bias=use_bias,
        kernel_initializer=k_initializer,
        bias_initializer=b_initializer,
        name=name,
    )

def batch_norm(inputs, name):
    """Batch-normalize ``inputs`` in training mode under scope ``name``.

    ``updates_collections=None`` makes the moving statistics update in place
    whenever the output is evaluated.
    """
    settings = dict(
        decay=0.95,
        center=True,
        scale=True,
        is_training=True,
        updates_collections=None,
    )
    return tf.contrib.layers.batch_norm(inputs, scope=name, **settings)

def dropout(inputs):
    """Apply dropout with rate 0.5; always active in this training script."""
    drop_rate = 0.5
    return tf.layers.dropout(inputs, rate=drop_rate, training=True)

# Model dimensions: number of spatial blocks in the conv5_3 map, channels per
# block, LSTM hidden size and word-embedding size.
num_block = 14 * 14
num_filter = 512
hidden_size = 1024
embedding_size = 512

# Flatten the spatial grid so each image is a set of num_block feature vectors.
encoded = tf.reshape(encoded, [-1, num_block, num_filter]) # batch_size, num_block, num_filter
contexts = batch_norm(encoded, 'contexts')

# Y holds <start> ... <end> id sequences padded to maxlen + 2. Y_in is the
# decoder input (everything but the last id) and Y_out the target (shifted by
# one); mask zeroes the loss on <pad> targets.
Y = tf.placeholder(tf.int32, [None, maxlen + 2])
Y_in = Y[:, :-1]
Y_out = Y[:, 1:]
mask = tf.to_float(tf.not_equal(Y_out, word2id['<pad>']))

# Initial LSTM hidden state and memory are both predicted from the mean of
# the block features.
with tf.variable_scope('initialize'):
    context_mean = tf.reduce_mean(contexts, 1)
    state = dense(context_mean, hidden_size, name='initial_state')
    memory = dense(context_mean, hidden_size, name='initial_memory')
    
# Word embeddings for every decoder input position.
with tf.variable_scope('embedding'):
    embeddings = tf.get_variable('weights', [len(word2id), embedding_size], initializer=e_initializer)
    embedded = tf.nn.embedding_lookup(embeddings, Y_in)
    
# Linear projection of the block features; computed once and reused by the
# attention layer at every time step.
with tf.variable_scope('projected'):
    projected_contexts = tf.reshape(contexts, [-1, num_filter]) # batch_size * num_block, num_filter
    projected_contexts = dense(projected_contexts, num_filter, activation=None, use_bias=False, name='projected_contexts')
    projected_contexts = tf.reshape(projected_contexts, [-1, num_block, num_filter]) # batch_size, num_block, num_filter

# Decoder cell plus the accumulators filled by the unrolled decoding loop.
lstm = tf.nn.rnn_cell.BasicLSTMCell(hidden_size)
loss = 0
alphas = []

依次生成標題中的每個詞,包括計算注意力和context、計算選擇器、lstm處理、計算輸出、計算損失函式幾個部分

# Unroll the decoder for maxlen + 1 steps. Each step computes attention over
# the image blocks, gates the attended context, advances the LSTM, predicts
# the next word and adds the masked cross-entropy to the running loss.
for t in range(maxlen + 1):
    with tf.variable_scope('attend'):
        # Score every block from the projected features plus the current state.
        h0 = dense(state, num_filter, activation=None, name='fc_state') # batch_size, num_filter
        h0 = tf.nn.relu(projected_contexts + tf.expand_dims(h0, 1)) # batch_size, num_block, num_filter
        h0 = tf.reshape(h0, [-1, num_filter]) # batch_size * num_block, num_filter
        h0 = dense(h0, 1, activation=None, use_bias=False, name='fc_attention') # batch_size * num_block, 1
        h0 = tf.reshape(h0, [-1, num_block]) # batch_size, num_block
        
        # Normalized attention weights and the weighted sum of block features.
        alpha = tf.nn.softmax(h0) # batch_size, num_block
        # contexts:                 batch_size, num_block, num_filter
        # tf.expand_dims(alpha, 2): batch_size, num_block, 1
        context = tf.reduce_sum(contexts * tf.expand_dims(alpha, 2), 1, name='context') # batch_size, num_filter
        alphas.append(alpha)
        
    with tf.variable_scope('selector'):
        # Scalar gate in (0, 1) that scales the whole attended context.
        beta = dense(state, 1, activation=tf.nn.sigmoid, name='fc_beta') # batch_size, 1
        context = tf.multiply(beta, context, name='selected_context')  # batch_size, num_filter
        
    with tf.variable_scope('lstm'):
        # LSTM input is the embedding of the t-th input word joined with the context.
        h0 = tf.concat([embedded[:, t, :], context], 1) # batch_size, embedding_size + num_filter
        _, (memory, state) = lstm(inputs=h0, state=[memory, state])
    
    with tf.variable_scope('decode'):
        # Combine state, context and the input embedding before the output layer.
        h0 = dropout(state)
        h0 = dense(h0, embedding_size, activation=None, name='fc_logits_state')
        h0 += dense(context, embedding_size, activation=None, use_bias=False, name='fc_logits_context')
        h0 += embedded[:, t, :]
        h0 = tf.nn.tanh(h0)
        
        h0 = dropout(h0)
        logits = dense(h0, len(word2id), activation=None, name='fc_logits')
    
    # Cross-entropy against the t-th target word, ignoring <pad> positions.
    loss += tf.reduce_sum(tf.nn.sparse_softmax_cross_entropy_with_logits(labels=Y_out[:, t], logits=logits) * mask[:, t])
    # From the second step on, every layer reuses the variables created at t == 0.
    tf.get_variable_scope().reuse_variables()

在損失函式中加入注意力正則項,定義優化器

# Sum each block's attention weight over all decoding steps.
alphas = tf.transpose(tf.stack(alphas), (1, 0, 2)) # batch_size, maxlen + 1, num_block
alphas = tf.reduce_sum(alphas, 1) # batch_size, num_block
# Regularizer: penalize blocks whose accumulated attention deviates from the
# uniform share (maxlen + 1) / num_block.
attention_loss = tf.reduce_sum(((maxlen + 1) / num_block - alphas) ** 2)
# Both terms are sums over the batch, so divide by the nominal batch size.
total_loss = (loss + attention_loss) / batch_size

# Adam with learning rate 0.001 and gradient clipping at 5.0; trainable
# variables whose name starts with 'endpoints' are left out of the update.
with tf.variable_scope('optimizer', reuse=tf.AUTO_REUSE):
    global_step = tf.Variable(0, trainable=False)
    vars_t = [var for var in tf.trainable_variables() if not var.name.startswith('endpoints')]
    train = tf.contrib.layers.optimize_loss(total_loss, global_step, 0.001, 'Adam', clip_gradients=5.0, variables=vars_t)

訓練模型,將一些tensor的值寫入events檔案,便於後續使用tensorboard檢視

# Create the session and initialize every variable of the training graph.
sess = tf.Session()
sess.run(tf.global_variables_initializer())

saver = tf.train.Saver()
OUTPUT_DIR = 'model'
# Create the checkpoint/summary directory; a no-op when it already exists.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Scalars written to the events file for inspection in TensorBoard.
tf.summary.scalar('losses/loss', loss)
tf.summary.scalar('losses/attention_loss', attention_loss)
tf.summary.scalar('losses/total_loss', total_loss)
summary = tf.summary.merge_all()
writer = tf.summary.FileWriter(OUTPUT_DIR)

epochs = 20
# Only full batches are used; a trailing partial batch is skipped each epoch.
steps_per_epoch = len(train_ids) // batch_size
for e in range(epochs):
    # Shuffle ids and captions together so that row k of both still match.
    train_ids, train_captions = shuffle(train_ids, train_captions)
    for i in tqdm(range(steps_per_epoch)):
        start = i * batch_size
        X_batch = np.array([train_dict[x] for x in train_ids[start: start + batch_size]])
        Y_batch = train_captions[start: start + batch_size]

        _ = sess.run(train, feed_dict={X: X_batch, Y: Y_batch})

        if i > 0 and i % 100 == 0:
            # The summary step is the number of batches processed so far. The
            # previous `e * len(train_ids) // batch_size + i` evaluated as
            # `(e * len(train_ids)) // batch_size + i`, which drifts from the
            # true batch count whenever len(train_ids) is not a multiple of
            # batch_size.
            writer.add_summary(sess.run(summary,
                                        feed_dict={X: X_batch, Y: Y_batch}),
                               e * steps_per_epoch + i)
            writer.flush()

    # Overwrite the checkpoint at the end of every epoch.
    saver.save(sess, os.path.join(OUTPUT_DIR, 'image_caption'))

使用以下命令可以在tensorboard中檢視歷史訓練資料

tensorboard --logdir=model

經過20輪訓練後,損失函式曲線如下

影象標題生成模型訓練損失函式

驗證

接下來是驗證部分程式碼,即在驗證集上生成每張圖片的標題,然後和標註進行對比和評估

在生成每一個詞的時候,可以選擇概率最大的詞,即貪婪的做法,但不一定最優,因為當前概率最大的詞並不能保證之後產生的序列整體概率最大

也不能像中文分詞中那樣使用viterbi演算法,因為viterbi演算法要求整個序列的概率分佈已知,才能使用動態規劃找到最大概率路徑

但生成標題的時候,是一個詞一個詞地生成,而且選擇的類別等於詞典的大小,遠遠超出中文分詞序列標註中的四分類,因此不可能窮盡所有可能的序列

一種折中的做法是使用beam search,涉及一個引數beam size,舉個例子,當beam size等於3時

  • 生成第一個詞時,保留概率最大的三個詞
  • 生成第二個詞時,在以上三個詞的基礎上,進一步生成九個詞,保留九個序列中概率最大的三個
  • 生成第n個詞時,基於上一步保留下來的三個序列,進一步生成九個詞,保留新的九個序列中概率最大的三個
  • 就好比一棵樹,每一次所有的樹枝都會進一步長出三個子樹枝,然後對於所有樹枝,保留最好的三個,其他全部砍掉
  • 重複以上過程,直到生成了結束詞,或者生成的序列達到了最大長度

驗證部分的大多數程式碼和訓練部分相同

載入庫

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.utils import shuffle
from imageio import imread
import scipy.io
import cv2
import os
import json
from tqdm import tqdm
import pickle

載入資料

# Mini-batch size, maximum caption length in words, and the side length (in
# pixels) every image is resized to.
batch_size, maxlen, image_size = 128, 20, 224

# Per-channel constants subtracted from the image batch before it is fed to
# VGG19; shaped (1, 1, 3) so they broadcast over height and width.
MEAN_VALUES = np.array([[[123.68, 116.779, 103.939]]])

def load_data(image_dir, annotation_path):
    """Read caption annotations together with the images they describe.

    Args:
        image_dir: Path prefix; appending the zero-padded 12-digit image id
            and ``.jpg`` gives the image file name.
        annotation_path: JSON file whose ``annotations`` list holds items
            with ``image_id`` and ``caption`` fields.

    Returns:
        A tuple ``(ids, captions, image_dict)``. ``ids[k]`` is the image id of
        the k-th kept caption, ``captions[k]`` is that caption as a list of
        lower-cased words, and ``image_dict`` maps each image id to its
        center-cropped, resized, 3-channel pixel array. Captions longer than
        ``maxlen`` words are dropped.
    """
    # One-pass equivalent of the punctuation clean-up: delete . , ' " ( ),
    # spell out '&' and turn hyphens into spaces.
    cleanup = str.maketrans({
        '.': None, ',': None, "'": None, '"': None,
        '(': None, ')': None, '&': 'and', '-': ' ',
    })

    with open(annotation_path, 'r') as fr:
        records = json.load(fr)['annotations']

    ids, captions, image_dict = [], [], {}
    for record in tqdm(records):
        words = record['caption'].strip().lower().translate(cleanup).split()
        if len(words) > maxlen:
            continue

        image_id = record['image_id']
        if image_id not in image_dict:
            pixels = imread(image_dir + '{:012d}.jpg'.format(image_id))
            height, width = pixels.shape[0], pixels.shape[1]

            # Keep the central square: trim the longer side symmetrically.
            if height > width:
                center, half = height // 2, width // 2
                pixels = pixels[center - half: center + half, :]
            else:
                center, half = width // 2, height // 2
                pixels = pixels[:, center - half: center + half]
            pixels = cv2.resize(pixels, (image_size, image_size))

            # Single-channel images are replicated into three channels.
            if pixels.ndim < 3:
                pixels = np.stack([pixels, pixels, pixels], axis=-1)

            image_dict[image_id] = pixels

        ids.append(image_id)
        captions.append(words)

    return ids, captions, image_dict

# Load every usable validation caption together with its preprocessed image.
val_json = 'data/val/captions_val2014.json'
val_ids, val_captions, val_dict = load_data(
    image_dir='data/val/images/COCO_val2014_',
    annotation_path=val_json,
)
print(len(val_ids))

整理正確答案

# Group the reference captions by image id; each caption is stored as a
# space-joined string terminated by ' .'.
gt = {}
for val_id, words in tqdm(zip(val_ids, val_captions), total=len(val_ids)):
    gt.setdefault(val_id, []).append(' '.join(words) + ' .')
print(len(gt))

載入訓練部分整理好的詞典

# Restore the vocabulary and both word/id lookup tables written by the
# training script. NOTE: pickle must only be used on trusted files.
with open('dictionary.pkl', 'rb') as fr:
    vocabulary, word2id, id2word = pickle.load(fr)

def translate(ids):
    """Turn a sequence of word ids into a sentence.

    Ids below 3 (the ``<pad>``, ``<start>`` and ``<end>`` tokens) are skipped;
    the remaining words are joined with spaces and ' .' is appended, matching
    the format of the reference captions.
    """
    words = []
    for token_id in ids:
        if token_id >= 3:
            words.append(id2word[token_id])
    return ' '.join(words) + ' .'

載入vgg19模型

# Load the VGG19 .mat file and keep its 'layers' entry, which vgg_endpoints
# indexes to fetch each layer's weights, biases and name.
vgg = scipy.io.loadmat('imagenet-vgg-verydeep-19.mat')
vgg_layers = vgg['layers']

def vgg_endpoints(inputs, reuse=None):
    """Build the VGG19 convolutional stack on ``inputs`` from stored weights.

    Weights and biases are read from the module-level ``vgg_layers`` array and
    embedded in the graph as constants. Pooling is 2x2 average pooling with
    stride 2.

    Args:
        inputs: Image tensor fed to the first convolution.
        reuse: Forwarded to ``tf.variable_scope('endpoints', ...)``.

    Returns:
        Dict mapping each layer name (``conv1_1`` ... ``avgpool5``) to its
        output tensor.
    """
    # (layer name, index into vgg_layers); None marks a pooling layer.
    layout = [
        ('conv1_1', 0), ('conv1_2', 2), ('avgpool1', None),
        ('conv2_1', 5), ('conv2_2', 7), ('avgpool2', None),
        ('conv3_1', 10), ('conv3_2', 12), ('conv3_3', 14), ('conv3_4', 16),
        ('avgpool3', None),
        ('conv4_1', 19), ('conv4_2', 21), ('conv4_3', 23), ('conv4_4', 25),
        ('avgpool4', None),
        ('conv5_1', 28), ('conv5_2', 30), ('conv5_3', 32), ('conv5_4', 34),
        ('avgpool5', None),
    ]

    with tf.variable_scope('endpoints', reuse=reuse):
        def _load_params(index, expected_name):
            # Fetch (kernel, bias) and make sure the index hits the right layer.
            entry = vgg_layers[0][index][0][0]
            kernel = entry[2][0][0]
            bias = entry[2][0][1]
            assert entry[0][0] == expected_name
            return kernel, bias

        def _conv_relu(tensor, index, name):
            kernel, bias = _load_params(index, name)
            kernel = tf.constant(kernel)
            bias = tf.constant(np.reshape(bias, (bias.size)))
            conv = tf.nn.conv2d(tensor, filter=kernel, strides=[1, 1, 1, 1], padding='SAME')
            return tf.nn.relu(conv + bias)

        def _pool(tensor):
            return tf.nn.avg_pool(tensor, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

        endpoints = {}
        current = inputs
        for name, index in layout:
            current = _pool(current) if index is None else _conv_relu(current, index, name)
            endpoints[name] = current

        return endpoints

# Image batch placeholder of shape (batch, image_size, image_size, 3).
X = tf.placeholder(tf.float32, [None, image_size, image_size, 3])
# Subtract the per-channel constants and use the conv5_3 activations as the
# image encoding.
encoded = vgg_endpoints(X - MEAN_VALUES)['conv5_3']
print(encoded)

驗證部分需要定義幾個placeholder,因為要使用到beam search,所以每生成一個詞就需要輸入一些相關的值

# Shared initializers: Xavier for dense kernels, zeros for biases, and
# uniform [-1, 1] for the word-embedding matrix.
k_initializer = tf.contrib.layers.xavier_initializer()
b_initializer = tf.constant_initializer(0.0)
e_initializer = tf.random_uniform_initializer(-1.0, 1.0)

def dense(inputs, units, activation=tf.nn.tanh, use_bias=True, name=None):
    """Fully connected layer using the module's shared initializers.

    Args:
        inputs: Input tensor.
        units: Output dimensionality.
        activation: Activation function, or ``None`` for a linear layer.
        use_bias: Whether a bias term is added.
        name: Layer name, which also names its variables.

    Returns:
        The output tensor of ``tf.layers.dense``.
    """
    return tf.layers.dense(
        inputs=inputs,
        units=units,
        activation=activation,
        use_bias=use_bias,
        kernel_initializer=k_initializer,
        bias_initializer=b_initializer,
        name=name,
    )

def batch_norm(inputs, name):
    """Batch-normalize ``inputs`` in inference mode under scope ``name``.

    With ``is_training=False`` the layer is built in its non-training mode.
    """
    settings = dict(
        decay=0.95,
        center=True,
        scale=True,
        is_training=False,
        updates_collections=None,
    )
    return tf.contrib.layers.batch_norm(inputs, scope=name, **settings)

def dropout(inputs):
    """Dropout layer built with ``training=False`` for this inference script."""
    drop_rate = 0.5
    return tf.layers.dropout(inputs, rate=drop_rate, training=False)

# Model dimensions: number of spatial blocks in the conv5_3 map, channels per
# block, LSTM hidden size and word-embedding size.
num_block = 14 * 14
num_filter = 512
hidden_size = 1024
embedding_size = 512

# Flatten the spatial grid so each image is a set of num_block feature vectors.
encoded = tf.reshape(encoded, [-1, num_block, num_filter]) # batch_size, num_block, num_filter
contexts = batch_norm(encoded, 'contexts')

# Initial LSTM hidden state and memory, both predicted from the mean of the
# block features; evaluated once per image batch.
with tf.variable_scope('initialize'):
    context_mean = tf.reduce_mean(contexts, 1)
    initial_state = dense(context_mean, hidden_size, name='initial_state')
    initial_memory = dense(context_mean, hidden_size, name='initial_memory')
    
# Inputs of the single decoding step: the image contexts, the LSTM memory and
# state carried over from the previous step, and the previously emitted word.
contexts_phr = tf.placeholder(tf.float32, [None, num_block, num_filter])
last_memory = tf.placeholder(tf.float32, [None, hidden_size])
last_state = tf.placeholder(tf.float32, [None, hidden_size])
last_word = tf.placeholder(tf.int32, [None])

# Embedding of the previous word.
with tf.variable_scope('embedding'):
    embeddings = tf.get_variable('weights', [len(word2id), embedding_size], initializer=e_initializer)
    embedded = tf.nn.embedding_lookup(embeddings, last_word)
    
# Linear projection of the block features used by the attention layer.
with tf.variable_scope('projected'):
    projected_contexts = tf.reshape(contexts_phr, [-1, num_filter]) # batch_size * num_block, num_filter
    projected_contexts = dense(projected_contexts, num_filter, activation=None, use_bias=False, name='projected_contexts')
    projected_contexts = tf.reshape(projected_contexts, [-1, num_block, num_filter]) # batch_size, num_block, num_filter

lstm = tf.nn.rnn_cell.BasicLSTMCell(hidden_size)

生成部分不需要迴圈,只需構建單步(生成一個詞)的計算圖即可;後面進行beam search時,再逐步多次輸入資料並取得輸出

# One decoding step: attention over the image blocks, context gating, an LSTM
# update and the next-word distribution. The search loop runs it repeatedly,
# feeding each step's memory/state/word back in through the placeholders.
with tf.variable_scope('attend'):
    # Score every block from the projected features plus the previous state.
    h0 = dense(last_state, num_filter, activation=None, name='fc_state') # batch_size, num_filter
    h0 = tf.nn.relu(projected_contexts + tf.expand_dims(h0, 1)) # batch_size, num_block, num_filter
    h0 = tf.reshape(h0, [-1, num_filter]) # batch_size * num_block, num_filter
    h0 = dense(h0, 1, activation=None, use_bias=False, name='fc_attention') # batch_size * num_block, 1
    h0 = tf.reshape(h0, [-1, num_block]) # batch_size, num_block

    # Normalized attention weights and the weighted sum of block features.
    alpha = tf.nn.softmax(h0) # batch_size, num_block
    # contexts:                 batch_size, num_block, num_filter
    # tf.expand_dims(alpha, 2): batch_size, num_block, 1
    context = tf.reduce_sum(contexts_phr * tf.expand_dims(alpha, 2), 1, name='context') # batch_size, num_filter

with tf.variable_scope('selector'):
    # Scalar gate in (0, 1) that scales the whole attended context.
    beta = dense(last_state, 1, activation=tf.nn.sigmoid, name='fc_beta') # batch_size, 1
    context = tf.multiply(beta, context, name='selected_context')  # batch_size, num_filter

with tf.variable_scope('lstm'):
    # LSTM input is the previous word's embedding joined with the context.
    h0 = tf.concat([embedded, context], 1) # batch_size, embedding_size + num_filter
    _, (current_memory, current_state) = lstm(inputs=h0, state=[last_memory, last_state])

with tf.variable_scope('decode'):
    # Combine state, context and the input embedding before the output layer.
    h0 = dropout(current_state)
    h0 = dense(h0, embedding_size, activation=None, name='fc_logits_state')
    h0 += dense(context, embedding_size, activation=None, use_bias=False, name='fc_logits_context')
    h0 += embedded
    h0 = tf.nn.tanh(h0)

    h0 = dropout(h0)
    logits = dense(h0, len(word2id), activation=None, name='fc_logits')
    # Probability of every vocabulary entry being the next word.
    probs = tf.nn.softmax(logits)

載入訓練好的模型,對每個batch的資料進行beam search,依次生成每一個詞

這裡beam size設為1主要是為了節省時間,驗證共花了10個小時,具體應用時可以適當加大beam size

# Restore the most recent checkpoint written by the training script.
MODEL_DIR = 'model'
sess = tf.Session()
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver()
saver.restore(sess, tf.train.latest_checkpoint(MODEL_DIR))

beam_size = 1
id2sentence = {}

# Caption each image once, and pad the id list with copies of the first id so
# that it splits into full batches.
val_ids = list(set(val_ids))
remainder = len(val_ids) % batch_size
if remainder != 0:
    val_ids.extend([val_ids[0]] * (batch_size - remainder))
print(len(val_ids))

# Beam search over every batch of validation images.
#
# result[b]   -- live (unfinished) hypotheses of image b
# complete[b] -- hypotheses of image b that have emitted <end>
# Each hypothesis stores its word ids, the LSTM memory/state after its last
# word, the product of its word probabilities, and its attention maps.
for i in tqdm(range(len(val_ids) // batch_size)):
    batch_ids = val_ids[i * batch_size: i * batch_size + batch_size]
    X_batch = np.array([val_dict[x] for x in batch_ids])
    contexts_, initial_memory_, initial_state_ = sess.run(
        [contexts, initial_memory, initial_state], feed_dict={X: X_batch})

    result = [[{
        'sentence': [],
        'memory': initial_memory_[b],
        'state': initial_state_[b],
        'score': 1.0,
        'alphas': []
    }] for b in range(batch_size)]
    complete = [[] for b in range(batch_size)]

    for t in range(maxlen + 1):
        cache = [[] for b in range(batch_size)]
        # Only the single empty hypothesis exists before the first word.
        step = 1 if t == 0 else beam_size
        for s in range(step):
            if t == 0:
                last_word_ = np.ones([batch_size], np.int32) * word2id['<start>']
            else:
                last_word_ = np.array([result[b][s]['sentence'][-1] for b in range(batch_size)], np.int32)

            last_memory_ = np.array([result[b][s]['memory'] for b in range(batch_size)], np.float32)
            last_state_ = np.array([result[b][s]['state'] for b in range(batch_size)], np.float32)

            current_memory_, current_state_, probs_, alpha_ = sess.run(
                [current_memory, current_state, probs, alpha], feed_dict={
                    contexts_phr: contexts_,
                    last_memory: last_memory_,
                    last_state: last_state_,
                    last_word: last_word_
                    })

            # Top (beam_size + 1) words per image, most probable first. One
            # batched argsort replaces sorting the whole vocabulary as a Python
            # list for every image; the stable sort keeps the lower word id
            # first on ties, as before. The extra candidate guarantees
            # beam_size live hypotheses even if one of them is <end>.
            top_words = np.argsort(-probs_, axis=1, kind='mergesort')[:, :beam_size + 1]

            for b in range(batch_size):
                parent = result[b][s]
                for w in top_words[b]:
                    w = int(w)
                    item = {
                        'sentence': parent['sentence'] + [w],
                        'memory': current_memory_[b],
                        'state': current_state_[b],
                        'score': parent['score'] * probs_[b][w],
                        'alphas': parent['alphas'] + [alpha_[b]]
                    }

                    if id2word[w] == '<end>':
                        complete[b].append(item)
                    else:
                        cache[b].append(item)

        # Keep the beam_size best unfinished hypotheses of every image.
        for b in range(batch_size):
            cache[b].sort(key=lambda x: -x['score'])
            cache[b] = cache[b][:beam_size]
        result = cache.copy()

    for b in range(batch_size):
        if complete[b]:
            # complete[b] is in insertion order, not ranked, so pick the
            # finished hypothesis with the highest score instead of the
            # earliest one.
            final_sentence = max(complete[b], key=lambda h: h['score'])['sentence']
        else:
            final_sentence = result[b][0]['sentence']

        # Padding duplicates of the first id are written only once.
        val_id = batch_ids[b]
        if val_id not in id2sentence:
            id2sentence[val_id] = [translate(final_sentence)]

print(len(id2sentence))

將標題生成結果寫入檔案,便於後續評估

# One line per image: id ^ generated caption ^ reference captions joined by '_'.
with open('generated.txt', 'w') as fw:
    for image_id, sentences in id2sentence.items():
        fields = [str(image_id), sentences[0], '_'.join(gt[image_id])]
        fw.write('^'.join(fields) + '\n')

評估時使用pycocoevalcap提供的BLEU、ROUGE、CIDEr三種指標。其中BLEU在影象標題生成、機器翻譯等任務中用得比較多,可以簡單理解為1-gram、2-gram、3-gram、4-gram的命中率

from pycocoevalcap.bleu.bleu import Bleu
from pycocoevalcap.rouge.rouge import Rouge
from pycocoevalcap.cider.cider import Cider

# Read the generated captions and references back from the results file.
# After the round trip the image ids are strings.
id2sentence, gt = {}, {}
with open('generated.txt', 'r') as fr:
    for line in fr.readlines():
        fields = line.strip('\n').split('^')
        i = fields[0]
        id2sentence[i] = [fields[1]]
        gt[i] = fields[2].split('_')

# Each entry pairs a scorer with the name(s) of the value(s) it returns.
scorers = [
    (Bleu(4), ['Bleu_1', 'Bleu_2', 'Bleu_3', 'Bleu_4']),
    (Rouge(), 'ROUGE_L'),
    (Cider(), 'CIDEr')
]

for scorer, name in scorers:
    score, _ = scorer.compute_score(gt, id2sentence)
    if type(score) != list:
        print(name, score)
        continue
    # A list of scores comes with a matching list of names.
    for n, s in zip(name, score):
        print(n, s)

評估結果如下,適當加大beam size可以進一步提高各項指標

  • Bleu_1:0.6878
  • Bleu_2:0.4799
  • Bleu_3:0.3347
  • Bleu_4:0.2355
  • ROUGE: 0.5304
  • CIDEr: 0.7293

使用

最後,通過以下程式碼在本機上使用訓練好的模型,為任意圖片生成標題

整體程式碼結構和驗證部分比較類似,但是由於只需要對一張圖片生成標題,所以beam search部分的程式碼簡化很多

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from imageio import imread
import scipy.io
import cv2
import os
import pickle

# Single-image batches, maximum caption length in words, and the side length
# (in pixels) the image is resized to.
batch_size, maxlen, image_size = 1, 20, 224

# Per-channel constants subtracted from the image batch before it is fed to
# VGG19; shaped (1, 1, 3) so they broadcast over height and width.
MEAN_VALUES = np.array([[[123.68, 116.779, 103.939]]])

# Restore the vocabulary and both word/id lookup tables written by the
# training script. NOTE: pickle must only be used on trusted files.
with open('dictionary.pkl', 'rb') as fr:
    vocabulary, word2id, id2word = pickle.load(fr)

def translate(ids):
    """Turn a sequence of word ids into a sentence.

    Ids below 3 (the ``<pad>``, ``<start>`` and ``<end>`` tokens) are skipped;
    the remaining words are joined with spaces and ' .' is appended.
    """
    words = []
    for token_id in ids:
        if token_id >= 3:
            words.append(id2word[token_id])
    return ' '.join(words) + ' .'

# Load the VGG19 .mat file and keep its 'layers' entry, which vgg_endpoints
# indexes to fetch each layer's weights, biases and name.
vgg = scipy.io.loadmat('imagenet-vgg-verydeep-19.mat')
vgg_layers = vgg['layers']

def vgg_endpoints(inputs, reuse=None):
    """Build the VGG19 convolutional stack on ``inputs`` from stored weights.

    Weights and biases are read from the module-level ``vgg_layers`` array and
    embedded in the graph as constants. Pooling is 2x2 average pooling with
    stride 2.

    Args:
        inputs: Image tensor fed to the first convolution.
        reuse: Forwarded to ``tf.variable_scope('endpoints', ...)``.

    Returns:
        Dict mapping each layer name (``conv1_1`` ... ``avgpool5``) to its
        output tensor.
    """
    # (layer name, index into vgg_layers); None marks a pooling layer.
    layout = [
        ('conv1_1', 0), ('conv1_2', 2), ('avgpool1', None),
        ('conv2_1', 5), ('conv2_2', 7), ('avgpool2', None),
        ('conv3_1', 10), ('conv3_2', 12), ('conv3_3', 14), ('conv3_4', 16),
        ('avgpool3', None),
        ('conv4_1', 19), ('conv4_2', 21), ('conv4_3', 23), ('conv4_4', 25),
        ('avgpool4', None),
        ('conv5_1', 28), ('conv5_2', 30), ('conv5_3', 32), ('conv5_4', 34),
        ('avgpool5', None),
    ]

    with tf.variable_scope('endpoints', reuse=reuse):
        def _load_params(index, expected_name):
            # Fetch (kernel, bias) and make sure the index hits the right layer.
            entry = vgg_layers[0][index][0][0]
            kernel = entry[2][0][0]
            bias = entry[2][0][1]
            assert entry[0][0] == expected_name
            return kernel, bias

        def _conv_relu(tensor, index, name):
            kernel, bias = _load_params(index, name)
            kernel = tf.constant(kernel)
            bias = tf.constant(np.reshape(bias, (bias.size)))
            conv = tf.nn.conv2d(tensor, filter=kernel, strides=[1, 1, 1, 1], padding='SAME')
            return tf.nn.relu(conv + bias)

        def _pool(tensor):
            return tf.nn.avg_pool(tensor, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

        endpoints = {}
        current = inputs
        for name, index in layout:
            current = _pool(current) if index is None else _conv_relu(current, index, name)
            endpoints[name] = current

        return endpoints

# Image batch placeholder of shape (batch, image_size, image_size, 3).
X = tf.placeholder(tf.float32, [None, image_size, image_size, 3])
# Subtract the per-channel constants and use the conv5_3 activations as the
# image encoding.
encoded = vgg_endpoints(X - MEAN_VALUES)['conv5_3']

# Shared initializers: Xavier for dense kernels, zeros for biases, and
# uniform [-1, 1] for the word-embedding matrix.
k_initializer = tf.contrib.layers.xavier_initializer()
b_initializer = tf.constant_initializer(0.0)
e_initializer = tf.random_uniform_initializer(-1.0, 1.0)

def dense(inputs, units, activation=tf.nn.tanh, use_bias=True, name=None):
    """Fully connected layer using the module's shared initializers.

    Args:
        inputs: Input tensor.
        units: Output dimensionality.
        activation: Activation function, or ``None`` for a linear layer.
        use_bias: Whether a bias term is added.
        name: Layer name, which also names its variables.

    Returns:
        The output tensor of ``tf.layers.dense``.
    """
    return tf.layers.dense(
        inputs=inputs,
        units=units,
        activation=activation,
        use_bias=use_bias,
        kernel_initializer=k_initializer,
        bias_initializer=b_initializer,
        name=name,
    )

def batch_norm(inputs, name):
    """Batch-normalize ``inputs`` in inference mode under scope ``name``.

    With ``is_training=False`` the layer is built in its non-training mode.
    """
    settings = dict(
        decay=0.95,
        center=True,
        scale=True,
        is_training=False,
        updates_collections=None,
    )
    return tf.contrib.layers.batch_norm(inputs, scope=name, **settings)

def dropout(inputs):
    """Add a dropout layer (rate 0.5) built with ``training=False``.

    Args:
        inputs: tensor to pass through the dropout layer.

    Returns:
        The output tensor of ``tf.layers.dropout``.
    """
    outputs = tf.layers.dropout(inputs=inputs, rate=0.5, training=False)
    return outputs

# Model dimensions: number of spatial blocks per image, channels per block,
# LSTM hidden size and word-embedding size.
num_block = 14 * 14
num_filter = 512
hidden_size = 1024
embedding_size = 512

# Flatten the spatial grid so every image becomes num_block feature vectors,
# then batch-normalize them to obtain the attention contexts.
encoded = tf.reshape(encoded, [-1, num_block, num_filter]) # batch_size, num_block, num_filter
contexts = batch_norm(encoded, 'contexts')

# The initial LSTM state and memory are both computed from the mean context
# vector (dense() applies tanh by default).
with tf.variable_scope('initialize'):
    context_mean = tf.reduce_mean(contexts, 1)
    initial_state = dense(context_mean, hidden_size, name='initial_state')
    initial_memory = dense(context_mean, hidden_size, name='initial_memory')
    
# Inputs of a single decoding step: the image contexts plus the memory, state
# and word id carried over from the previous step.
contexts_phr = tf.placeholder(tf.float32, [None, num_block, num_filter])
last_memory = tf.placeholder(tf.float32, [None, hidden_size])
last_state = tf.placeholder(tf.float32, [None, hidden_size])
last_word = tf.placeholder(tf.int32, [None])

# Look up the embedding of the previous word.
with tf.variable_scope('embedding'):
    embeddings = tf.get_variable('weights', [len(word2id), embedding_size], initializer=e_initializer)
    embedded = tf.nn.embedding_lookup(embeddings, last_word)
    
# Linear projection (no bias, no activation) of every context vector.
with tf.variable_scope('projected'):
    projected_contexts = tf.reshape(contexts_phr, [-1, num_filter]) # batch_size * num_block, num_filter
    projected_contexts = dense(projected_contexts, num_filter, activation=None, use_bias=False, name='projected_contexts')
    projected_contexts = tf.reshape(projected_contexts, [-1, num_block, num_filter]) # batch_size, num_block, num_filter

lstm = tf.nn.rnn_cell.BasicLSTMCell(hidden_size)

# Attention: project the previous state, add it to every projected context,
# apply ReLU, reduce to one score per block and softmax over the blocks.
with tf.variable_scope('attend'):
    h0 = dense(last_state, num_filter, activation=None, name='fc_state') # batch_size, num_filter
    h0 = tf.nn.relu(projected_contexts + tf.expand_dims(h0, 1)) # batch_size, num_block, num_filter
    h0 = tf.reshape(h0, [-1, num_filter]) # batch_size * num_block, num_filter
    h0 = dense(h0, 1, activation=None, use_bias=False, name='fc_attention') # batch_size * num_block, 1
    h0 = tf.reshape(h0, [-1, num_block]) # batch_size, num_block

    alpha = tf.nn.softmax(h0) # batch_size, num_block
    # contexts:                 batch_size, num_block, num_filter
    # tf.expand_dims(alpha, 2): batch_size, num_block, 1
    # Attention-weighted sum of the context vectors.
    context = tf.reduce_sum(contexts_phr * tf.expand_dims(alpha, 2), 1, name='context') # batch_size, num_filter

# Scalar sigmoid gate, computed from the previous state, that scales the
# attended context.
with tf.variable_scope('selector'):
    beta = dense(last_state, 1, activation=tf.nn.sigmoid, name='fc_beta') # batch_size, 1
    context = tf.multiply(beta, context, name='selected_context')  # batch_size, num_filter

# One LSTM step on the concatenation of word embedding and gated context.
# The state is passed as [memory, state] and unpacked in the same order.
with tf.variable_scope('lstm'):
    h0 = tf.concat([embedded, context], 1) # batch_size, embedding_size + num_filter
    _, (current_memory, current_state) = lstm(inputs=h0, state=[last_memory, last_state])

# Output layer: combine the projected new state, the projected context and the
# previous word embedding, apply tanh, then map to vocabulary logits.
with tf.variable_scope('decode'):
    h0 = dropout(current_state)
    h0 = dense(h0, embedding_size, activation=None, name='fc_logits_state')
    h0 += dense(context, embedding_size, activation=None, use_bias=False, name='fc_logits_context')
    h0 += embedded
    h0 = tf.nn.tanh(h0)

    h0 = dropout(h0)
    logits = dense(h0, len(word2id), activation=None, name='fc_logits')
    # Probability distribution over the vocabulary for the next word.
    probs = tf.nn.softmax(logits)

# Directory that holds the trained checkpoint files.
MODEL_DIR = 'model'
sess = tf.Session()
# Initialize every variable first; the restore below then overwrites them with
# the values stored in the checkpoint.
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver()
# latest_checkpoint() returns None when MODEL_DIR contains no checkpoint, so
# fail early with a message that names the directory.
checkpoint_path = tf.train.latest_checkpoint(MODEL_DIR)
if checkpoint_path is None:
    raise ValueError('No checkpoint found in directory %r' % MODEL_DIR)
saver.restore(sess, checkpoint_path)

# Number of hypotheses kept alive at each decoding step.
beam_size = 3

# Load the test image and bring it to exactly three channels, because the
# input placeholder expects [batch, image_size, image_size, 3].
img = imread('test.png')
if img.ndim == 2:
    # Grayscale image without a channel axis: add one so it can be replicated.
    img = np.expand_dims(img, -1)
if img.shape[-1] == 1:
    # Single-channel image: replicate the channel three times.
    img = np.concatenate([img, img, img], axis=-1)
elif img.shape[-1] == 4:
    # Four channels: drop the last one (alpha).
    img = img[:, :, :-1]

# Keep the central square region, then resize to the network input size.
h = img.shape[0]
w = img.shape[1]
if h > w:
    img = img[h // 2 - w // 2: h // 2 + w // 2, :]
else:
    img = img[:, w // 2 - h // 2: w // 2 + h // 2]
img = cv2.resize(img, (image_size, image_size))
# Add the batch axis: the feed is a batch containing this single image.
X_data = np.expand_dims(img, 0)

# Encode the image once; the contexts are reused at every decoding step.
contexts_, initial_memory_, initial_state_ = sess.run(
    [contexts, initial_memory, initial_state], feed_dict={X: X_data})

# Beam search. Every hypothesis stores the word ids generated so far, the LSTM
# memory/state after its last word, the product of its word probabilities and
# the attention weights of each step.
result = [{
    'sentence': [],
    'memory': initial_memory_[0],
    'state': initial_state_[0],
    'score': 1.0,
    'alphas': []
}]
complete = []
start_word_id = word2id['<start>']
for t in range(maxlen + 1):
    cache = []
    # Iterate over the live hypotheses themselves (one at t == 0, at most
    # beam_size afterwards) instead of indexing by a fixed count.
    for beam in result:
        # Feed exactly one word so the batch dimension matches the single-row
        # memory, state and contexts fed alongside it.
        previous_word = start_word_id if t == 0 else beam['sentence'][-1]
        last_word_ = np.array([previous_word], np.int32)
        last_memory_ = np.array([beam['memory']], np.float32)
        last_state_ = np.array([beam['state']], np.float32)

        current_memory_, current_state_, probs_, alpha_ = sess.run(
            [current_memory, current_state, probs, alpha], feed_dict={
                contexts_phr: contexts_,
                last_memory: last_memory_,
                last_state: last_state_,
                last_word: last_word_
                })

        # Take the beam_size + 1 most probable words. The extra candidate
        # means that, even if one of them is '<end>', at least beam_size
        # unfinished continuations remain. A stable sort keeps ties in
        # ascending word-id order.
        step_probs = probs_[0]
        top_word_ids = np.argsort(-step_probs, kind='mergesort')[:beam_size + 1]

        for word_id in top_word_ids:
            word_id = int(word_id)
            item = {
                'sentence': beam['sentence'] + [word_id],
                'memory': current_memory_[0],
                'state': current_state_[0],
                'score': beam['score'] * step_probs[word_id],
                'alphas': beam['alphas'] + [alpha_[0]]
            }
            # Finished hypotheses are set aside; the rest compete for the beam.
            if id2word[word_id] == '<end>':
                complete.append(item)
            else:
                cache.append(item)

    # Keep only the beam_size best unfinished hypotheses for the next step.
    cache.sort(key=lambda item: -item['score'])
    result = cache[:beam_size]

# Use the first finished hypothesis if there is one; otherwise fall back to
# the best unfinished hypothesis left in the beam.
chosen = result[0] if not complete else complete[0]
final_sentence = chosen['sentence']
alphas = chosen['alphas']

# Convert the word ids to text, print it, and split it into words for the
# attention plot.
sentence = translate(final_sentence)
print(sentence)
sentence = sentence.split(' ')

def _min_max_scale(arr):
    """Rescale *arr* linearly to [0, 1]; a constant array maps to all zeros.

    The explicit constant-array case avoids the 0 / 0 division (NaN output)
    that plain min-max normalization produces.
    """
    lowest = arr.min()
    value_range = arr.max() - lowest
    if value_range == 0:
        return np.zeros(arr.shape, dtype=np.float64)
    return (arr - lowest) / value_range


# Show, for every generated word, the image blended with that step's
# attention map.
img = _min_max_scale(img)
# Lay the sub-plots out on the smallest square grid that fits all words.
n = int(np.ceil(np.sqrt(len(sentence))))
plt.figure(figsize=(10, 8))
# zip() pairs each word with its attention weights and stops at the shorter
# sequence, so a length mismatch cannot raise an IndexError.
for i, (word, word_alpha) in enumerate(zip(sentence, alphas)):
    # Reshape the per-block weights to the 14 x 14 grid and upsample the map
    # to the image resolution.
    a = np.reshape(word_alpha, (14, 14))
    a = cv2.resize(a, (image_size, image_size))
    a = np.expand_dims(a, -1)
    a = _min_max_scale(a)
    # Equal-weight blend; the single-channel map broadcasts over the colour
    # channels.
    combine = 0.5 * img + 0.5 * a
    plt.subplot(n, n, i + 1)
    plt.text(0, 1, word, color='black', backgroundcolor='white', fontsize=12)
    plt.imshow(combine)
    plt.axis('off')
plt.show()

標題生成結果如下,非常準確地涵蓋了新娘、新郎、擺pose、拍照等關鍵詞,並且注意力視覺化也很好地反映了生成每個詞時模型對圖片不同區域的關注度

影象標題生成結果

參考

視訊講解課程