阿里天池 NLP 入門賽 TextCNN 方案程式碼詳細註釋和流程講解
前言
這篇文章用於記錄阿里天池 NLP 入門賽,詳細講解了整個資料處理流程,以及如何從零構建一個模型,適合新手入門。
賽題以新聞資料為賽題資料,資料集報名後可見並可下載。賽題資料為新聞文字,並按照字元級別進行匿名處理。整合劃分出14個候選分類類別:財經、彩票、房產、股票、家居、教育、科技、社會、時尚、時政、體育、星座、遊戲、娛樂的文字資料。實質上是一個 14 分類問題。
賽題資料由以下幾個部分構成:訓練集20w條樣本,測試集A包括5w條樣本,測試集B包括5w條樣本。
比賽地址:https://tianchi.aliyun.com/competition/entrance/531810/introduction
這篇文章中使用的模型主要是CNN + LSTM + Attention,主要學習的是資料處理的完整流程,以及模型構建的完整流程。雖然還沒有使用 Bert 等方案,不過如果看完了這篇文章,理解了整個流程之後,即使你想要使用其他模型來處理,也能更快實現。
1. 為什麼寫篇文章
首先,這篇文章的程式碼全部都來源於 Datawhale 提供的開原始碼,我添加了自己的筆記,幫助新手更好地理解這個程式碼。
1.1 Datawhale 提供的程式碼有哪些需要改進?
Datawhale 提供的程式碼裡包含了資料處理,以及從 0 到 1模型建立的完整流程。但是和前面提供的 basesline 的都不太一樣,它包含了非常多資料處理的細節,模型也是由 3 個部分構成,所以看起來難度陡然上升。
其次,程式碼裡的註釋非常少,也沒有講解整個資料處理和網路的整體流程。這些對於新手來說,增加了理解的門檻。
在資料競賽方面,我也是一個新人,花了一天的時間,仔細研究資料在一種每一個步驟的轉化,對於一些難以理解的程式碼,在群裡詢問之後,也得到了 Datawhale 成員的熱心解答。最終才明白了全部的程式碼。
1.2 我做了什麼改進?
所以,為了減少對於新手的閱讀難度,我添加了一些內容。
-
首先,梳理了整個流程,包括兩大部分:資料處理和模型。
因為程式碼不是從上到下順序閱讀的。因此,更容易讓人理解的做法是:先從整體上給出巨集觀的資料轉換流程圖,其中要包括資料在每一步的 shape,以及包含的轉換步驟,讓讀者心中有一個框架圖,再帶著這個框架圖去看細節,會更加了然於胸。
-
其次,除了瞭解了整體流程,在真正的程式碼細節裡,讀者可能還是會看不懂某一段小邏輯。因此,我在原有程式碼的基礎之上增添了許多註釋,以降低程式碼的理解門檻。
2. 資料處理
2.1 資料拆分為 10 份
-
資料首先會經過
all_data2fold
函式,這個函式的作用是把原始的 DataFrame 資料,轉換為一個list
,有 10 個元素,表示交叉驗證裡的 10 份,每個元素是dict
,每個dict
包括label
和text
。首先根據
label
來劃分資料行所在index
, 生成label2id
。label2id
是一個dict
,key
為label
,value
是一個list
,儲存的是該類對應的index
。
然後根據label2id
,把每一類別的資料,劃分到 10 份資料中。
2. 最後,把前 9 份資料作為訓練集`train_data`,最後一份資料作為驗證集`dev_data`,並讀取測試集`test_data`。
2.2 定義並建立 Vacab
Vocab 的作用是:
- 建立 詞 和
index
對應的字典,這裡包括 2 份字典,分別是:_id2word
和_id2extword
。 - 其中
_id2word
是從新聞得到的, 把詞頻小於 5 的詞替換為了UNK
。對應到模型輸入的batch_inputs1
。 _id2extword
是從word2vec.txt
中得到的,有 5976 個詞。對應到模型輸入的batch_inputs2
。- 後面會有兩個
embedding
層,其中_id2word
對應的embedding
是可學習的,_id2extword
對應的embedding
是從檔案中載入的,是固定的。 - 建立 label 和 index 對應的字典。
- 上面這些字典,都是基於
train_data
建立的。
3. 模型
3.1 把文章分割為句子
-
上上一步得到的 3 個數據,都是一個
list
,list
裡的每個元素是 dict,每個 dict 包括label
和text
。這 3 個數據會經過get_examples
函式。get_examples
函式裡,會呼叫sentence_split
函式,把每一篇文章分割成為句子。然後,根據
vocab
,把 word 轉換為對應的索引,這裡使用了 2 個字典,轉換為 2 份索引,分別是:word_ids
和extword_ids
。最後返回的資料是一個 list,每個元素是一個 tuple:(label, 句子數量,doc)
。其中doc
又是一個 list,每個 元素是一個 tuple:(句子長度,word_ids, extword_ids)
。
-
在迭代訓練時,呼叫
data_iter
函式,生成每一批的batch_data
。在data_iter
函式裡,會呼叫batch_slice
函式生成每一個batch
。拿到batch_data
後,每個資料的格式仍然是上圖中所示的格式,下面,呼叫batch2tensor
函式。
3.2 生成訓練資料
batch2tensor
函式最後返回的資料是:(batch_inputs1, batch_inputs2, batch_masks), batch_labels
。形狀都是(batch_size, doc_len, sent_len)
。doc_len
表示每篇新聞有幾乎話,sent_len
表示每句話有多少個單詞。
batch_masks
在有單詞的位置,值為1,其他地方為 0,用於後面計算 Attention,把那些沒有單詞的位置的 attention 改為 0。
batch_inputs1, batch_inputs2, batch_masks
,形狀是(batch_size, doc_len, sent_len)
,轉換為(batch_size * doc_len, sent_len)
。
3.3 網路部分
下面,終於來到網路部分。模型結構圖如下:
### 3.3.1 WordCNNEncoder
WordCNNEncoder 網路結構示意圖如下:
#### 1. Embedding
batch_inputs1, batch_inputs2
都輸入到WordCNNEncoder
。WordCNNEncoder
包括兩個embedding
層,分別對應batch_inputs1
,embedding 層是可學習的,得到word_embed
;batch_inputs2
,讀取的是外部訓練好的詞向,因此是不可學習的,得到extword_embed
。所以會分別得到兩個詞向量,將 2 個詞向量相加,得到最終的詞向量batch_embed
,形狀是(batch_size * doc_len, sent_len, 100)
,然後新增一個維度,變為(batch_size * doc_len, 1, sent_len, 100)
,對應 Pytorch 裡影象的(B, C, H, W)
。
2. CNN
然後,分別定義 3 個卷積核,output channel 都是 100 維。
第一個卷積核大小為[2,100]
,得到的輸出是(batch_size * doc_len, 100, sent_len-2+1, 1)
,定義一個池化層大小為[sent_len-2+1, 1]
,最終得到輸出經過squeeze()
的形狀是(batch_size * doc_len, 100)
。
同理,第 2 個卷積核大小為[3,100]
,第 3 個卷積核大小為[4,100]
。卷積+池化得到的輸出形狀也是(batch_size * doc_len, 100)
。
最後,將這 3 個向量在第 2 個維度上做拼接,得到輸出的形狀是(batch_size * doc_len, 300)
。
3.3.2 shape 轉換
把上一步得到的資料的形狀,轉換為(batch_size , doc_len, 300)
名字是sent_reps
。然後,對mask
進行處理。
batch_masks
的形狀是(batch_size , doc_len, 300)
,表示單詞的 mask,經過sent_masks = batch_masks.bool().any(2).float()
得到句子的 mask。含義是:在最後一個維度,判斷是否有單詞,只要有 1 個單詞,那麼整句話的 mask 就是 1,sent_masks
的維度是:(batch_size , doc_len)
。
3.3.3 SentEncoder
SentEncoder 網路結構示意圖如下:
`SentEncoder`包含了 2 層的雙向 LSTM,輸入資料`sent_reps`的形狀是`(batch_size , doc_len, 300)`,LSTM 的 hidden_size 為 256,由於是雙向的,經過 LSTM 後的資料維度是`(batch_size , doc_len, 512)`,然後和 mask 按位置相乘,把沒有單詞的句子的位置改為 0,最後輸出的資料`sent_hiddens`,維度依然是`(batch_size , doc_len, 512)`。
3.3.4 Attention
接著,經過Attention
。Attention
的輸入是sent_hiddens
和sent_masks
。在Attention
裡,sent_hiddens
首先經過線性變化得到key
,維度不變,依然是(batch_size , doc_len, 512)
。
然後key
和query
相乘,得到outputs
。query
的維度是512
,因此output
的維度是(batch_size , doc_len)
,這個就是我們需要的attention
,表示分配到每個句子的權重。下一步需要對這個attetion
做softmax
,並使用sent_masks
,把沒有單詞的句子的權重置為-1e32
,得到masked_attn_scores
。
最後把masked_attn_scores
和key
相乘,得到batch_outputs
,形狀是(batch_size, 512)
。
3.3.5 FC
最後經過FC
層,得到分類概率的向量。
4. 完整程式碼+註釋
4.1 資料處理
匯入包
import random
import numpy as np
import torch
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(levelname)s: %(message)s')
# set seed
seed = 666
random.seed(seed)
np.random.seed(seed)
torch.cuda.manual_seed(seed)
torch.manual_seed(seed)
# set cuda
gpu = 0
use_cuda = gpu >= 0 and torch.cuda.is_available()
if use_cuda:
torch.cuda.set_device(gpu)
device = torch.device("cuda", gpu)
else:
device = torch.device("cpu")
logging.info("Use cuda: %s, gpu id: %d.", use_cuda, gpu)
2020-08-13 17:12:16,510 INFO: Use cuda: False, gpu id: 0.
4.1.1 把資料分成 10 份
# split data to 10 fold
fold_num = 10
data_file = 'train_set.csv'
import pandas as pd
def all_data2fold(fold_num, num=10000):
fold_data = []
f = pd.read_csv(data_file, sep='\t', encoding='UTF-8')
texts = f['text'].tolist()[:num]
labels = f['label'].tolist()[:num]
total = len(labels)
index = list(range(total))
# 打亂資料
np.random.shuffle(index)
# all_texts 和 all_labels 都是 shuffle 之後的資料
all_texts = []
all_labels = []
for i in index:
all_texts.append(texts[i])
all_labels.append(labels[i])
# 構造一個 dict,key 為 label,value 是一個 list,儲存的是該類對應的 index
label2id = {}
for i in range(total):
label = str(all_labels[i])
if label not in label2id:
label2id[label] = [i]
else:
label2id[label].append(i)
# all_index 是一個 list,裡面包括 10 個 list,稱為 10 個 fold,儲存 10 個 fold 對應的 index
all_index = [[] for _ in range(fold_num)]
for label, data in label2id.items():
# print(label, len(data))
batch_size = int(len(data) / fold_num)
# other 表示多出來的資料,other 的資料量是小於 fold_num 的
other = len(data) - batch_size * fold_num
# 把每一類對應的 index,新增到每個 fold 裡面去
for i in range(fold_num):
# 如果 i < other,那麼將一個數據新增到這一輪 batch 的資料中
cur_batch_size = batch_size + 1 if i < other else batch_size
# print(cur_batch_size)
# batch_data 是該輪 batch 對應的索引
batch_data = [data[i * batch_size + b] for b in range(cur_batch_size)]
all_index[i].extend(batch_data)
batch_size = int(total / fold_num)
other_texts = []
other_labels = []
other_num = 0
start = 0
# 由於上面在分 batch 的過程中,每個 batch 的資料量不一樣,這裡是把資料平均到每個 batch
for fold in range(fold_num):
num = len(all_index[fold])
texts = [all_texts[i] for i in all_index[fold]]
labels = [all_labels[i] for i in all_index[fold]]
if num > batch_size: # 如果大於 batch_size 那麼就取 batch_size 大小的資料
fold_texts = texts[:batch_size]
other_texts.extend(texts[batch_size:])
fold_labels = labels[:batch_size]
other_labels.extend(labels[batch_size:])
other_num += num - batch_size
elif num < batch_size: # 如果小於 batch_size,那麼就補全到 batch_size 的大小
end = start + batch_size - num
fold_texts = texts + other_texts[start: end]
fold_labels = labels + other_labels[start: end]
start = end
else:
fold_texts = texts
fold_labels = labels
assert batch_size == len(fold_labels)
# shuffle
index = list(range(batch_size))
np.random.shuffle(index)
# 這裡是為了打亂資料
shuffle_fold_texts = []
shuffle_fold_labels = []
for i in index:
shuffle_fold_texts.append(fold_texts[i])
shuffle_fold_labels.append(fold_labels[i])
data = {'label': shuffle_fold_labels, 'text': shuffle_fold_texts}
fold_data.append(data)
logging.info("Fold lens %s", str([len(data['label']) for data in fold_data]))
return fold_data
# fold_data 是一個 list,有 10 個元素,每個元素是 dict,包括 label 和 text
fold_data = all_data2fold(10)
2020-08-13 17:12:45,012 INFO: Fold lens [1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000]
4.1.2 拆分訓練集、驗證集,讀取測試集
# build train, dev, test data
fold_id = 9
# dev
dev_data = fold_data[fold_id]
# train 取出前 9 個 fold 的資料
train_texts = []
train_labels = []
for i in range(0, fold_id):
data = fold_data[i]
train_texts.extend(data['text'])
train_labels.extend(data['label'])
train_data = {'label': train_labels, 'text': train_texts}
# test 讀取測試集資料
test_data_file = 'test_a.csv'
f = pd.read_csv(test_data_file, sep='\t', encoding='UTF-8')
texts = f['text'].tolist()
test_data = {'label': [0] * len(texts), 'text': texts}
4.1.3建立 Vocab
# build vocab
from collections import Counter
from transformers import BasicTokenizer
basic_tokenizer = BasicTokenizer()
# Vocab 的作用是:
# 1. 建立 詞 和 index 對應的字典,這裡包括 2 份字典,分別是:_id2word 和 _id2extword
# 其中 _id2word 是從新聞得到的, 把詞頻小於 5 的詞替換為了 UNK。對應到模型輸入的 batch_inputs1。
# _id2extword 是從 word2vec.txt 中得到的,有 5976 個詞。對應到模型輸入的 batch_inputs2。
# 後面會有兩個 embedding 層,其中 _id2word 對應的 embedding 是可學習的,_id2extword 對應的 embedding 是從檔案中載入的,是固定的
# 2.建立 label 和 index 對應的字典
class Vocab():
def __init__(self, train_data):
self.min_count = 5
self.pad = 0
self.unk = 1
self._id2word = ['[PAD]', '[UNK]']
self._id2extword = ['[PAD]', '[UNK]']
self._id2label = []
self.target_names = []
self.build_vocab(train_data)
reverse = lambda x: dict(zip(x, range(len(x))))
#建立詞和 index 對應的字典
self._word2id = reverse(self._id2word)
#建立 label 和 index 對應的字典
self._label2id = reverse(self._id2label)
logging.info("Build vocab: words %d, labels %d." % (self.word_size, self.label_size))
#建立詞典
def build_vocab(self, data):
self.word_counter = Counter()
#計算每個詞出現的次數
for text in data['text']:
words = text.split()
for word in words:
self.word_counter[word] += 1
# 去掉頻次小於 min_count = 5 的詞,把詞存到 _id2word
for word, count in self.word_counter.most_common():
if count >= self.min_count:
self._id2word.append(word)
label2name = {0: '科技', 1: '股票', 2: '體育', 3: '娛樂', 4: '時政', 5: '社會', 6: '教育', 7: '財經',
8: '家居', 9: '遊戲', 10: '房產', 11: '時尚', 12: '彩票', 13: '星座'}
self.label_counter = Counter(data['label'])
for label in range(len(self.label_counter)):
count = self.label_counter[label] # 取出 label 對應的次數
self._id2label.append(label)
self.target_names.append(label2name[label]) # 根據label數字取出對應的名字
def load_pretrained_embs(self, embfile):
with open(embfile, encoding='utf-8') as f:
lines = f.readlines()
items = lines[0].split()
# 第一行分別是單詞數量、詞向量維度
word_count, embedding_dim = int(items[0]), int(items[1])
index = len(self._id2extword)
embeddings = np.zeros((word_count + index, embedding_dim))
# 下面的程式碼和 word2vec.txt 的結構有關
for line in lines[1:]:
values = line.split()
self._id2extword.append(values[0]) # 首先新增第一列的單詞
vector = np.array(values[1:], dtype='float64') # 然後新增後面 100 列的詞向量
embeddings[self.unk] += vector
embeddings[index] = vector
index += 1
# unk 的詞向量是所有詞的平均
embeddings[self.unk] = embeddings[self.unk] / word_count
# 除以標準差幹嘛?
embeddings = embeddings / np.std(embeddings)
reverse = lambda x: dict(zip(x, range(len(x))))
self._extword2id = reverse(self._id2extword)
assert len(set(self._id2extword)) == len(self._id2extword)
return embeddings
# 根據單詞得到 id
def word2id(self, xs):
if isinstance(xs, list):
return [self._word2id.get(x, self.unk) for x in xs]
return self._word2id.get(xs, self.unk)
# 根據單詞得到 ext id
def extword2id(self, xs):
if isinstance(xs, list):
return [self._extword2id.get(x, self.unk) for x in xs]
return self._extword2id.get(xs, self.unk)
# 根據 label 得到 id
def label2id(self, xs):
if isinstance(xs, list):
return [self._label2id.get(x, self.unk) for x in xs]
return self._label2id.get(xs, self.unk)
@property
def word_size(self):
return len(self._id2word)
@property
def extword_size(self):
return len(self._id2extword)
@property
def label_size(self):
return len(self._id2label)
vocab = Vocab(train_data)
[1, 1, 0, 0, 2, 0, 6, 2, 1, 4]
4.2 模型
4.2.1 定義 Attention
# build module
import torch.nn as nn
import torch.nn.functional as F
class Attention(nn.Module):
def __init__(self, hidden_size):
super(Attention, self).__init__()
self.weight = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
self.weight.data.normal_(mean=0.0, std=0.05)
self.bias = nn.Parameter(torch.Tensor(hidden_size))
b = np.zeros(hidden_size, dtype=np.float32)
self.bias.data.copy_(torch.from_numpy(b))
self.query = nn.Parameter(torch.Tensor(hidden_size))
self.query.data.normal_(mean=0.0, std=0.05)
def forward(self, batch_hidden, batch_masks):
# batch_hidden: b * doc_len * hidden_size (2 * hidden_size of lstm)
# batch_masks: b x doc_len
# linear
# key: b * doc_len * hidden
key = torch.matmul(batch_hidden, self.weight) + self.bias
# compute attention
# matmul 會進行廣播
#outputs: b * doc_len
outputs = torch.matmul(key, self.query)
# 1 - batch_masks 就是取反,把沒有單詞的句子置為 0
# masked_fill 的作用是 在 為 1 的地方替換為 value: float(-1e32)
masked_outputs = outputs.masked_fill((1 - batch_masks).bool(), float(-1e32))
#attn_scores:b * doc_len
attn_scores = F.softmax(masked_outputs, dim=1)
# 對於全零向量,-1e32的結果為 1/len, -inf為nan, 額外補0
masked_attn_scores = attn_scores.masked_fill((1 - batch_masks).bool(), 0.0)
# sum weighted sources
# masked_attn_scores.unsqueeze(1):# b * 1 * doc_len
# key:b * doc_len * hidden
# batch_outputs:b * hidden
batch_outputs = torch.bmm(masked_attn_scores.unsqueeze(1), key).squeeze(1)
return batch_outputs, attn_scores
4.2.2 定義 WordCNNEncoder
word2vec_path = '../emb/word2vec.txt'
dropout = 0.15
# 輸入是:
# 輸出是:
class WordCNNEncoder(nn.Module):
def __init__(self, vocab):
super(WordCNNEncoder, self).__init__()
self.dropout = nn.Dropout(dropout)
self.word_dims = 100 # 詞向量的長度是 100 維
# padding_idx 表示當取第 0 個詞時,向量全為 0
# 這個 Embedding 層是可學習的
self.word_embed = nn.Embedding(vocab.word_size, self.word_dims, padding_idx=0)
extword_embed = vocab.load_pretrained_embs(word2vec_path)
extword_size, word_dims = extword_embed.shape
logging.info("Load extword embed: words %d, dims %d." % (extword_size, word_dims))
# # 這個 Embedding 層是不可學習的
self.extword_embed = nn.Embedding(extword_size, word_dims, padding_idx=0)
self.extword_embed.weight.data.copy_(torch.from_numpy(extword_embed))
self.extword_embed.weight.requires_grad = False
input_size = self.word_dims
self.filter_sizes = [2, 3, 4] # n-gram window
self.out_channel = 100
# 3 個卷積層,卷積核大小分別為 [2,100], [3,100], [4,100]
self.convs = nn.ModuleList([nn.Conv2d(1, self.out_channel, (filter_size, input_size), bias=True)
for filter_size in self.filter_sizes])
def forward(self, word_ids, extword_ids):
# word_ids: sentence_num * sentence_len
# extword_ids: sentence_num * sentence_len
# batch_masks: sentence_num * sentence_len
sen_num, sent_len = word_ids.shape
# word_embed: sentence_num * sentence_len * 100
# 根據 index 取出詞向量
word_embed = self.word_embed(word_ids)
extword_embed = self.extword_embed(extword_ids)
batch_embed = word_embed + extword_embed
if self.training:
batch_embed = self.dropout(batch_embed)
# batch_embed: sentence_num x 1 x sentence_len x 100
# squeeze 是為了新增一個 channel 的維度,成為 B * C * H * W
# 方便下面做 卷積
batch_embed.unsqueeze_(1)
pooled_outputs = []
# 通過 3 個卷積核做 3 次卷積核池化
for i in range(len(self.filter_sizes)):
# 通過池化公式計算池化後的高度: o = (i-k)/s+1
# 其中 o 表示輸出的長度
# k 表示卷積核大小
# s 表示步長,這裡為 1
filter_height = sent_len - self.filter_sizes[i] + 1
# conv:sentence_num * out_channel * filter_height * 1
conv = self.convs[i](batch_embed)
hidden = F.relu(conv)
# 定義池化層
mp = nn.MaxPool2d((filter_height, 1)) # (filter_height, filter_width)
# pooled:sentence_num * out_channel * 1 * 1 -> sen_num * out_channel
# 也可以通過 squeeze 來刪除無用的維度
pooled = mp(hidden).reshape(sen_num,
self.out_channel)
pooled_outputs.append(pooled)
# 拼接 3 個池化後的向量
# reps: sen_num * (3*out_channel)
reps = torch.cat(pooled_outputs, dim=1)
if self.training:
reps = self.dropout(reps)
return reps
4.2.3 定義 SentEncoder
# build sent encoder
sent_hidden_size = 256
sent_num_layers = 2
class SentEncoder(nn.Module):
def __init__(self, sent_rep_size):
super(SentEncoder, self).__init__()
self.dropout = nn.Dropout(dropout)
self.sent_lstm = nn.LSTM(
input_size=sent_rep_size, # 每個句子經過 CNN 後得到 300 維向量
hidden_size=sent_hidden_size,# 輸出的維度
num_layers=sent_num_layers,
batch_first=True,
bidirectional=True
)
def forward(self, sent_reps, sent_masks):
# sent_reps: b * doc_len * sent_rep_size
# sent_masks: b * doc_len
# sent_hiddens: b * doc_len * hidden*2
# sent_hiddens: batch, seq_len, num_directions * hidden_size
sent_hiddens, _ = self.sent_lstm(sent_reps)
# 對應相乘,用到廣播,是為了只保留有句子的位置的數值
sent_hiddens = sent_hiddens * sent_masks.unsqueeze(2)
if self.training:
sent_hiddens = self.dropout(sent_hiddens)
return sent_hiddens
4.2.4 定義整個模型Attention
把 WordCNNEncoder、SentEncoder、Attention、FC 全部連線起來
# build model
class Model(nn.Module):
def __init__(self, vocab):
super(Model, self).__init__()
self.sent_rep_size = 300 # 經過 CNN 後得到的 300 維向量
self.doc_rep_size = sent_hidden_size * 2 # lstm 最後輸出的向量長度
self.all_parameters = {}
parameters = []
self.word_encoder = WordCNNEncoder(vocab)
parameters.extend(list(filter(lambda p: p.requires_grad, self.word_encoder.parameters())))
self.sent_encoder = SentEncoder(self.sent_rep_size)
self.sent_attention = Attention(self.doc_rep_size)
parameters.extend(list(filter(lambda p: p.requires_grad, self.sent_encoder.parameters())))
parameters.extend(list(filter(lambda p: p.requires_grad, self.sent_attention.parameters())))
# doc_rep_size
self.out = nn.Linear(self.doc_rep_size, vocab.label_size, bias=True)
parameters.extend(list(filter(lambda p: p.requires_grad, self.out.parameters())))
if use_cuda:
self.to(device)
if len(parameters) > 0:
self.all_parameters["basic_parameters"] = parameters
logging.info('Build model with cnn word encoder, lstm sent encoder.')
para_num = sum([np.prod(list(p.size())) for p in self.parameters()])
logging.info('Model param num: %.2f M.' % (para_num / 1e6))
def forward(self, batch_inputs):
# batch_inputs(batch_inputs1, batch_inputs2): b * doc_len * sentence_len
# batch_masks : b * doc_len * sentence_len
batch_inputs1, batch_inputs2, batch_masks = batch_inputs
batch_size, max_doc_len, max_sent_len = batch_inputs1.shape[0], batch_inputs1.shape[1], batch_inputs1.shape[2]
# batch_inputs1: sentence_num * sentence_len
batch_inputs1 = batch_inputs1.view(batch_size * max_doc_len, max_sent_len)
# batch_inputs2: sentence_num * sentence_len
batch_inputs2 = batch_inputs2.view(batch_size * max_doc_len, max_sent_len)
# batch_masks: sentence_num * sentence_len
batch_masks = batch_masks.view(batch_size * max_doc_len, max_sent_len)
# sent_reps: sentence_num * sentence_rep_size
# sen_num * (3*out_channel) = sen_num * 300
sent_reps = self.word_encoder(batch_inputs1, batch_inputs2)
# sent_reps:b * doc_len * sent_rep_size
sent_reps = sent_reps.view(batch_size, max_doc_len, self.sent_rep_size)
# batch_masks:b * doc_len * max_sent_len
batch_masks = batch_masks.view(batch_size, max_doc_len, max_sent_len)
# sent_masks:b * doc_len any(2) 表示在 第二個維度上判斷
# 表示如果如果一個句子中有詞 true,那麼這個句子就是 true,用於給 lstm 過濾
sent_masks = batch_masks.bool().any(2).float() # b x doc_len
# sent_hiddens: b * doc_len * num_directions * hidden_size
# sent_hiddens: batch, seq_len, 2 * hidden_size
sent_hiddens = self.sent_encoder(sent_reps, sent_masks)
# doc_reps: b * (2 * hidden_size)
# atten_scores: b * doc_len
doc_reps, atten_scores = self.sent_attention(sent_hiddens, sent_masks)
# b * num_labels
batch_outputs = self.out(doc_reps)
return batch_outputs
model = Model(vocab)
4.2.5 定義 Optimizer
# build optimizer
learning_rate = 2e-4
decay = .75
decay_step = 1000
class Optimizer:
def __init__(self, model_parameters):
self.all_params = []
self.optims = []
self.schedulers = []
for name, parameters in model_parameters.items():
if name.startswith("basic"):
optim = torch.optim.Adam(parameters, lr=learning_rate)
self.optims.append(optim)
l = lambda step: decay ** (step // decay_step)
scheduler = torch.optim.lr_scheduler.LambdaLR(optim, lr_lambda=l)
self.schedulers.append(scheduler)
self.all_params.extend(parameters)
else:
Exception("no nameed parameters.")
self.num = len(self.optims)
def step(self):
for optim, scheduler in zip(self.optims, self.schedulers):
optim.step()
scheduler.step()
optim.zero_grad()
def zero_grad(self):
for optim in self.optims:
optim.zero_grad()
def get_lr(self):
lrs = tuple(map(lambda x: x.get_lr()[-1], self.schedulers))
lr = ' %.5f' * self.num
res = lr % lrs
return res
4.2.6定義 sentence_split,把文章劃分為句子
#
# 作用是:根據一篇文章,把這篇文章分割成多個句子
# text 是一個新聞的文章
# vocab 是詞典
# max_sent_len 表示每句話的長度
# max_segment 表示最多有幾句話
# 最後返回的 segments 是一個list,其中每個元素是 tuple:(句子長度,句子本身)
def sentence_split(text, vocab, max_sent_len=256, max_segment=16):
words = text.strip().split()
document_len = len(words)
# 劃分句子的索引,句子長度為 max_sent_len
index = list(range(0, document_len, max_sent_len))
index.append(document_len)
segments = []
for i in range(len(index) - 1):
# 根據索引劃分句子
segment = words[index[i]: index[i + 1]]
assert len(segment) > 0
# 把出現太少的詞替換為 UNK
segment = [word if word in vocab._id2word else '<UNK>' for word in segment]
# 新增 tuple:(句子長度,句子本身)
segments.append([len(segment), segment])
assert len(segments) > 0
# 如果大於 max_segment 句話,則局數減少一半,返回一半的句子
if len(segments) > max_segment:
segment_ = int(max_segment / 2)
return segments[:segment_] + segments[-segment_:]
else:
# 否則返回全部句子
return segments
4.2.7 定義 get_examples
裡面呼叫 sentence_split
# 最後返回的資料是一個 list,每個元素是一個 tuple: (label, 句子數量,doc)
# 其中 doc 又是一個 list,每個 元素是一個 tuple: (句子長度,word_ids, extword_ids)
def get_examples(data, vocab, max_sent_len=256, max_segment=8):
label2id = vocab.label2id
examples = []
for text, label in zip(data['text'], data['label']):
# label
id = label2id(label)
# sents_words: 是一個list,其中每個元素是 tuple:(句子長度,句子本身)
sents_words = sentence_split(text, vocab, max_sent_len, max_segment)
doc = []
for sent_len, sent_words in sents_words:
# 把 word 轉為 id
word_ids = vocab.word2id(sent_words)
# 把 word 轉為 ext id
extword_ids = vocab.extword2id(sent_words)
doc.append([sent_len, word_ids, extword_ids])
examples.append([id, len(doc), doc])
logging.info('Total %d docs.' % len(examples))
return examples
4.2.8定義 batch_slice
# build loader
# data 引數就是 get_examples() 得到的
# data是一個 list,每個元素是一個 tuple: (label, 句子數量,doc)
# 其中 doc 又是一個 list,每個 元素是一個 tuple: (句子長度,word_ids, extword_ids)
def batch_slice(data, batch_size):
batch_num = int(np.ceil(len(data) / float(batch_size)))
for i in range(batch_num):
# 如果 i < batch_num - 1,那麼大小為 batch_size,否則就是最後一批資料
cur_batch_size = batch_size if i < batch_num - 1 else len(data) - batch_size * i
docs = [data[i * batch_size + b] for b in range(cur_batch_size)]
yield docs
4.2.9 定義 data_iter
裡面呼叫 batch_slice
# data 引數就是 get_examples() 得到的
# data是一個 list,每個元素是一個 tuple: (label, 句子數量,doc)
# 其中 doc 又是一個 list,每個 元素是一個 tuple: (句子長度,word_ids, extword_ids)
def data_iter(data, batch_size, shuffle=True, noise=1.0):
"""
randomly permute data, then sort by source length, and partition into batches
ensure that the length of sentences in each batch
"""
batched_data = []
if shuffle:
# 這裡是打亂所有資料
np.random.shuffle(data)
# lengths 表示的是 每篇文章的句子數量
lengths = [example[1] for example in data]
noisy_lengths = [- (l + np.random.uniform(- noise, noise)) for l in lengths]
sorted_indices = np.argsort(noisy_lengths).tolist()
sorted_data = [data[i] for i in sorted_indices]
else:
sorted_data = data
# 把 batch 的資料放進一個 list
batched_data.extend(list(batch_slice(sorted_data, batch_size)))
if shuffle:
# 打亂 多個 batch
np.random.shuffle(batched_data)
for batch in batched_data:
yield batch
4.2.10 定義指標計算
# some function
from sklearn.metrics import f1_score, precision_score, recall_score
def get_score(y_ture, y_pred):
y_ture = np.array(y_ture)
y_pred = np.array(y_pred)
f1 = f1_score(y_ture, y_pred, average='macro') * 100
p = precision_score(y_ture, y_pred, average='macro') * 100
r = recall_score(y_ture, y_pred, average='macro') * 100
return str((reformat(p, 2), reformat(r, 2), reformat(f1, 2))), reformat(f1, 2)
# 保留 n 位小數點
def reformat(num, n):
return float(format(num, '0.' + str(n) + 'f'))
4.2.11 定義訓練和測試的方法
包括 batch2tensor
# build trainer
import time
from sklearn.metrics import classification_report
clip = 5.0
epochs = 1
early_stops = 3
log_interval = 50
test_batch_size = 128
train_batch_size = 128
save_model = './cnn.bin'
save_test = './cnn.csv'
class Trainer():
def __init__(self, model, vocab):
self.model = model
self.report = True
# get_examples() 返回的結果是 一個 list
# 每個元素是一個 tuple: (label, 句子數量,doc)
# 其中 doc 又是一個 list,每個 元素是一個 tuple: (句子長度,word_ids, extword_ids)
self.train_data = get_examples(train_data, vocab)
self.batch_num = int(np.ceil(len(self.train_data) / float(train_batch_size)))
self.dev_data = get_examples(dev_data, vocab)
self.test_data = get_examples(test_data, vocab)
# criterion
self.criterion = nn.CrossEntropyLoss()
# label name
self.target_names = vocab.target_names
# optimizer
self.optimizer = Optimizer(model.all_parameters)
# count
self.step = 0
self.early_stop = -1
self.best_train_f1, self.best_dev_f1 = 0, 0
self.last_epoch = epochs
def train(self):
logging.info('Start training...')
for epoch in range(1, epochs + 1):
train_f1 = self._train(epoch)
dev_f1 = self._eval(epoch)
if self.best_dev_f1 <= dev_f1:
logging.info(
"Exceed history dev = %.2f, current dev = %.2f" % (self.best_dev_f1, dev_f1))
torch.save(self.model.state_dict(), save_model)
self.best_train_f1 = train_f1
self.best_dev_f1 = dev_f1
self.early_stop = 0
else:
self.early_stop += 1
if self.early_stop == early_stops:
logging.info(
"Eearly stop in epoch %d, best train: %.2f, dev: %.2f" % (
epoch - early_stops, self.best_train_f1, self.best_dev_f1))
self.last_epoch = epoch
break
def test(self):
self.model.load_state_dict(torch.load(save_model))
self._eval(self.last_epoch + 1, test=True)
def _train(self, epoch):
self.optimizer.zero_grad()
self.model.train()
start_time = time.time()
epoch_start_time = time.time()
overall_losses = 0
losses = 0
batch_idx = 1
y_pred = []
y_true = []
for batch_data in data_iter(self.train_data, train_batch_size, shuffle=True):
torch.cuda.empty_cache()
# batch_inputs: (batch_inputs1, batch_inputs2, batch_masks)
# 形狀都是:batch_size * doc_len * sent_len
# batch_labels: batch_size
batch_inputs, batch_labels = self.batch2tensor(batch_data)
# batch_outputs:b * num_labels
batch_outputs = self.model(batch_inputs)
# criterion 是 CrossEntropyLoss,真實標籤的形狀是:N
# 預測標籤的形狀是:(N,C)
loss = self.criterion(batch_outputs, batch_labels)
loss.backward()
loss_value = loss.detach().cpu().item()
losses += loss_value
overall_losses += loss_value
# 把預測值轉換為一維,方便下面做 classification_report,計算 f1
y_pred.extend(torch.max(batch_outputs, dim=1)[1].cpu().numpy().tolist())
y_true.extend(batch_labels.cpu().numpy().tolist())
# 梯度裁剪
nn.utils.clip_grad_norm_(self.optimizer.all_params, max_norm=clip)
for optimizer, scheduler in zip(self.optimizer.optims, self.optimizer.schedulers):
optimizer.step()
scheduler.step()
self.optimizer.zero_grad()
self.step += 1
if batch_idx % log_interval == 0:
elapsed = time.time() - start_time
lrs = self.optimizer.get_lr()
logging.info(
'| epoch {:3d} | step {:3d} | batch {:3d}/{:3d} | lr{} | loss {:.4f} | s/batch {:.2f}'.format(
epoch, self.step, batch_idx, self.batch_num, lrs,
losses / log_interval,
elapsed / log_interval))
losses = 0
start_time = time.time()
batch_idx += 1
overall_losses /= self.batch_num
during_time = time.time() - epoch_start_time
# reformat 保留 4 位數字
overall_losses = reformat(overall_losses, 4)
score, f1 = get_score(y_true, y_pred)
logging.info(
'| epoch {:3d} | score {} | f1 {} | loss {:.4f} | time {:.2f}'.format(epoch, score, f1,
overall_losses,
# 如果預測和真實的標籤都包含相同的類別數目,才能呼叫 classification_report during_time))
if set(y_true) == set(y_pred) and self.report:
report = classification_report(y_true, y_pred, digits=4, target_names=self.target_names)
logging.info('\n' + report)
return f1
# 這裡驗證集、測試集都使用這個函式,通過 test 來區分使用哪個資料集
def _eval(self, epoch, test=False):
self.model.eval()
start_time = time.time()
data = self.test_data if test else self.dev_data
y_pred = []
y_true = []
with torch.no_grad():
for batch_data in data_iter(data, test_batch_size, shuffle=False):
torch.cuda.empty_cache()
# batch_inputs: (batch_inputs1, batch_inputs2, batch_masks)
# 形狀都是:batch_size * doc_len * sent_len
# batch_labels: batch_size
batch_inputs, batch_labels = self.batch2tensor(batch_data)
# batch_outputs:b * num_labels
batch_outputs = self.model(batch_inputs)
# 把預測值轉換為一維,方便下面做 classification_report,計算 f1
y_pred.extend(torch.max(batch_outputs, dim=1)[1].cpu().numpy().tolist())
y_true.extend(batch_labels.cpu().numpy().tolist())
score, f1 = get_score(y_true, y_pred)
during_time = time.time() - start_time
if test:
df = pd.DataFrame({'label': y_pred})
df.to_csv(save_test, index=False, sep=',')
else:
logging.info(
'| epoch {:3d} | dev | score {} | f1 {} | time {:.2f}'.format(epoch, score, f1,
during_time))
if set(y_true) == set(y_pred) and self.report:
report = classification_report(y_true, y_pred, digits=4, target_names=self.target_names)
logging.info('\n' + report)
return f1
# data 引數就是 get_examples() 得到的,經過了分 batch
# batch_data是一個 list,每個元素是一個 tuple: (label, 句子數量,doc)
# 其中 doc 又是一個 list,每個 元素是一個 tuple: (句子長度,word_ids, extword_ids)
def batch2tensor(self, batch_data):
'''
[[label, doc_len, [[sent_len, [sent_id0, ...], [sent_id1, ...]], ...]]
'''
batch_size = len(batch_data)
doc_labels = []
doc_lens = []
doc_max_sent_len = []
for doc_data in batch_data:
# doc_data 代表一篇新聞,是一個 tuple: (label, 句子數量,doc)
# doc_data[0] 是 label
doc_labels.append(doc_data[0])
# doc_data[1] 是 這篇文章的句子數量
doc_lens.append(doc_data[1])
# doc_data[2] 是一個 list,每個 元素是一個 tuple: (句子長度,word_ids, extword_ids)
# 所以 sent_data[0] 表示每個句子的長度(單詞個數)
sent_lens = [sent_data[0] for sent_data in doc_data[2]]
# 取出這篇新聞中最長的句子長度(單詞個數)
max_sent_len = max(sent_lens)
doc_max_sent_len.append(max_sent_len)
# 取出最長的句子數量
max_doc_len = max(doc_lens)
# 取出這批 batch 資料中最長的句子長度(單詞個數)
max_sent_len = max(doc_max_sent_len)
# 建立 資料
batch_inputs1 = torch.zeros((batch_size, max_doc_len, max_sent_len), dtype=torch.int64)
batch_inputs2 = torch.zeros((batch_size, max_doc_len, max_sent_len), dtype=torch.int64)
batch_masks = torch.zeros((batch_size, max_doc_len, max_sent_len), dtype=torch.float32)
batch_labels = torch.LongTensor(doc_labels)
for b in range(batch_size):
for sent_idx in range(doc_lens[b]):
# batch_data[b][2] 表示一個 list,是一篇文章中的句子
sent_data = batch_data[b][2][sent_idx] #sent_data 表示一個句子
for word_idx in range(sent_data[0]): # sent_data[0] 是句子長度(單詞數量)
# sent_data[1] 表示 word_ids
batch_inputs1[b, sent_idx, word_idx] = sent_data[1][word_idx]
# # sent_data[2] 表示 extword_ids
batch_inputs2[b, sent_idx, word_idx] = sent_data[2][word_idx]
# mask 表示 哪個位置是有詞,後面計算 attention 時,沒有詞的地方會被置為 0
batch_masks[b, sent_idx, word_idx] = 1
if use_cuda:
batch_inputs1 = batch_inputs1.to(device)
batch_inputs2 = batch_inputs2.to(device)
batch_masks = batch_masks.to(device)
batch_labels = batch_labels.to(device)
return (batch_inputs1, batch_inputs2, batch_masks), batch_labels
2
# train
trainer = Trainer(model, vocab)
trainer.train()
# test
trainer.test()
至此,整個流程就講解完了。希望對你有所幫助。
如果你覺得這篇文章對你有幫助,不妨點個贊,讓我有更多動力寫出好文章。
我的文章會首發在公眾號上,歡迎掃碼關注我的公眾號張賢同學。