1. 程式人生 > 其它 >自然語言處理--keras實現一維卷積網路對IMDB 電影評論資料集構建情感分類器

自然語言處理--keras實現一維卷積網路對IMDB 電影評論資料集構建情感分類器

技術標籤:自然語言處理卷積神經網路python情感分析nlp

為什麼在 NLP 分類任務中選擇 CNN 呢?
1.CNN神經網路可以像處理影象一樣處理文字並“理解”它們
2.主要好處是高效率
3.在許多方面,由於池化層和卷積核大小所造成的限制(雖然可以將卷積核設定得更大),會導致丟棄資訊,但這並不意味著它們不是有用的模型。利用 CNN 能夠有效地對相對較大的資料集進行檢測和預測情感
4.即使依賴 Word2vec 詞嵌入,CNN 也可以在不對映整個語言的條件下,通過較少的詞嵌入表示來執行

import numpy as np
# 處理填充輸入資料的輔助模組
from keras.preprocessing import
sequence # 基礎的 Keras 神經網路模型 from keras.models import Sequential # 模型中常用的層物件 from keras.layers import Dense, Dropout, Activation # 卷積層和池化 from keras.layers import Conv1D, GlobalMaxPooling1D import glob import os from random import shuffle from nltk.tokenize import TreebankWordTokenizer from gensim.models.
keyedvectors import KeyedVectors from nlpia.loaders import get_data from keras.models import model_from_json import tensorflow.compat.v1 as tf tf.disable_v2_behavior() # 為NumPy和TensorFlow設定隨機種子以確保可以得到一致的結果 np.random.seed(123) tf.set_random_seed(123) # 載入資料 # 文件載入預處理 def pre_process_data(filepath):
""" This is dependent on your training data source but we will try to generalize it as best as possible. """ positive_path = os.path.join(filepath, 'pos') negative_path = os.path.join(filepath, 'neg') pos_label = 1 neg_label = 0 dataset = [] # glob是實用的檔名匹配庫,glob.glob()函式將會匹配給定路徑下的所有pattern,並以列表形式返回。 # 用它可以查詢符合特定規則的檔案路徑名 for filename in glob.glob(os.path.join(positive_path, '*.txt')): with open(filename, 'r', encoding='UTF-8') as f: dataset.append((pos_label, f.read())) for filename in glob.glob(os.path.join(negative_path, '*.txt')): with open(filename, 'r', encoding='UTF-8') as f: dataset.append((neg_label, f.read())) shuffle(dataset) return dataset dataset = pre_process_data('xxx\\aclImdb\\aclImdb\\train') # 元組的第一個元素是情感的目標值:1 表示積極情感,0 表示消極情感 print(dataset[0]) # 向量化及分詞器 word_vectors = KeyedVectors.load_word2vec_format('xxx\\googlenews-vectors-negative300.bin.gz', binary=True) ''' 這裡有一些資訊損失。谷歌新聞的 Word2vec 詞彙表中只包含了一部分停用詞, 使很多像“a”這樣的常用詞將在函式處理過程中被丟棄,這個處理不是特別理想,不過大家可 以通過這個方法得到一個資訊有損失情況下的卷積神經網路的基線效能。如果想要避免資訊損 失,大家可以單獨訓練 word2vec 模型,以確保有更好的向量覆蓋率。另外,資料中還有很多類 似於<br\>的 HTML 標籤,它們通常與文字的情感無關,必須從資料中去除。 ''' def tokenize_and_vectorize(dataset): tokenizer = TreebankWordTokenizer() vectorized_data = [] expected = [] for sample in dataset: tokens = tokenizer.tokenize(sample[1]) sample_vecs = [] for token in tokens: try: sample_vecs.append(word_vectors[token]) except KeyError: pass # No matching token in the Google w2v vocab vectorized_data.append(sample_vecs) return vectorized_data # 目標標籤 def collect_expected(dataset): """ Peel off the target values from the dataset """ expected = [] for sample in dataset: expected.append(sample[0]) return expected # vectorized_data結構:[[[詞向量], [], ...], [[], [], ...], ...] vectorized_data = tokenize_and_vectorize(dataset) # [...] expected = collect_expected(dataset) # 劃分訓練集/測試集 split_point = int(len(vectorized_data)*.8) x_train = vectorized_data[:split_point] y_train = expected[:split_point] x_test = vectorized_data[split_point:] y_test = expected[split_point:] # CNN 引數 # maxlen 變數用於設定評論的最大長度, # 因為卷積神經網路的每個輸入必須具有相同的維數,所以需要截斷超出 400 個詞條的樣 # 本,並填充少於 400 個詞條的樣本,填充值可以是 Null 或 0 maxlen = 400 # 在後向傳播誤差和更新權重前,向網路輸入的樣本數量 batch_size = 32 # 傳入卷積神經網路中詞條向量的長度 embedding_dims = 300 # 要訓練的卷積核的數量 filters = 250 # 卷積核大小:每個卷積核將是一個矩陣:embedding_dims × kernel_size, # 在這裡是 250 × 3 kernel_size = 3 # 在普通的前饋網路中傳播鏈端點的神經元的數量 hidden_dims = 250 # 整個訓練資料集在網路中的傳入次數 epochs = 4 # 填充及截斷詞條序列,長度不夠的填充元素為0的詞向量 def pad_trunc(data, maxlen): """ For a given dataset pad with zero vectors or truncate to maxlen """ new_data = [] # Create a vector of 0s the length of our word vectors zero_vector = [] for _ in range(len(data[0][0])): zero_vector.append(0.0) for sample in data: if len(sample) > maxlen: temp = sample[:maxlen] elif len(sample) < maxlen: temp = sample # Append the appropriate number 0 vectors to the list additional_elems = maxlen - len(sample) for _ in range(additional_elems): temp.append(zero_vector) else: temp = sample # 最後將擴充套件後的資料放在擴充套件資料列表的最後 new_data.append(temp) return new_data # 收集經過擴充套件和截斷的資料,並將其轉換為 numpy 陣列,以便在 Keras 中使用 x_train = pad_trunc(x_train, maxlen) x_test = pad_trunc(x_test, maxlen) # 大小為樣本數量×序列長度×詞向量長度 x_train = np.reshape(x_train, (len(x_train), maxlen, embedding_dims)) y_train = np.array(y_train) x_test = np.reshape(x_test, (len(x_test), maxlen, embedding_dims)) y_test = np.array(y_test) # 卷積神經網路 # 構建一個一維 CNN print('Build model...') # Keras 中標準的模型定義方式 model = Sequential() model.add(Conv1D(filters, kernel_size, padding='valid', activation='relu', strides=1, input_shape=(maxlen, embedding_dims))) # 可選的池化方法有 GlobalMaxPooling1D()、MaxPooling1D(n)或 AvgPooling1D(n),其中 # n 表示池化區域大小,預設值為 2 # 全域性最大池化 model.add(GlobalMaxPooling1D()) # 帶 dropout 的全連線層 # 從一個普通的全連線隱藏層開始,然後加入dropout 和 ReLU model.add(Dense(hidden_dims)) model.add(Dropout(0.2)) model.add(Activation('relu')) # 輸出層,是實際的分類器 model.add(Dense(1)) model.add(Activation('sigmoid')) # 編譯 CNN:編譯為初始未訓練狀態 # compile()完成模型的構建 # loss可選:binary_crossentropy 和 categorical_crossentropy... # optimizer可選:隨機梯度下降、Adam和 RSMProp... model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy']) # 訓練 CNN,fit()完成模型的訓練: # 通過反向傳播每個樣本的誤差來學習最後面的卷積核和前饋全連線網路之間的權重, # 以及 250 個不同的卷積核各自的權重 # batch_size:反向傳播更新權重之前處理的資料樣本 # 數。每個批次中 n 個樣本的累計誤差會同時處理 # 當訓練中的損失持續減少,而驗證損失 val_loss 在週期結束時與前一週期相比開始增加 # 時,就出現了明顯的過擬合。找到驗證損失曲線開始向上彎曲的中間值是獲得一個好模型的關鍵。 model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test)) # 儲存模型 # 將模型的結構儲存在 JSON 檔案中,並將訓練後的權重儲存在另 # 一個檔案中,以便之後重新例項化 model_structure = model.to_json() with open("cnn_model.json", "w") as json_file: json_file.write(model_structure) model.save_weights("cnn_weights.h5") # 載入儲存的模型 with open("cnn_model.json", "r") as json_file: json_string = json_file.read() model = model_from_json(json_string) model.load_weights('cnn_weights.h5') # 預測 # 測試樣本資料 sample_1 = "I hate that the dismal weather had me down for so long, when will it break! Ugh, when does happiness return? The sun is blinding and the puffy clouds are too thin. I can't wait for the weekend." vec_list = tokenize_and_vectorize([(1, sample_1)]) test_vec_list = pad_trunc(vec_list, maxlen) test_vec = np.reshape(test_vec_list, (len(test_vec_list), maxlen, embedding_dims)) print(model.predict(test_vec)) print(model.predict_classes(test_vec))