
[Kaggle] Spam/Ham Email Classification (BERT)

Tags: Natural Language Processing


Competition: https://www.kaggle.com/c/ds100fa19
Related posts:
[Kaggle] Spam/Ham Email Classification (spacy)
[Kaggle] Spam/Ham Email Classification (RNN/GRU/LSTM)

This post takes a pretrained model from huggingface, fine-tunes it on the spam email dataset, and submits the test predictions to Kaggle.

The code in this post draws on the book 《自然語言處理動手學Bert文字分類》.

1. Data Processing

from datetime import timedelta
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
train = pd.read_csv("train.csv")
test_csv = pd.read_csv("test.csv")
train = train.fillna(" ")
test_csv = test_csv.fillna(" ")
train['all'] = train['subject'] + ' ' + train['email']  # merge the two text features

# split off a validation set with stratified sampling
from sklearn.model_selection import StratifiedShuffleSplit
splt = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=1)
for train_idx, valid_idx in splt.split(train, train['spam']):
    train_part = train.loc[train_idx]
    valid_part = train.loc[valid_idx]

y_train = train_part['spam']
y_valid = valid_part['spam']
X_train = train_part['all']
X_valid = valid_part['all']

X_test = test_csv['subject'] + ' ' + test_csv['email']
y_test = [0] * len(X_test)          # the test set has no labels; dummy labels keep the pipeline uniform
y_test = torch.LongTensor(y_test)   # convert to a tensor
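
As a quick sanity check (my addition, not part of the original code), you can confirm that the stratified split preserves roughly the same spam ratio in both partitions:

# sanity check (my addition): the stratified split should keep the class ratio
print(train['spam'].value_counts(normalize=True))   # overall spam/ham ratio
print(y_train.value_counts(normalize=True))         # ratio in the training split
print(y_valid.value_counts(normalize=True))         # ratio in the validation split
print(len(X_train), len(X_valid), len(X_test))      # partition sizes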

2. Download the Pretrained Model

Pretrained model: distilbert-base-uncased-finetuned-sst-2-english (downloaded from the huggingface model hub).

Place the downloaded model files in a single folder, e.g. ./bert_hugginggace/

Install the package first:
pip install transformers
from transformers import AutoTokenizer, AutoModelForSequenceClassification

tokenizer = AutoTokenizer.from_pretrained("./bert_hugginggace")
# distilbert-base-uncased-finetuned-sst-2-english

pretrain_model = AutoModelForSequenceClassification.from_pretrained("./bert_hugginggace")
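
A minimal sanity check (my addition; the sample text is made up): push one email through the tokenizer and the model and look at the raw logits, to confirm that the files in ./bert_hugginggace/ load correctly.

# sanity check (my addition): one email -> tokenizer -> model -> logits
sample = "subject: cheap meds available now"     # hypothetical sample email
inputs = tokenizer(sample, return_tensors="pt")  # dict with input_ids and attention_mask
with torch.no_grad():
    logits = pretrain_model(**inputs).logits     # shape (1, 2) for this two-label head
print(logits)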

Some parameters used in what follows:

PAD, CLS = '[PAD]', '[CLS]'
max_seq_len = 128
bert_hidden = 768
num_classes = 2
learning_rate = 1e-5
decay = 0.01
num_epochs = 5
early_stop_time = 2000
batch_size = 32
save_path = "./best_model.ckpt" # path for the best model so far
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

3. Load the Data

  • The data needs to be encoded into the format BERT expects:
    token_ids plus an attention_mask (a tokenizer-built-in alternative is sketched after the function below)
def load_dataset(texts, labels):
    contents = []
    for t, label in zip(texts, labels):
        token = tokenizer.tokenize(t)
        token = [CLS] + token
        # ['[CLS]', 'subject', ':', 'cell', 'phones', 'coming', 'soon', '<', 'html', '>', ...]
        seq_len = len(token)
        mask = []
        token_ids = tokenizer.convert_tokens_to_ids(token)
        # [101, 3395, 1024, 3526, 11640, 2746, 2574, 1026, 16129, ...]
        if len(token) < max_seq_len: # pad sequences that are too short
            mask = [1]*len(token) + [0]*(max_seq_len-len(token))
            token_ids = token_ids + [0]*(max_seq_len-len(token))
        else: # truncate sequences that are too long
            mask = [1]*max_seq_len
            token_ids = token_ids[:max_seq_len]
            seq_len = max_seq_len
        y = [0]*num_classes
        y[label] = 1 # one-hot the label to make the binary cross-entropy loss easy to compute
        contents.append((token_ids, y, seq_len, mask))
    return contents
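For reference, the tokenizer can also handle padding, truncation, and the attention mask in a single call. A rough equivalent of the helper above (my addition; note the tokenizer also appends a [SEP] token, which the manual version does not):

# sketch (my addition): let the tokenizer do padding / truncation / masking itself
enc = tokenizer("subject: cell phones coming soon",
                padding="max_length", truncation=True, max_length=max_seq_len)
print(enc["input_ids"][:10])       # token ids, padded or truncated to max_seq_len
print(enc["attention_mask"][:10])  # 1 for real tokens, 0 for padding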
  • Write a dataset iterator: during training it hands out batch_size samples at a time for each weight update (a short usage sketch follows after build_iter)
class datasetIter():
    def __init__(self, datasets, batch_size, device):
        self.datasets = datasets
        self.idx = 0
        self.device = device
        self.batch_size = batch_size
        self.batches = len(datasets)//batch_size
        self.residues = False
        if len(datasets)%batch_size != 0:
            self.residues = True # there is a leftover final batch smaller than batch_size
    def __next__(self):
        if self.residues and self.idx==self.batches:
            batch_data = self.datasets[self.idx * self.batch_size : len(self.datasets)]
            self.idx += 1
            batch_data = self._to_tensor(batch_data)
            return batch_data
        elif self.idx >= self.batches: # ">=" so no empty batch is yielded when the data divides evenly
            self.idx = 0
            raise StopIteration
        else:
            batch_data = self.datasets[self.idx * self.batch_size : (self.idx+1) * self.batch_size]
            self.idx += 1
            batch_data = self._to_tensor(batch_data)
            return batch_data
    def _to_tensor(self, datasets):
        # pack a list of (token_ids, one_hot_label, seq_len, mask) tuples into batch tensors
        x = torch.LongTensor([item[0] for item in datasets]).to(self.device)
        y = torch.FloatTensor([item[1] for item in datasets]).to(self.device)
        seq_len = torch.LongTensor([item[2] for item in datasets]).to(self.device)
        mask = torch.LongTensor([item[3] for item in datasets]).to(self.device)
        return (x, seq_len, mask), y
    def __iter__(self):
        return self
    def __len__(self):
        if self.residues:
            return self.batches + 1
        else:
            return self.batches
def build_iter(datasets, batch_size, device):
    iter = datasetIter(datasets,batch_size,device)
    return iter
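
A short usage sketch (my addition; assumes X_train and y_train from section 1 are in scope): encode a small slice of the training data and inspect the first batch.

# usage sketch (my addition): 64 samples -> iterator -> one batch of tensors
small_data = load_dataset(X_train.iloc[:64], y_train.iloc[:64])
small_iter = build_iter(small_data, batch_size, device)
(x, seq_len, mask), y = next(small_iter)
print(x.shape, mask.shape, y.shape)   # expected: (32, 128), (32, 128), (32, 2)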

4. Define the Model

class myModel(nn.Module):
    def __init__(self):
        super(myModel, self).__init__()
        self.pretrain_model = pretrain_model # the pretrained BERT model
        for param in self.pretrain_model.parameters():
            param.requires_grad = True # unfreeze every parameter so the whole model is fine-tuned
    def forward(self, x):
        context = x[0]  # token ids; x is the (token_ids, seq_len, mask) tuple from the iterator
        mask = x[2]     # attention mask
        out = self.pretrain_model(context, attention_mask=mask)
        out = torch.sigmoid(out.logits) # squash the logits into (0, 1) for binary cross-entropy
        return out
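
A quick shape check (my addition, with dummy inputs): the model maps a batch of token ids and attention masks to per-class probabilities in (0, 1).

# shape check (my addition): two all-[PAD] dummy sequences through the model
dummy_ids  = torch.zeros((2, max_seq_len), dtype=torch.long).to(device)  # token id 0 = [PAD]
dummy_len  = torch.full((2,), max_seq_len, dtype=torch.long).to(device)
dummy_mask = torch.ones((2, max_seq_len), dtype=torch.long).to(device)
check_model = myModel().to(device)
with torch.no_grad():
    out = check_model((dummy_ids, dummy_len, dummy_mask))
print(out.shape)   # expected: torch.Size([2, 2]) -- (batch, num_classes)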

5. Training

import time
import torch.nn.functional as F

from sklearn import metrics
from transformers.optimization import AdamW
  • A small helper to report elapsed time
def get_time_dif(starttime):
    # calculate used time
    endtime = time.time()
    return timedelta(seconds=int(round(endtime-starttime)))
  • Training
def train(model, train_iter, dev_iter, test_iter):
    starttime = time.time() # record the start time
    model.train()
    optimizer = AdamW(model.parameters(),lr=learning_rate,weight_decay=decay)
    total_batch = 0
    dev_best_loss = float("inf")
    last_improve = 0       # batch index of the last validation-loss improvement
    no_improve_flag = False
    for epoch in range(num_epochs):
        print("Epoch {}/{}".format(epoch+1, num_epochs))
        for i, (X, y) in enumerate(train_iter):
            outputs = model(X) # batch_size * num_classes
            model.zero_grad() # zero the accumulated gradients
            loss = F.binary_cross_entropy(outputs, y)
            loss.backward()
            optimizer.step()
            if total_batch%100 == 0: # report progress every 100 batches
                truelabels = torch.max(y.data, 1)[1].cpu()
                pred = torch.max(outputs, 1)[1].cpu()
                train_acc = metrics.accuracy_score(truelabels, pred)
                # evaluate on the validation set
                dev_acc, dev_loss = evaluate(model, dev_iter)
                # keep the checkpoint with the best validation loss
                if dev_loss < dev_best_loss:
                    dev_best_loss = dev_loss
                    torch.save(model.state_dict(), save_path)
                    improve = '*'
                    last_improve = total_batch
                else:
                    improve = ' '
                time_dif = get_time_dif(starttime)
                # format spec: > right-aligns, the integer is the field width, .2 keeps two significant digits
                msg = 'Iter:{0:>6}, Train Loss:{1:>5.2}, Train Acc:{2:>6.2}, Val Loss:{3:>5.2}, val Acc :{4:>6.2%}, Time:{5} {6}'
                print(msg.format(total_batch, loss.item(),train_acc, dev_loss, dev_acc, time_dif, improve))
                model.train() # back to training mode (evaluate() switches to eval mode)
            total_batch += 1
            # stop training if there has been no improvement for a long time (assume convergence)
            if total_batch - last_improve > early_stop_time:
                print("no improve after {} times, stop!".format(early_stop_time))
                no_improve_flag = True
                break
        if no_improve_flag:
            break
    # generate predictions on the test set with the best saved model
    test(model, test_iter)
  • Evaluation function
def evaluate(model, dev_iter):
    model.eval() # switch to evaluation mode
    loss_total = 0
    pred_all = np.array([], dtype=int)
    labels_all = np.array([], dtype=int)
    with torch.no_grad(): # no gradient tracking needed during evaluation
        for X, y in dev_iter:
            outputs = model(X)
            loss = F.binary_cross_entropy(outputs, y)
            loss_total += loss
            truelabels = torch.max(y.data, 1)[1].cpu()
            pred = torch.max(outputs, 1)[1].cpu().numpy()
            labels_all = np.append(labels_all, truelabels)
            pred_all = np.append(pred_all, pred)
    acc = metrics.accuracy_score(labels_all, pred_all)
    return acc, loss_total/len(dev_iter)
  • Test function
def test(model, test_iter):
    model.load_state_dict(torch.load(save_path)) # load the best checkpoint
    model.eval() # evaluation mode
    pred_all = np.array([], dtype=int)
    with torch.no_grad():
        for X, y in test_iter:
            outputs = model(X)
            pred = torch.max(outputs, 1)[1].cpu().numpy()
            pred_all = np.append(pred_all, pred)
    # write the submission file (columns: id, Class)
    id = test_csv['id']
    output = pd.DataFrame({'id':id, 'Class': pred_all})
    output.to_csv("submission_bert.csv",  index=False)
  • Main program
# fix the random seeds for reproducibility
np.random.seed(520)
torch.manual_seed(520)
torch.cuda.manual_seed_all(520)
torch.backends.cudnn.deterministic = True

# load (encode) the data
train_data = load_dataset(X_train, y_train)
valid_data = load_dataset(X_valid, y_valid)
test_data = load_dataset(X_test, y_test)

# data iterators
train_iter = build_iter(train_data, batch_size, device)
valid_iter = build_iter(valid_data, batch_size, device)
test_iter = build_iter(test_data, batch_size, device)

# model
model = myModel().to(device)

# train, evaluate, and test
train(model, train_iter, valid_iter, test_iter)
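
Once training finishes, test() writes submission_bert.csv. A quick format check (my addition) before uploading:

# check the submission format (my addition): columns id and Class, one row per test email
submission = pd.read_csv("submission_bert.csv")
print(submission.columns.tolist())        # expected: ['id', 'Class']
print(len(submission) == len(test_csv))   # expected: True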

6. Submit the Test Results

Private Score: 0.98714
Public Score: 0.99000

With hardly any hyperparameter tuning, the accuracy is close to 99%, which is a pretty good result!

Comments and corrections are welcome. Thank you!