【NLP理論到實戰】09 文字情感分類
技術標籤:NLP理論到實戰文字情感分類word embedding
文字情感分類
目標
- 知道文字處理的基本方法
- 能夠使用資料實現情感分類的
1. 案例介紹
為了對前面的word embedding這種常用的文字向量化的方法進行鞏固,這裡我們會完成一個文字情感分類的案例
現在我們有一個經典的資料集IMDB
資料集,地址:http://ai.stanford.edu/~amaas/data/sentiment/
,這是一份包含了5萬條流行電影的評論資料,其中訓練集25000條,測試集25000條。資料格式如下:
下圖左邊為名稱,其中名稱包含兩部分,分別是序號和情感評分,(1-4為neg,5-10為pos),右邊為評論內容
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-h4E2W0ke-1607401349065)(…/images/1.3/樣本名稱.png)]
根據上述的樣本,需要使用pytorch完成模型,實現對評論情感進行預測
2. 思路分析
首先可以把上述問題定義為分類問題,情感評分分為1-10,10個類別(也可以理解為迴歸問題,這裡當做分類問題考慮)。那麼根據之前的經驗,我們的大致流程如下:
- 準備資料集
- 構建模型
- 模型訓練
- 模型評估
知道思路之後,那麼我們一步步來完成上述步驟
3. 準備資料集
準備資料集和之前的方法一樣,例項化dataset,準備dataloader,最終我們的資料可以處理成如下格式:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-toPVr6vu-1607401349068)(…/images/1.3/情感分類-data載入1.png)]
其中有兩點需要注意:
- 如何完成基礎打Dataset的構建和Dataloader的準備
- 每個batch中文字的長度不一致的問題如何解決
- 每個batch中的文字如何轉化為數字序列
3.1 基礎Dataset的準備
import torch
from torch.utils.data import DataLoader,Dataset
import os
import re
data_base_path = r"data\aclImdb"
#1. 定義tokenize的方法
def tokenize(text):
# fileters = '!"#$%&()*+,-./:;<=>[email protected][\\]^_`{|}~\t\n'
fileters = ['!','"','#','$','%','&','\(','\)','\*','\+',',','-','\.','/',':',';','<','=','>','\?','@'
,'\[','\\','\]','^','_','`','\{','\|','\}','~','\t','\n','\x97','\x96','”','“',]
text = re.sub("<.*?>"," ",text,flags=re.S)
text = re.sub("|".join(fileters)," ",text,flags=re.S)
return [i.strip() for i in text.split()]
#2. 準備dataset
class ImdbDataset(Dataset):
def __init__(self,mode):
super(ImdbDataset,self).__init__()
if mode=="train":
text_path = [os.path.join(data_base_path,i) for i in ["train/neg","train/pos"]]
else:
text_path = [os.path.join(data_base_path,i) for i in ["test/neg","test/pos"]]
self.total_file_path_list = []
for i in text_path:
self.total_file_path_list.extend([os.path.join(i,j) for j in os.listdir(i)])
def __getitem__(self, idx):
cur_path = self.total_file_path_list[idx]
cur_filename = os.path.basename(cur_path)
label = int(cur_filename.split("_")[-1].split(".")[0]) -1 #處理標題,獲取label,轉化為從[0-9]
text = tokenize(open(cur_path).read().strip()) #直接按照空格進行分詞
return label,text
def __len__(self):
return len(self.total_file_path_list)
# 2. 例項化,準備dataloader
dataset = ImdbDataset(mode="train")
dataloader = DataLoader(dataset=dataset,batch_size=2,shuffle=True)
#3. 觀察資料輸出結果
for idx,(label,text) in enumerate(dataloader):
print("idx:",idx)
print("table:",label)
print("text:",text)
break
輸出如下:
idx: 0
table: tensor([3, 1])
text: [('I', 'Want'), ('thought', 'a'), ('this', 'great'), ('was', 'recipe'), ('a', 'for'), ('great', 'failure'), ('idea', 'Take'), ('but', 'a'), ('boy', 's'), ('was', 'y'), ('it', 'plot'), ('poorly', 'add'), ('executed', 'in'), ('We', 'some'), ('do', 'weak'), ('get', 'completely'), ('a', 'undeveloped'), ('broad', 'characters'), ('sense', 'and'), ('of', 'than'), ('how', 'throw'), ('complex', 'in'), ('and', 'the'), ('challenging', 'worst'), ('the', 'special'), ('backstage', 'effects'), ('operations', 'a'), ('of', 'horror'), ('a', 'movie'), ('show', 'has'), ('are', 'known'), ('but', 'Let'), ('virtually', 'stew'), ('no', 'for'), ...('show', 'somehow'), ('rather', 'destroy'), ('than', 'every'), ('anything', 'copy'), ('worth', 'of'), ('watching', 'this'), ('for', 'film'), ('its', 'so'), ('own', 'it'), ('merit', 'will')]
明顯,其中的text內容出現對應,和想象的不太相似,出現問題的原因在於Dataloader
中的引數collate_fn
collate_fn
的預設值為torch自定義的default_collate
,collate_fn
的作用就是對每個batch進行處理,而預設的default_collate
處理出錯。
解決問題的思路:
手段1:考慮先把資料轉化為數字序列,觀察其結果是否符合要求,之前使用DataLoader並未出現類似錯誤
手段2:考慮自定義一個collate_fn
,觀察結果
這裡使用方式2,自定義一個collate_fn
,然後觀察結果:
def collate_fn(batch):
#batch是list,其中是一個一個元組,每個元組是dataset中__getitem__的結果
batch = list(zip(*batch))
labes = torch.tensor(batch[0],dtype=torch.int32)
texts = batch[1]
del batch
return labes,texts
dataloader = DataLoader(dataset=dataset,batch_size=2,shuffle=True,collate_fn=collate_fn)
#此時輸出正常
for idx,(label,text) in enumerate(dataloader):
print("idx:",idx)
print("table:",label)
print("text:",text)
break
3.2 文字序列化
再介紹word embedding的時候,我們說過,不會直接把文字轉化為向量,而是先轉化為數字,再把數字轉化為向量,那麼這個過程該如何實現呢?
這裡我們可以考慮把文字中的每個詞語和其對應的數字,使用字典儲存,同時實現方法把句子通過字典對映為包含數字的列表。
實現文字序列化之前,考慮以下幾點:
- 如何使用字典把詞語和數字進行對應
- 不同的詞語出現的次數不盡相同,是否需要對高頻或者低頻詞語進行過濾,以及總的詞語數量是否需要進行限制
- 得到詞典之後,如何把句子轉化為數字序列,如何把數字序列轉化為句子
- 不同句子長度不相同,每個batch的句子如何構造成相同的長度(可以對短句子進行填充,填充特殊字元)
- 對於新出現的詞語在詞典中沒有出現怎麼辦(可以使用特殊字元代理)
思路分析:
- 對所有句子進行分詞
- 詞語存入字典,根據次數對詞語進行過濾,並統計次數
- 實現文字轉數字序列的方法
- 實現數字序列轉文字方法
import numpy as np
class Word2Sequence():
UNK_TAG = "UNK"
PAD_TAG = "PAD"
UNK = 0
PAD = 1
def __init__(self):
self.dict = {
self.UNK_TAG :self.UNK,
self.PAD_TAG :self.PAD
}
self.fited = False
def to_index(self,word):
"""word -> index"""
assert self.fited == True,"必須先進行fit操作"
return self.dict.get(word,self.UNK)
def to_word(self,index):
"""index -> word"""
assert self.fited , "必須先進行fit操作"
if index in self.inversed_dict:
return self.inversed_dict[index]
return self.UNK_TAG
def __len__(self):
return self(self.dict)
def fit(self, sentences, min_count=1, max_count=None, max_feature=None):
"""
:param sentences:[[word1,word2,word3],[word1,word3,wordn..],...]
:param min_count: 最小出現的次數
:param max_count: 最大出現的次數
:param max_feature: 總詞語的最大數量
:return:
"""
count = {}
for sentence in sentences:
for a in sentence:
if a not in count:
count[a] = 0
count[a] += 1
# 比最小的數量大和比最大的數量小的需要
if min_count is not None:
count = {k: v for k, v in count.items() if v >= min_count}
if max_count is not None:
count = {k: v for k, v in count.items() if v <= max_count}
# 限制最大的數量
if isinstance(max_feature, int):
count = sorted(list(count.items()), key=lambda x: x[1])
if max_feature is not None and len(count) > max_feature:
count = count[-int(max_feature):]
for w, _ in count:
self.dict[w] = len(self.dict)
else:
for w in sorted(count.keys()):
self.dict[w] = len(self.dict)
self.fited = True
# 準備一個index->word的字典
self.inversed_dict = dict(zip(self.dict.values(), self.dict.keys()))
def transform(self, sentence,max_len=None):
"""
實現吧句子轉化為陣列(向量)
:param sentence:
:param max_len:
:return:
"""
assert self.fited, "必須先進行fit操作"
if max_len is not None:
r = [self.PAD]*max_len
else:
r = [self.PAD]*len(sentence)
if max_len is not None and len(sentence)>max_len:
sentence=sentence[:max_len]
for index,word in enumerate(sentence):
r[index] = self.to_index(word)
return np.array(r,dtype=np.int64)
def inverse_transform(self,indices):
"""
實現從陣列 轉化為文字
:param indices: [1,2,3....]
:return:[word1,word2.....]
"""
sentence = []
for i in indices:
word = self.to_word(i)
sentence.append(word)
return sentence
if __name__ == '__main__':
w2s = Word2Sequence()
w2s.fit([
["你", "好", "麼"],
["你", "好", "哦"]])
print(w2s.dict)
print(w2s.fited)
print(w2s.transform(["你","好","嘛"]))
print(w2s.transform(["你好嘛"],max_len=10))
完成了wordsequence
之後,接下來就是儲存現有樣本中的資料字典,方便後續的使用。
實現對IMDB資料的處理和儲存
#1. 對IMDB的資料記性fit操作
def fit_save_word_sequence():
from wordSequence import Word2Sequence
ws = Word2Sequence()
train_path = [os.path.join(data_base_path,i) for i in ["train/neg","train/pos"]]
total_file_path_list = []
for i in train_path:
total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
for cur_path in tqdm(total_file_path_list,ascii=True,desc="fitting"):
ws.fit(tokenize(open(cur_path).read().strip()))
ws.build_vocab()
# 對wordSequesnce進行儲存
pickle.dump(ws,open("./model/ws.pkl","wb"))
#2. 在dataset中使用wordsequence
ws = pickle.load(open("./model/ws.pkl","rb"))
def collate_fn(batch):
MAX_LEN = 500
#MAX_LEN = max([len(i) for i in texts]) #取當前batch的最大值作為batch的最大長度
batch = list(zip(*batch))
labes = torch.tensor(batch[0],dtype=torch.int)
texts = batch[1]
#獲取每個文字的長度
lengths = [len(i) if len(i)<MAX_LEN else MAX_LEN for i in texts]
texts = torch.tensor([ws.transform(i, MAX_LEN) for i in texts])
del batch
return labes,texts,lengths
#3. 獲取輸出
dataset = ImdbDataset(ws,mode="train")
dataloader = DataLoader(dataset=dataset,batch_size=20,shuffle=True,collate_fn=collate_fn)
for idx,(label,text,length) in enumerate(dataloader):
print("idx:",idx)
print("table:",label)
print("text:",text)
print("length:",length)
break
輸出如下
idx: 0
table: tensor([ 7, 4, 3, 8, 1, 10, 7, 10, 7, 2, 1, 8, 1, 2, 2, 4, 7, 10,
1, 4], dtype=torch.int32)
text: tensor([[ 50983, 77480, 82366, ..., 1, 1, 1],
[ 54702, 57262, 102035, ..., 80474, 56457, 63180],
[ 26991, 57693, 88450, ..., 1, 1, 1],
...,
[ 51138, 73263, 80428, ..., 1, 1, 1],
[ 7022, 78114, 83498, ..., 1, 1, 1],
[ 5353, 101803, 99148, ..., 1, 1, 1]])
length: [296, 500, 221, 132, 74, 407, 500, 130, 54, 217, 80, 322, 72, 156, 94, 270, 317, 117, 200, 379]
思考:前面我們自定義了MAX_LEN作為句子的最大長度,如果我們需要把每個batch中的最長的句子長度作為當前batch的最大長度,該如何實現?
4. 構建模型
這裡我們只練習使用word embedding,所以模型只有一層,即:
- 資料經過word embedding
- 資料通過全連線層返回結果,計算
log_softmax
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from build_dataset import get_dataloader,ws,MAX_LEN
class IMDBModel(nn.Module):
def __init__(self,max_len):
super(IMDBModel,self).__init__()
self.embedding = nn.Embedding(len(ws),300,padding_idx=ws.PAD) #[N,300]
self.fc = nn.Linear(max_len*300,10) #[max_len*300,10]
def forward(self, x):
embed = self.embedding(x) #[batch_size,max_len,300]
embed = embed.view(x.size(0),-1)
out = self.fc(embed)
return F.log_softmax(out,dim=-1)
5. 模型的訓練和評估
訓練流程和之前相同
- 例項化模型,損失函式,優化器
- 遍歷dataset_loader,梯度置為0,進行向前計算
- 計算損失,反向傳播優化損失,更新引數
train_batch_size = 128
test_batch_size = 1000
imdb_model = IMDBModel(MAX_LEN)
optimizer = optim.Adam(imdb_model.parameters())
criterion = nn.CrossEntropyLoss()
def train(epoch):
mode = True
imdb_model.train(mode)
train_dataloader =get_dataloader(mode,train_batch_size)
for idx,(target,input,input_lenght) in enumerate(train_dataloader):
optimizer.zero_grad()
output = imdb_model(input)
loss = F.nll_loss(output,target) #traget需要是[0,9],不能是[1-10]
loss.backward()
optimizer.step()
if idx %10 == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, idx * len(input), len(train_dataloader.dataset),
100. * idx / len(train_dataloader), loss.item()))
torch.save(imdb_model.state_dict(), "model/mnist_net.pkl")
torch.save(optimizer.state_dict(), 'model/mnist_optimizer.pkl')
def test():
test_loss = 0
correct = 0
mode = False
imdb_model.eval()
test_dataloader = get_dataloader(mode, test_batch_size)
with torch.no_grad():
for target, input, input_lenght in test_dataloader:
output = imdb_model(input)
test_loss += F.nll_loss(output, target,reduction="sum")
pred = torch.max(output,dim=-1,keepdim=False)[-1]
correct = pred.eq(target.data).sum()
test_loss = test_loss/len(test_dataloader.dataset)
print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
test_loss, correct, len(test_dataloader.dataset),
100. * correct / len(test_dataloader.dataset)))
if __name__ == '__main__':
test()
for i in range(3):
train(i)
test()
這裡我們僅僅使用了一層全連線層,其分類效果不會很好,這裡重點是理解常見的模型流程和word embedding的使用方法