(4) Text Sentiment Analysis with a CNN (PyTorch)
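In the previous notes, we reached a test accuracy of about 85% using RNNs and the FastText model from Bag of Tricks for Efficient Text Classification. In this note, we use a convolutional neural network (CNN) for sentiment analysis, implementing the model from the paper Convolutional Neural Networks for Sentence Classification.
Note: this tutorial is not intended as a comprehensive introduction to CNNs. For a better and more in-depth explanation, consult a dedicated resource on CNNs.
Traditionally, CNNs are used to analyze images. They consist of one or more convolutional layers followed by one or more linear layers. A convolutional layer uses filters (also called kernels or receptive fields) that scan across an image and produce a processed version of it. This processed version can be fed into another convolutional layer or into a linear layer. Each filter has a shape: a 3x3 filter, for example, covers a region of the image 3 pixels wide by 3 pixels high. Every element of the filter has a weight, so a 3x3 filter has 9 weights. In traditional image processing these weights were specified by hand by engineers; the main advantage of convolutional layers in neural networks is that the weights are learned via backpropagation.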
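To make the weighted sum concrete, here is a minimal pure-Python sketch of sliding one 3x3 filter over a toy image. The convolve2d helper, the image values, and the hand-picked weights are all illustrative assumptions; in a CNN the nine weights would be learned by backpropagation rather than written by hand.

```python
# A 3x3 filter has nine weights. These are hand-picked for illustration only.
kernel = [
    [1, 0, -1],
    [1, 0, -1],
    [1, 0, -1],
]

# A toy 4x4 single-channel "image" (hypothetical values).
image = [
    [3, 3, 0, 0],
    [3, 3, 0, 0],
    [3, 3, 0, 0],
    [3, 3, 0, 0],
]

def convolve2d(image, kernel):
    """Slide the kernel over the image (stride 1, no padding) and
    return the weighted sum at every position."""
    k_h, k_w = len(kernel), len(kernel[0])
    out_h = len(image) - k_h + 1
    out_w = len(image[0]) - k_w + 1
    output = []
    for i in range(out_h):
        row = []
        for j in range(out_w):
            total = sum(
                image[i + di][j + dj] * kernel[di][dj]
                for di in range(k_h)
                for dj in range(k_w)
            )
            row.append(total)
        output.append(row)
    return output

feature_map = convolve2d(image, kernel)
print(feature_map)  # a 2x2 "processed image"
```

The output is smaller than the input because the filter can only be placed at positions where it fits entirely inside the image.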
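The intuition behind learning the weights is that the convolutional layers act as feature extractors, extracting the parts of the image that matter most for the CNN's goal. For example, a CNN used to detect faces in an image might look for features such as a nose, a mouth, and a pair of eyes.
So why use CNNs on text? In the same way that a 3x3 filter can look over a patch of an image, a 1x2 filter can look over 2 consecutive words in a piece of text, i.e. a bi-gram. In the previous tutorial, the FastText model used bi-grams by explicitly appending them to the end of the text. In this CNN model, we instead use multiple filters of different sizes that look at the bi-grams (a 1x2 filter), tri-grams (a 1x3 filter), and n-grams (a 1xn filter) within the text.
The intuition here is that the appearance of certain bi-grams, tri-grams, and n-grams within a review is a good indication of the final sentiment.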
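As a rough illustration of what a filter of height n "sees", the sketch below lists the windows of n consecutive tokens in a sentence. The ngrams helper is a hypothetical name used only here; the CNN never builds these tuples explicitly, it just slides its filters over the same windows.

```python
def ngrams(tokens, n):
    """Return every window of n consecutive tokens, in order."""
    return [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

tokens = ["this", "film", "is", "terrible"]

bigrams = ngrams(tokens, 2)   # what a filter of height 2 covers
trigrams = ngrams(tokens, 3)  # what a filter of height 3 covers

print(bigrams)
print(trigrams)
```

A sentence of 4 tokens yields 3 bi-grams and 2 tri-grams, which is the same count as the number of positions a filter of that height can occupy.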
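Preparing the Data
As in the previous notes, we start by preparing the data.
Unlike the previous note with the FastText model, we no longer explicitly create the bi-grams and append them to the end of the sentence.
Because the convolutional layers expect the batch dimension to be first, we can tell TorchText to return the data already permuted by passing batch_first = True to the field.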
import torch
from torchtext import data
from torchtext import datasets
import random
import numpy as np
SEED = 1234
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True
TEXT = data.Field(tokenize='spacy', tokenizer_language='en_core_web_sm', batch_first=True)
LABEL = data.LabelField(dtype = torch.float)
train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)
train_data, valid_data = train_data.split(random_state = random.seed(SEED))
Build the vocabulary and load the pre-trained word embeddings.
MAX_VOCAB_SIZE = 25_000
TEXT.build_vocab(train_data,
                 max_size = MAX_VOCAB_SIZE,
                 vectors = "glove.6B.100d",
                 unk_init = torch.Tensor.normal_)
LABEL.build_vocab(train_data)
As before, we create the iterators.
BATCH_SIZE = 64
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size = BATCH_SIZE,
    device = device)
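Building the Model
Now let's build our model.
The first major hurdle is visualizing how CNNs are used for text. Images are typically 2-dimensional (we ignore the third "color" dimension for now), whereas text is 1-dimensional. However, we know that the first step in the previous tutorials (and in almost every NLP pipeline) is converting the words into word embeddings. This is how we can visualize words in 2 dimensions: each word lies along one axis and the elements of its vector along the other, giving a 2-dimensional representation of the embedded sentence.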
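We can then use a filter of size [n x emb_dim]. This covers n sequential words entirely, as the filter's width is the emb_dim dimension. In the accompanying illustration, the word vectors are shown in green. Here we have 4 words with 5-dimensional embeddings, creating a [4x5] "image" tensor. A filter that covers two words at a time (i.e. bi-grams) is a [2x5] filter, shown in yellow, and each element of the filter has a weight associated with it. The output of this filter (shown in red) is a single real number: the weighted sum of all elements covered by the filter.
The filter then moves "down" the image (or across the sentence) to cover the next bi-gram, and another output (weighted sum) is calculated.
Finally, the filter moves down once more and the final output for this filter is calculated.
In our case (and in the general case where the width of the filter equals the width of the "image"), the output is a vector whose number of elements equals the height of the image (the number of words in the sentence) minus the height of the filter plus one: 4 - 2 + 1 = 3 in the example above.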
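The 4 - 2 + 1 = 3 calculation can be checked directly with nn.Conv2d. This is a minimal sketch assuming random embedding values and a single randomly initialized [2x5] filter; the variable names are illustrative. It also recomputes the first output by hand as a weighted sum (plus the bias term that nn.Conv2d adds by default).

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# One sentence of 4 words with 5-dimensional embeddings (random stand-ins).
# Shape: [batch size, channels, sent len, emb dim]
sentence = torch.randn(1, 1, 4, 5)

# A single bi-gram filter: height 2, width equal to the embedding dimension.
bigram_filter = nn.Conv2d(in_channels=1, out_channels=1, kernel_size=(2, 5))

output = bigram_filter(sentence)
output_shape = tuple(output.shape)
print(output_shape)  # [batch size, n_filters, sent len - 2 + 1, 1]

# The first output is the weighted sum over the first two word vectors.
manual_first = (sentence[0, 0, 0:2, :] * bigram_filter.weight[0, 0]).sum() \
    + bigram_filter.bias[0]
print(torch.allclose(output[0, 0, 0, 0], manual_first, atol=1e-5))
```

The trailing dimension of size 1 appears because the filter is as wide as the "image", so it can only move vertically; the model code later removes it with squeeze(3).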
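This example showed how to calculate the output of one filter. Our model (and pretty much all CNNs) has many such filters. The idea is that each filter learns a different feature to extract. In the example above, we hope that each of the [2 x emb_dim] filters looks for the occurrence of a different bi-gram.
Our model also has filters of different sizes, with heights of 3, 4, and 5 and 100 filters of each. The intuition is that we are looking for the occurrence of different tri-grams, 4-grams, and 5-grams that are relevant to the sentiment of movie reviews.
The next step in our model is to apply pooling (specifically max pooling) to the output of the convolutional layers. This is similar to the FastText model, where we averaged over the word vectors (implemented with the F.avg_pool2d function); here, however, instead of taking the average over a dimension we take the maximum value over a dimension. In the accompanying example, the maximum value (0.9) is taken from the output of the convolutional layer (not shown is the activation function applied to the convolution output).
The idea here is that the maximum value is the "most important" feature for determining the sentiment of the review, which corresponds to the "most important" n-gram in the review. How do we know what the "most important" n-gram is? Luckily, we don't have to! Through backpropagation, the weights of the filters are changed so that whenever certain n-grams that are highly indicative of the sentiment appear, the output of the filter is a "high" value. If this "high" value is the maximum of the output, it passes through the max pooling layer.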
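A small sketch of max pooling over the positions of one filter's output. Only the maximum of 0.9 comes from the example in the text; the other two values (0.2 and 0.4) are made up for illustration.

```python
import torch
import torch.nn.functional as F

# Output of one filter over three positions (0.2 and 0.4 are hypothetical).
# Shape: [batch size, n_filters, number of positions]
conved = torch.tensor([[[0.2, 0.9, 0.4]]])

# Pool over the whole position dimension, so one value survives per filter.
pooled = F.max_pool1d(conved, conved.shape[2]).squeeze(2)

print(pooled.shape)   # [batch size, n_filters]
print(pooled.item())  # the largest filter response
```

Using the full length of the position dimension as the pooling window is what turns a variable-length output into exactly one number per filter.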
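As our model has 100 filters of each of 3 different sizes, we have 300 different n-grams that the model considers important. We concatenate these into a single vector and pass it through a linear layer to predict the sentiment. We can think of the weights of this linear layer as "weighting up the evidence" from each of these 300 n-grams before making a final decision.
Implementation Details
We implement the convolutional layers with nn.Conv2d. The in_channels argument is the number of "channels" in the image going into the convolutional layer. In actual images this is usually 3 (one channel each for red, green, and blue); with text, however, we have only a single channel, the text itself. out_channels is the number of filters and kernel_size is the size of the filters. Each of our kernel sizes is [n x emb_dim], where n is the size of the n-grams.
In PyTorch, RNNs want the batch dimension second by default, whereas CNNs want the batch dimension first. We do not have to permute the data here, as we already set batch_first = True in the TEXT field. We then pass the sentence through an embedding layer to get our embeddings. The second dimension of the input to an nn.Conv2d layer must be the channel dimension. As text technically does not have a channel dimension, we unsqueeze the tensor to create one. This matches in_channels=1 in the initialization of the convolutional layers.
We then pass the tensors through the convolutional and pooling layers, applying the ReLU activation function after the convolutional layers. Another nice feature of the pooling layers is that they handle sentences of different lengths. The size of the output of a convolutional layer depends on the size of its input, and different batches contain sentences of different lengths. Without the max pooling layer, the input to the linear layer would depend on the length of the input sentence, which is not what we want. One option to rectify this would be to trim or pad all sentences to the same length; with the max pooling layer, however, we always know that the input to the linear layer is the total number of filters. Note: there is one exception. If a sentence is shorter than the largest filter used, you have to pad it up to the length of the largest filter. In the IMDb data there are no reviews that short (the largest filter here covers 5 words), so we do not have to worry about it, but you will if you use your own data.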
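The claim that max pooling removes the dependence on sentence length can be checked with a short sketch. The batch size of 8, the embedding dimension of 50, and the pooled_shapes helper are arbitrary choices for this illustration, not values from the tutorial.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

# 100 tri-gram filters over 50-dimensional embeddings (sizes chosen arbitrarily).
conv = nn.Conv2d(in_channels=1, out_channels=100, kernel_size=(3, 50))

def pooled_shapes(sent_len):
    """Return the shapes before and after max pooling for a random batch."""
    embedded = torch.randn(8, 1, sent_len, 50)  # [batch, 1, sent len, emb dim]
    conved = F.relu(conv(embedded)).squeeze(3)  # [batch, n_filters, sent len - 3 + 1]
    pooled = F.max_pool1d(conved, conved.shape[2]).squeeze(2)  # [batch, n_filters]
    return tuple(conved.shape), tuple(pooled.shape)

short_conved, short_pooled = pooled_shapes(10)
long_conved, long_pooled = pooled_shapes(40)

print(short_conved, short_pooled)
print(long_conved, long_pooled)
```

The convolution outputs differ in length (8 versus 38 positions), but after pooling both batches have one value per filter, so the linear layer always receives the same number of inputs.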
Finally, we concatenate the filter outputs, apply dropout, and pass them through a linear layer to make our predictions.
import torch.nn as nn
import torch.nn.functional as F

class CNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim,
                 dropout, pad_idx):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx = pad_idx)
        self.conv_0 = nn.Conv2d(in_channels = 1,
                                out_channels = n_filters,
                                kernel_size = (filter_sizes[0], embedding_dim))
        self.conv_1 = nn.Conv2d(in_channels = 1,
                                out_channels = n_filters,
                                kernel_size = (filter_sizes[1], embedding_dim))
        self.conv_2 = nn.Conv2d(in_channels = 1,
                                out_channels = n_filters,
                                kernel_size = (filter_sizes[2], embedding_dim))
        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        #text = [batch size, sent len]
        embedded = self.embedding(text)
        #embedded = [batch size, sent len, emb dim]
        embedded = embedded.unsqueeze(1)
        #embedded = [batch size, 1, sent len, emb dim]
        conved_0 = F.relu(self.conv_0(embedded).squeeze(3))
        conved_1 = F.relu(self.conv_1(embedded).squeeze(3))
        conved_2 = F.relu(self.conv_2(embedded).squeeze(3))
        #conved_n = [batch size, n_filters, sent len - filter_sizes[n] + 1]
        pooled_0 = F.max_pool1d(conved_0, conved_0.shape[2]).squeeze(2)
        pooled_1 = F.max_pool1d(conved_1, conved_1.shape[2]).squeeze(2)
        pooled_2 = F.max_pool1d(conved_2, conved_2.shape[2]).squeeze(2)
        #pooled_n = [batch size, n_filters]
        cat = self.dropout(torch.cat((pooled_0, pooled_1, pooled_2), dim = 1))
        #cat = [batch size, n_filters * len(filter_sizes)]
        return self.fc(cat)
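Currently the CNN model can only use 3 filters of different sizes, but we can improve the code of our model to make it more generic and take any number of filter sizes.
We do this by placing all of our convolutional layers in an nn.ModuleList, a container that holds a list of PyTorch nn.Module objects. If we simply used a standard Python list, the modules within the list would not be "seen" by anything outside the list (their parameters would not be registered with the model), which would cause errors.
We can now pass a list of filter sizes of arbitrary length, and the list comprehension creates a convolutional layer for each of them. Then, in the forward method, we iterate through the list, applying each convolutional layer to get a list of convolutional outputs, which we also feed through max pooling in a list comprehension before concatenating them and passing them through the dropout and linear layers.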
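The difference between a plain Python list and nn.ModuleList is easy to observe by counting registered parameters. The two toy classes below (WithPlainList and WithModuleList) are hypothetical and exist only for this comparison.

```python
import torch.nn as nn

class WithPlainList(nn.Module):
    def __init__(self):
        super().__init__()
        # A plain list: the layers exist, but the parent module does not register them.
        self.layers = [nn.Linear(4, 2), nn.Linear(4, 2)]

class WithModuleList(nn.Module):
    def __init__(self):
        super().__init__()
        # nn.ModuleList registers every layer as a submodule.
        self.layers = nn.ModuleList([nn.Linear(4, 2), nn.Linear(4, 2)])

plain_count = sum(p.numel() for p in WithPlainList().parameters())
registered_count = sum(p.numel() for p in WithModuleList().parameters())

print(plain_count)       # no parameters are visible to the optimizer
print(registered_count)  # 2 layers * (4*2 weights + 2 biases)
```

With the plain list, model.parameters() is empty, so an optimizer would never update those layers and model.to(device) would not move them.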
class CNN1(nn.Module):
    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout, pad_idx):
        super(CNN1, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.convs = nn.ModuleList([
            nn.Conv2d(in_channels=1, out_channels=n_filters, kernel_size=(fs, embedding_dim))
            for fs in filter_sizes
        ])
        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        # text = [batch_size, sent_len]
        embedded = self.embedding(text)
        # embedded = [batch_size, sent_len, emb_dim]
        embedded = embedded.unsqueeze(1)
        # embedded = [batch_size, 1, sent_len, emb_dim]
        # squeeze(3) drops the trailing width-1 dimension so max_pool1d gets a 3D tensor
        conved = [F.relu(conv(embedded)).squeeze(3) for conv in self.convs]
        # conved_n = [batch_size, n_filters, sent_len - filter_sizes[n] + 1]
        pooled = [F.max_pool1d(conv, conv.shape[2]).squeeze(2) for conv in conved]
        # pooled_n = [batch_size, n_filters]
        cat = self.dropout(torch.cat(pooled, dim=1))
        # cat = [batch_size, n_filters * len(filter_sizes)]
        return self.fc(cat)
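We can also implement the above model using 1-dimensional convolutional layers, where the embedding dimension is the "depth" of the filter and the number of tokens in the sentence is the width.
We run the tests in this note using the 2-dimensional convolutional model, but leave the implementation of the 1-dimensional model below for anyone interested.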
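To see why the two formulations are interchangeable, the following sketch copies the weights of a small nn.Conv2d layer into an nn.Conv1d layer and compares their outputs. The sizes (5-dimensional embeddings, 4 filters of height 3, sentences of 7 tokens) are arbitrary assumptions for the illustration.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
emb_dim, n_filters, fs = 5, 4, 3  # small arbitrary sizes

conv2d = nn.Conv2d(in_channels=1, out_channels=n_filters, kernel_size=(fs, emb_dim))
conv1d = nn.Conv1d(in_channels=emb_dim, out_channels=n_filters, kernel_size=fs)

# Conv2d weights are [n_filters, 1, fs, emb_dim]; Conv1d expects [n_filters, emb_dim, fs].
with torch.no_grad():
    conv1d.weight.copy_(conv2d.weight.squeeze(1).permute(0, 2, 1))
    conv1d.bias.copy_(conv2d.bias)

embedded = torch.randn(2, 7, emb_dim)  # [batch size, sent len, emb dim]

out2d = conv2d(embedded.unsqueeze(1)).squeeze(3)  # [batch, n_filters, 7 - 3 + 1]
out1d = conv1d(embedded.permute(0, 2, 1))         # [batch, n_filters, 7 - 3 + 1]

same = torch.allclose(out2d, out1d, atol=1e-5)
print(out1d.shape, same)
```

Both layers hold the same number of weights and compute the same weighted sums; only the layout of the input and weight tensors differs.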
class CNN1d(nn.Module):
    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim,
                 dropout, pad_idx):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx = pad_idx)
        self.convs = nn.ModuleList([
            nn.Conv1d(in_channels = embedding_dim,
                      out_channels = n_filters,
                      kernel_size = fs)
            for fs in filter_sizes
        ])
        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        #text = [batch size, sent len]
        embedded = self.embedding(text)
        #embedded = [batch size, sent len, emb dim]
        embedded = embedded.permute(0, 2, 1)
        #embedded = [batch size, emb dim, sent len]
        conved = [F.relu(conv(embedded)) for conv in self.convs]
        #conved_n = [batch size, n_filters, sent len - filter_sizes[n] + 1]
        pooled = [F.max_pool1d(conv, conv.shape[2]).squeeze(2) for conv in conved]
        #pooled_n = [batch size, n_filters]
        cat = self.dropout(torch.cat(pooled, dim = 1))
        #cat = [batch size, n_filters * len(filter_sizes)]
        return self.fc(cat)
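We create an instance of our CNN1 class.
To run the 1-dimensional convolutional model instead, change CNN1 to CNN1d; note that both models give almost identical results.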
INPUT_DIM = len(TEXT.vocab)
EMBEDDING_DIM = 100
N_FILTERS = 100
FILTER_SIZES = [3,4,5]
OUTPUT_DIM = 1
DROPOUT = 0.5
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]
model = CNN1(INPUT_DIM, EMBEDDING_DIM, N_FILTERS, FILTER_SIZES, OUTPUT_DIM, DROPOUT, PAD_IDX)
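Checking the number of parameters in our model, we can see it is about the same as for the FastText model.
The CNN1 and CNN1d models have exactly the same number of parameters.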
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'The model has {count_parameters(model):,} trainable parameters')
The model has 2,620,801 trainable parameters
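The reported total can be reproduced by hand. The sketch below assumes a vocabulary of 25,002 entries (the 25,000 most frequent words plus the unknown and padding tokens); that size is an inference, not something printed in this note, but it is consistent with the figure above.

```python
vocab_size = 25_002  # assumption: MAX_VOCAB_SIZE + <unk> + <pad>
embedding_dim = 100
n_filters = 100
filter_sizes = [3, 4, 5]
output_dim = 1

# Embedding table: one vector per vocabulary entry.
embedding_params = vocab_size * embedding_dim

# Each filter has fs * embedding_dim weights plus one bias.
# This holds for both the Conv2d ([fs x emb_dim] kernel, 1 input channel)
# and the Conv1d (kernel fs, emb_dim input channels) formulations.
conv_params = sum(n_filters * (fs * embedding_dim + 1) for fs in filter_sizes)

# Linear layer: one weight per pooled feature, plus a bias per output.
fc_params = len(filter_sizes) * n_filters * output_dim + output_dim

total = embedding_params + conv_params + fc_params
print(f'{total:,}')
```

Almost all of the parameters sit in the embedding table; the convolutional and linear layers together contribute only about 120,000.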
Next, we load the pre-trained embeddings.
pretrained_embeddings = TEXT.vocab.vectors
model.embedding.weight.data.copy_(pretrained_embeddings)
tensor([[-0.1117, -0.4966, 0.1631, ..., 1.2647, -0.2753, -0.1325],
[-0.8555, -0.7208, 1.3755, ..., 0.0825, -1.1314, 0.3997],
[-0.0382, -0.2449, 0.7281, ..., -0.1459, 0.8278, 0.2706],
...,
[-0.0614, -0.0516, -0.6159, ..., -0.0354, 0.0379, -0.1809],
[ 0.1885, -0.1690, 0.1530, ..., -0.2077, 0.5473, -0.4517],
[-0.1182, -0.4701, -0.0600, ..., 0.7991, -0.0194, 0.4785]])
Then we zero the initial weights of the <unk> and <pad> tokens.
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]
model.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
model.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)
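Training the Model
Training is the same as before. We initialize the optimizer and the loss function (criterion), and place the model and the criterion on the GPU (if available).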
import torch.optim as optim
optimizer = optim.Adam(model.parameters())
criterion = nn.BCEWithLogitsLoss()
model = model.to(device)
criterion = criterion.to(device)
We implement the function that calculates accuracy...
def binary_accuracy(preds, y):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8
    """
    #round predictions to the closest integer
    rounded_preds = torch.round(torch.sigmoid(preds))
    correct = (rounded_preds == y).float() #convert into float for division
    acc = correct.sum() / len(correct)
    return acc
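A quick check of the accuracy function on a hand-made batch. The logits and labels below are invented for illustration, and binary_accuracy is repeated from above only so that the snippet runs on its own.

```python
import torch

def binary_accuracy(preds, y):
    """Fraction of predictions whose rounded sigmoid matches the label."""
    rounded_preds = torch.round(torch.sigmoid(preds))
    correct = (rounded_preds == y).float()
    return correct.sum() / len(correct)

# Raw model outputs (logits), not probabilities. Hypothetical values.
logits = torch.tensor([2.0, -1.0, 0.5, -3.0])
labels = torch.tensor([1.0, 0.0, 0.0, 0.0])

# sigmoid gives roughly 0.88, 0.27, 0.62, 0.05, which round to 1, 0, 1, 0,
# so the third prediction is the only mistake.
acc = binary_accuracy(logits, labels)
print(acc.item())
```

Because the model is trained with BCEWithLogitsLoss, its outputs are logits, which is why the sigmoid is applied here before rounding.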
We define a function to train our model...
Note: as we are using dropout again, we must remember to call model.train() so that dropout is "turned on" while training.
def train(model, iterator, optimizer, criterion):
    epoch_loss = 0
    epoch_acc = 0
    model.train()
    for batch in iterator:
        optimizer.zero_grad()
        predictions = model(batch.text).squeeze(1)
        loss = criterion(predictions, batch.label)
        acc = binary_accuracy(predictions, batch.label)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        epoch_acc += acc.item()
    return epoch_loss / len(iterator), epoch_acc / len(iterator)
We define a function to test our model...
Note: again, as we are now using dropout, we must remember to call model.eval() so that dropout is "turned off" while evaluating.
def evaluate(model, iterator, criterion):
    epoch_loss = 0
    epoch_acc = 0
    model.eval()
    with torch.no_grad():
        for batch in iterator:
            predictions = model(batch.text).squeeze(1)
            loss = criterion(predictions, batch.label)
            acc = binary_accuracy(predictions, batch.label)
            epoch_loss += loss.item()
            epoch_acc += acc.item()
    return epoch_loss / len(iterator), epoch_acc / len(iterator)
Let's define a function that tells us how long each epoch takes.
import time
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs
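As a small usage example, the timing helper splits an elapsed duration into whole minutes and whole seconds. The function is repeated here so the snippet runs on its own, and the timestamps are made up.

```python
def epoch_time(start_time, end_time):
    """Split an elapsed duration in seconds into (minutes, seconds)."""
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

# 75.4 seconds elapsed: one full minute and 15 whole seconds.
mins, secs = epoch_time(0.0, 75.4)
print(f'{mins}m {secs}s')
```

Fractions of a second are truncated rather than rounded, which is fine for a progress log.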
Finally, we train our model...
N_EPOCHS = 5
best_valid_loss = float('inf')
for epoch in range(N_EPOCHS):
    start_time = time.time()
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)
    end_time = time.time()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    # keep the checkpoint with the lowest validation loss
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut4-model.pt')
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} | Val. Acc: {valid_acc*100:.2f}%')
Epoch: 01 | Epoch Time: 0m 13s
Train Loss: 0.645 | Train Acc: 62.08%
Val. Loss: 0.488 | Val. Acc: 78.64%
Epoch: 02 | Epoch Time: 0m 11s
Train Loss: 0.418 | Train Acc: 81.14%
Val. Loss: 0.361 | Val. Acc: 84.59%
Epoch: 03 | Epoch Time: 0m 11s
Train Loss: 0.300 | Train Acc: 87.33%
Val. Loss: 0.348 | Val. Acc: 85.06%
Epoch: 04 | Epoch Time: 0m 11s
Train Loss: 0.217 | Train Acc: 91.49%
Val. Loss: 0.320 | Val. Acc: 86.71%
Epoch: 05 | Epoch Time: 0m 11s
Train Loss: 0.156 | Train Acc: 94.22%
Val. Loss: 0.334 | Val. Acc: 87.06%
We get test results comparable to the previous 2 models!
model.load_state_dict(torch.load('tut4-model.pt'))
test_loss, test_acc = evaluate(model, test_iterator, criterion)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')
Test Loss: 0.339 | Test Acc: 85.39%
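User Input
And again, as a sanity check, we can try some input sentences.
Note: as mentioned in the implementation details, the input sentence has to be at least as long as the largest filter height used. We modify our predict_sentiment function to also accept a minimum length argument. If the tokenized input sentence has fewer than min_len tokens, we append padding tokens (<pad>) to bring it up to min_len tokens.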
import spacy
nlp = spacy.load('en_core_web_sm')
def predict_sentiment(model, sentence, min_len = 5):
    model.eval()
    tokenized = [tok.text for tok in nlp.tokenizer(sentence)]
    if len(tokenized) < min_len:
        tokenized += ['<pad>'] * (min_len - len(tokenized))
    indexed = [TEXT.vocab.stoi[t] for t in tokenized]
    tensor = torch.LongTensor(indexed).to(device)
    tensor = tensor.unsqueeze(0)
    prediction = torch.sigmoid(model(tensor))
    return prediction.item()
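The padding step inside predict_sentiment can be looked at in isolation. The pad_tokens helper below is a hypothetical extraction of that logic for illustration; it is not part of the tutorial code.

```python
def pad_tokens(tokens, min_len=5, pad_token='<pad>'):
    """Append padding tokens until the list has at least min_len entries."""
    if len(tokens) < min_len:
        tokens = tokens + [pad_token] * (min_len - len(tokens))
    return tokens

padded = pad_tokens(['This', 'film', 'is', 'terrible'])
print(padded)

# Sentences that are already long enough are returned unchanged.
unchanged = pad_tokens(['I', 'really', 'liked', 'this', 'film', '!'])
print(unchanged)
```

With the default min_len of 5, a four-token sentence gains one padding token, which is just enough for the filter of height 5 to fit once.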
An example of a negative review...
res = predict_sentiment(model, "This film is terrible")
print(res)
0.11022213101387024
An example of a positive review...
res = predict_sentiment(model, "This film is great")
print(res)
0.9785954356193542
Complete Code
import torch
from torchtext import data
from torchtext import datasets
import torch.nn as nn
import torch.nn.functional as F
import random
import numpy as np
SEED = 1234
random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True
TEXT = data.Field(tokenize='spacy', tokenizer_language='en_core_web_sm', batch_first=True)
LABEL = data.LabelField(dtype=torch.float)
train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)
train_data, valid_data = train_data.split(random_state = random.seed(SEED))
MAX_VOCAB_SIZE = 25_000
TEXT.build_vocab(
    train_data,
    max_size = MAX_VOCAB_SIZE,
    vectors = 'glove.6B.100d',
    unk_init = torch.Tensor.normal_
)
LABEL.build_vocab(train_data)
BATCH_SIZE = 64
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=BATCH_SIZE,
    device=device
)
print(len(train_iterator))
class CNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, n_filters, filter_size, output_dim, dropout, pad_idx):
        super(CNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.conv_0 = nn.Conv2d(in_channels=1, out_channels=n_filters, kernel_size=(filter_size[0], embedding_dim))
        self.conv_1 = nn.Conv2d(in_channels=1, out_channels=n_filters, kernel_size=(filter_size[1], embedding_dim))
        self.conv_2 = nn.Conv2d(in_channels=1, out_channels=n_filters, kernel_size=(filter_size[2], embedding_dim))
        self.fc = nn.Linear(len(filter_size) * n_filters, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        # text = [batch_size, sent_len]
        embedded = self.embedding(text)
        # embedded = [batch_size, sent_len, emb_dim]
        embedded = embedded.unsqueeze(1)
        # embedded = [batch_size, 1, sent_len, emb_dim]
        conved_0 = F.relu(self.conv_0(embedded).squeeze(3))
        conved_1 = F.relu(self.conv_1(embedded).squeeze(3))
        conved_2 = F.relu(self.conv_2(embedded).squeeze(3))
        # conved_n = [batch_size, n_filters, sent_len - filter_size[n] + 1]
        pooled_0 = F.max_pool1d(conved_0, conved_0.shape[2]).squeeze(2)
        pooled_1 = F.max_pool1d(conved_1, conved_1.shape[2]).squeeze(2)
        pooled_2 = F.max_pool1d(conved_2, conved_2.shape[2]).squeeze(2)
        # pooled_n = [batch_size, n_filters]
        cat = self.dropout(torch.cat((pooled_0, pooled_1, pooled_2), dim=1))
        # cat = [batch_size, n_filters * len(filter_size)]
        return self.fc(cat)
class CNN1(nn.Module):
    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout, pad_idx):
        super(CNN1, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.convs = nn.ModuleList([
            nn.Conv2d(in_channels=1, out_channels=n_filters, kernel_size=(fs, embedding_dim))
            for fs in filter_sizes
        ])
        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        # text = [batch_size, sent_len]
        embedded = self.embedding(text)
        # embedded = [batch_size, sent_len, emb_dim]
        embedded = embedded.unsqueeze(1)
        # embedded = [batch_size, 1, sent_len, emb_dim]
        # squeeze(3) drops the trailing width-1 dimension so max_pool1d gets a 3D tensor
        conved = [F.relu(conv(embedded)).squeeze(3) for conv in self.convs]
        # conved_n = [batch_size, n_filters, sent_len - filter_sizes[n] + 1]
        pooled = [F.max_pool1d(conv, conv.shape[2]).squeeze(2) for conv in conved]
        # pooled_n = [batch_size, n_filters]
        cat = self.dropout(torch.cat(pooled, dim=1))
        # cat = [batch_size, n_filters * len(filter_sizes)]
        return self.fc(cat)
class CNN1d(nn.Module):
    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim,
                 dropout, pad_idx):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.convs = nn.ModuleList([
            nn.Conv1d(in_channels=embedding_dim,
                      out_channels=n_filters,
                      kernel_size=fs)
            for fs in filter_sizes
        ])
        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        # text = [batch size, sent len]
        embedded = self.embedding(text)
        # embedded = [batch size, sent len, emb dim]
        embedded = embedded.permute(0, 2, 1)
        # embedded = [batch size, emb dim, sent len]
        conved = [F.relu(conv(embedded)) for conv in self.convs]
        # conved_n = [batch size, n_filters, sent len - filter_sizes[n] + 1]
        pooled = [F.max_pool1d(conv, conv.shape[2]).squeeze(2) for conv in conved]
        # pooled_n = [batch size, n_filters]
        cat = self.dropout(torch.cat(pooled, dim=1))
        # cat = [batch size, n_filters * len(filter_sizes)]
        return self.fc(cat)
INPUT_DIM = len(TEXT.vocab)
EMBEDDING_DIM = 100
N_FILTERS = 100
FILTER_SIZES = [3,4,5]
OUTPUT_DIM = 1
DROPOUT = 0.5
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]
model = CNN1(INPUT_DIM, EMBEDDING_DIM, N_FILTERS, FILTER_SIZES, OUTPUT_DIM, DROPOUT, PAD_IDX)
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'The model has {count_parameters(model):,} trainable parameters')
pretrained_embeddings = TEXT.vocab.vectors
model.embedding.weight.data.copy_(pretrained_embeddings)
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]
model.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
model.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)
import torch.optim as optim
optimizer = optim.Adam(model.parameters())
criterion = nn.BCEWithLogitsLoss()
model = model.to(device)
criterion = criterion.to(device)
def binary_accuracy(preds, y):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8
    """
    #round predictions to the closest integer
    rounded_preds = torch.round(torch.sigmoid(preds))
    correct = (rounded_preds == y).float() #convert into float for division
    acc = correct.sum() / len(correct)
    return acc
def train(model, iterator, optimizer, criterion):
    epoch_loss = 0
    epoch_acc = 0
    model.train()
    for batch in iterator:
        optimizer.zero_grad()
        predictions = model(batch.text).squeeze(1)
        loss = criterion(predictions, batch.label)
        acc = binary_accuracy(predictions, batch.label)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        epoch_acc += acc.item()
    return epoch_loss / len(iterator), epoch_acc / len(iterator)
def evaluate(model, iterator, criterion):
    epoch_loss = 0
    epoch_acc = 0
    model.eval()
    with torch.no_grad():
        for batch in iterator:
            predictions = model(batch.text).squeeze(1)
            loss = criterion(predictions, batch.label)
            acc = binary_accuracy(predictions, batch.label)
            epoch_loss += loss.item()
            epoch_acc += acc.item()
    return epoch_loss / len(iterator), epoch_acc / len(iterator)
import time
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs
N_EPOCHS = 5
best_valid_loss = float('inf')
for epoch in range(N_EPOCHS):
    start_time = time.time()
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)
    end_time = time.time()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut4-model.pt')
    print(f'Epoch: {epoch + 1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc * 100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} | Val. Acc: {valid_acc * 100:.2f}%')
model.load_state_dict(torch.load('tut4-model.pt'))
test_loss, test_acc = evaluate(model, test_iterator, criterion)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')
import spacy
nlp = spacy.load('en_core_web_sm')
def predict_sentiment(model, sentence, min_len = 5):
    model.eval()
    tokenized = [tok.text for tok in nlp.tokenizer(sentence)]
    if len(tokenized) < min_len:
        tokenized += ['<pad>'] * (min_len - len(tokenized))
    indexed = [TEXT.vocab.stoi[t] for t in tokenized]
    tensor = torch.LongTensor(indexed).to(device)
    tensor = tensor.unsqueeze(0)
    prediction = torch.sigmoid(model(tensor))
    return prediction.item()
res = predict_sentiment(model, "This film is terrible")
print(res)
res = predict_sentiment(model, "This film is great")
print(res)