pytorch版NIC分析
模型架構為CNN+RNN,就不做過多的介紹。由於在pytorch中呼叫inception不方便,將googlenet替換成了resnet50,rnn部分仍然使用lstm不變。
1、構建詞彙庫
把訓練集合的captions依次拆開,統計詞彙頻率,只考慮頻率最高的15000-4個單詞,並去掉其中出現次數少於threshold(預設4次)的單詞,手動再補充四個:
<pad> <start> <end> <unk> 這樣詞彙庫裡最多有15000個單詞(本例中實際為9953+4=9957個)。
詞彙庫包含了三部分:
兩個列表 word2idx idx2word 表示索引和單詞的一一對應 以及詞彙庫的大小。
def build_vocab(json, threshold=4, max_words=15000):
    """Build a vocabulary from the captions of a COCO annotation file.

    Every caption is lower-cased and tokenized with NLTK, and token
    frequencies are accumulated over all annotations.  Only the
    ``max_words - 4`` most frequent tokens are considered (four slots are
    reserved for the special tokens), and of those only tokens seen at
    least ``threshold`` times are kept.

    :param json: path of the COCO caption annotation file (passed to ``COCO``)
    :param threshold: minimum number of occurrences for a token to be kept
    :param max_words: upper bound on the vocabulary size, special tokens included
    :returns: a ``Vocabulary`` containing the four special tokens followed by
        the retained words, most frequent first
    """
    coco = COCO(json)
    annotation_ids = coco.anns.keys()
    total = len(annotation_ids)
    frequencies = Counter()

    for position, annotation_id in enumerate(annotation_ids):
        caption_text = str(coco.anns[annotation_id]['caption'])
        frequencies.update(nltk.tokenize.word_tokenize(caption_text.lower()))
        # Progress report every 1000 captions.
        if position % 1000 == 0:
            print("[%d/%d] Tokenized the captions." % (position, total))

    # Leave room for the 4 special tokens added below.
    candidates = frequencies.most_common(max_words - 4)
    # Tokens rarer than `threshold` are discarded.
    kept_words = [token for token, count in candidates if count >= threshold]

    # The special tokens go in first, ahead of every regular word.
    vocab = Vocabulary()
    vocab.add_word('<pad>')
    vocab.add_word(vocab.start_token())
    vocab.add_word(vocab.end_token())
    vocab.add_word('<unk>')

    for token in kept_words:
        vocab.add_word(token)

    # NOTE(review): this reports the number of regular words only; the four
    # special tokens are not included in the printed count.
    print('Total number of words in vocab:', len(kept_words))
    return vocab
2、首先是相關引數的設定:
訓練的batch_size大小,學習率,存檔位置等等
# --- Training configuration -------------------------------------------------
# Values taken from the parsed command-line arguments.
batch_size = args.batch_size
embed_size = args.embed_size
num_hiddens = args.num_hidden
log_step = args.log_step
checkpoint_dir = args.checkpoint_dir

# Values fixed in the script.
num_workers = 8        # worker count handed to the data loaders
ngpu = 1
learning_rate = 1e-3
num_epochs = 3
save_step = 500

# Counters start from zero.
initial_step = 0
initial_epoch = 0
3、制定影象的轉換形式,建立訓練集和測試集資料載入器
# Image preprocessing: resize to 224x224, random horizontal flip,
# conversion to a tensor, then per-channel normalization.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])

# Locations of the COCO 2014 images and caption annotation files.
train_IMAGES_PATH = base_dir+'dataset/COCO2014/coco/train2014'
train_CAPTION_FILE_PATH =base_dir+ 'dataset/COCO2014/coco/annotations/captions_train2014.json'
val_IMAGES_PATH = base_dir+'dataset/COCO2014/coco/val2014'
val_CAPTION_FILE_PATH = base_dir+'dataset/COCO2014/coco/annotations/captions_val2014.json'

# The same vocabulary is shared by the training and validation loaders.
vocab = load_vocab()

# Training data loader.
train_loader = get_coco_data_loader(
    path=train_IMAGES_PATH,
    json=train_CAPTION_FILE_PATH,
    vocab=vocab,
    transform=transform,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers,
)

# Validation data loader.
# NOTE(review): this reuses the training transform (including the random
# horizontal flip) and shuffle=True; confirm that is intended for validation.
val_loader = get_coco_data_loader(
    path=val_IMAGES_PATH,
    json=val_CAPTION_FILE_PATH,
    vocab=vocab,
    transform=transform,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers,
)
其中,資料載入器的具體構建形式如下:
def get_coco_data_loader(path, json, vocab, transform=None,
                         batch_size=32, shuffle=True, num_workers=2):
    """Create a ``DataLoader`` over a COCO captioning dataset.

    :param path: image location handed to ``CocoDataset``
    :param json: caption annotation file handed to ``CocoDataset``
    :param vocab: vocabulary handed to ``CocoDataset``
    :param transform: optional image transform handed to ``CocoDataset``
    :param batch_size: number of samples per batch
    :param shuffle: whether the loader shuffles the samples
    :param num_workers: number of loader worker processes (>= 0)
    :returns: a ``torch.utils.data.DataLoader`` that batches with ``collate_fn``
    """
    dataset = CocoDataset(path=path, json=json, vocab=vocab, transform=transform)

    # The custom collate_fn is used instead of the default one.  Per the
    # original author's note it yields, for each batch, the images, the
    # padded captions and a list of caption lengths.
    loader_options = {
        'batch_size': batch_size,
        'shuffle': shuffle,
        'num_workers': num_workers,
        'collate_fn': collate_fn,
    }
    return torch.utils.data.DataLoader(dataset=dataset, **loader_options)
一般collate_fn預設就好,這裡由於要用到caption,自己寫的。
4、準備好資料以及資料讀入的方式和形式以後,建立模型結構
# Image encoder producing one embed_size-dimensional feature vector per image.
encoder = CNN(output_dim=embed_size)
# Caption decoder: one recurrent layer whose output layer spans the vocabulary.
decoder = RNN(
    emb_size=embed_size,
    hidden_size=num_hiddens,
    vocab_size=len(vocab),
    num_layers=1,
    rec_unit=args.rec_unit,
)
其中 CNN:
使用resnet50的結構(去掉最後的全連線層),再接一個新的線性層,把2048維的池化特徵對映為embed_size維(此處為512維)的影象特徵並返回。
class CNN(nn.Module):
    """Image encoder: a pretrained ResNet-50 trunk followed by a new linear layer.

    The last child module of the pretrained network (its classifier) is
    dropped and replaced by a freshly initialised linear projection that
    outputs ``output_dim`` features per image.
    """

    def __init__(self, output_dim=1000):
        """Assemble the encoder.

        :param output_dim: size of the feature vector produced for each image
        """
        super(CNN, self).__init__()
        # TODO: change with resnet152?
        backbone = models.resnet50(pretrained=True)
        # Keep every child module except the last one (the classifier).
        trunk_layers = list(backbone.children())[:-1]
        self.resnet = Sequential(*trunk_layers)
        # Replacement for the removed classifier, with the same input width.
        self.linear = nn.Linear(backbone.fc.in_features, output_dim)
        # NOTE(review): this batch-norm layer is registered here but is not
        # applied anywhere in forward().
        self.batchnorm = nn.BatchNorm1d(output_dim, momentum=0.01)
        self.init_weights()

    def init_weights(self):
        """Initialise the linear layer: N(0, 0.02) weights and zero bias.

        Only the new projection is touched; the pretrained trunk keeps its
        weights.
        """
        projection = self.linear
        projection.weight.data.normal_(0, 0.02)
        projection.bias.data.fill_(0)

    def forward(self, x):
        """Encode a batch of images.

        :param x: batch of images accepted by the ResNet trunk
        :returns: tensor of shape (batch, output_dim)
        """
        # (batch, C, 1, 1), where C is the classifier's in_features
        # (2048 for torchvision's ResNet-50).
        trunk_out = self.resnet(x)
        # Re-wrap the raw data so that no gradient flows back into the trunk;
        # only the linear layer below is trained.
        detached = Variable(trunk_out.data)
        # Flatten to (batch, C) for the linear layer.
        flat = detached.view(detached.size(0), -1)
        return self.linear(flat)
RNN:
建立RNN的結構,
首先建立詞嵌入矩陣,self.embeddings是隨機初始化的。
原論文中,RNN階段,首先把提取的影象特徵作為輸入,然後預測得到第一個詞(貪心演算法),然後作為輸入預測下一個詞。
但是程式碼中的處理方式並不是這樣,所以此處存疑。
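程式碼中的方式是建立一個單層的lstm,把每個batch的caption中所有單詞的embedding和影象特徵拼接起來作為輸入,得到隱藏層的輸出,再接一個全連線層,得到這個batch的captions中所有單詞的輸出。比如有6個句子,一共81個單詞,詞彙庫大小為9957,則輸出為(81,9957)。這個操作一開始並沒看懂;其實這就是訓練時常用的teacher forcing:影象特徵作為第0個時刻的輸入,真實caption的每個單詞作為下一個時刻的輸入,一次算出所有時刻的預測。pack_padded_sequence會去掉padding,所以輸出的行數等於該batch中有效時刻的總數(即81)。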
class RNN(torch.nn.Module):
    """
    Recurrent caption decoder for an encoder-decoder image captioning model.

    Word indices are embedded, the image feature vector is prepended as the
    first time step, and the recurrent unit's hidden states are projected
    onto the vocabulary.
    """

    # Supported recurrent unit types, keyed by their lower-case name.
    __rec_units = {
        'elman': nn.RNN,
        'gru': nn.GRU,
        'lstm': nn.LSTM,
    }

    def __init__(self, emb_size, hidden_size, vocab_size, num_layers=1, rec_unit='gru'):
        """
        Build the decoder.

        :param emb_size: size of the word embeddings (and recurrent unit input)
        :param hidden_size: size of the hidden state of the recurrent unit
        :param vocab_size: size of the vocabulary (output of the network)
        :param num_layers: number of recurrent layers (default=1)
        :param rec_unit: 'elman', 'gru' or 'lstm', case-insensitive (default=gru)
        :raises AssertionError: if ``rec_unit`` is not a supported name
        """
        rec_unit = rec_unit.lower()
        assert rec_unit in RNN.__rec_units, 'Specified recurrent unit is not available'
        super(RNN, self).__init__()
        # Embedding table of shape (vocab_size, emb_size).
        self.embeddings = nn.Embedding(vocab_size, emb_size)
        unit_class = RNN.__rec_units[rec_unit]
        self.unit = unit_class(emb_size, hidden_size, num_layers, batch_first=True)
        # Maps each hidden state to one score per vocabulary entry.
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, features, captions, lengths):
        """
        Score every valid time step of a batch of captions.

        :param features: image features from the CNN, one row per image; the
            row width must equal ``emb_size`` because the features are
            concatenated with the word embeddings
        :param captions: encoded and padded (target) captions, (batch, max_len)
        :param lengths: actual length of each caption
        :returns: vocabulary scores for the packed time steps, one row per
            valid step, i.e. shape (sum(lengths), vocab_size)
        """
        # (batch, max_len) indices -> (batch, max_len, emb_size) vectors.
        word_vectors = self.embeddings(captions)
        # The image feature becomes time step 0: (batch, max_len + 1, emb_size).
        image_step = features.unsqueeze(1)
        sequence = torch.cat((image_step, word_vectors), 1)
        # Packing with the caption lengths drops padded positions, so the
        # last input step of each sequence is not fed to the recurrent unit.
        packed = pack_padded_sequence(sequence, lengths, batch_first=True)
        packed_hidden, _ = self.unit(packed)
        # Element 0 of the packed result holds the flattened hidden states.
        return self.linear(packed_hidden[0])
5、訓練
得到的RNN輸出與targets計算交叉熵損失函式,targets是batch中所有單詞的下標。
# One training step for the current batch (images, captions, lengths).

# Wrap the batch tensors.  Per the original notes, images are e.g.
# [6, 3, 224, 224] and captions are padded word indices, e.g. [6, 15].
# NOTE(review): `volatile=True` is presumably forwarded to Variable by
# utils.to_var — confirm against that helper.
images = utils.to_var(images, volatile=True)
captions = utils.to_var(captions)
# Flatten the variable-length captions into one 1-D vector of word indices
# (padding removed), e.g. 81 indices for 6 captions totalling 81 words.
targets = pack_padded_sequence(captions, lengths, batch_first=True)[0]

# Forward, Backward and Optimize
# Clear the gradients left over from the previous batch.
decoder.zero_grad()
encoder.zero_grad()

if ngpu > 1:
    # Only the encoder is run data-parallel; its outputs are gathered back
    # into a single batch of features.
    features = nn.parallel.data_parallel(encoder, images, range(ngpu))
else:
    # run on single GPU
    features = encoder(images)

# The decoder always runs once on the whole batch.  The previous multi-GPU
# branch called it with `features` only, omitting the `captions` and
# `lengths` arguments it is given here, and its packed output has to line up
# row-for-row with `targets`, which is packed over the full batch.
outputs = decoder(features, captions, lengths)

# Cross-entropy between the per-word scores and the true word indices.
train_loss = criterion(outputs, targets)
整個的思路:
提取影象特徵,影象特徵和對應caption的embeddings結合起來作為輸入,輸出caption中單詞的下標。
感覺很奇怪,正在訓練,看看訓練出來效果怎麼樣。
6、測試
與論文思路一致,最初使用影象的特徵作為輸入,預測下一個時刻的單詞,這裡使用的是貪心演算法,即選取概率最大的一個。最後把所有句子預測的結果整合在一起。
def sample(self, features, max_len=25):
    """
    Generate captions with greedy decoding.

    The image features are the first input of the recurrent unit; after
    that, the highest-scoring word of each step is embedded and fed back in
    as the next input.  Exactly ``max_len`` steps are run; decoding does not
    stop early at an end token.

    :param features: features from CNN feature extractor, one row per image
    :param max_len: number of decoding steps (words generated per image)
    :returns: predicted word indices, (batch, max_len) with any size-1
        dimension squeezed away (a single image yields a 1-D tensor)
    """
    step_input = features.unsqueeze(1)   # add the time dimension
    state = None                         # recurrent state before the first step
    chosen_ids = []
    for _ in range(max_len):
        # One step through the recurrent unit, carrying the state forward.
        step_hidden, state = self.unit(step_input, state)
        scores = self.linear(step_hidden.squeeze(1))
        # Greedy choice: index of the highest score for each image.
        _, best = scores.max(1)
        chosen_ids.append(best)
        # The chosen word's embedding is the input of the next step.
        step_input = self.embeddings(best).unsqueeze(1)
    # Put the per-step predictions side by side along dimension 1.
    return torch.stack(chosen_ids, 1).squeeze()
最後把單詞下標換回單詞本身,得到預測的句子。
def convert_back_to_text(idx_arr, vocab):
    """Turn a sequence of word indices back into a sentence.

    Indices are read in order until the first one that maps to the end
    token; that index and everything after it are dropped.  Start-token
    indices are skipped.  The remaining words are joined with single spaces.

    :param idx_arr: iterable of word indices
    :param vocab: vocabulary providing ``word2idx``, ``idx2word``,
        ``start_token()`` and ``end_token()``
    :returns: the decoded sentence as a string
    """
    start_id = vocab.word2idx[vocab.start_token()]
    words = []
    for word_id in idx_arr:
        word = vocab.idx2word[word_id]
        if word == vocab.end_token():
            # End of sentence reached: ignore everything that follows.
            break
        if word_id == start_id:
            # The start marker is not part of the readable caption.
            continue
        words.append(word)
    return ' '.join(words)