TensorFlow練手專案一:使用迴圈神經網路(RNN)實現影評情感分類
使用迴圈神經網路(RNN)實現影評情感分類
作為對迴圈神經網路的實踐,我用迴圈神經網路做了個影評情感的分類,即判斷影評的感情色彩是正面的,還是負面的。
選擇使用RNN來做情感分類,主要是因為影評是一段文字,是序列的,而RNN對序列的支援比較好,能夠“記憶”前文。雖然可以提取特徵詞向量,然後交給傳統機器學習模型或全連線神經網路去做,也能取得很好的效果,但只從端對端的角度來看的話,RNN無疑是最合適的。
以下介紹實現過程。
一、資料預處理
資料下載下來之後需要進行解壓,得到rt-polarity.neg
和rt-polarity.pos
檔案,這兩個檔案是Windows-1252
編碼的,先將它轉成unicode
補充一下小知識,當我們開啟一個檔案,發現亂碼,卻又不知道該檔案的編碼是什麼的時候,可以使用python
的chardet
類庫進行判斷,這裡的Windows-1252
就是使用該類庫檢測出來的。
在資料預處理部分,我們要完成如下處理過程:
1.轉碼
即將檔案轉為unicode編碼,方便我們後續操作。讀取檔案,轉換編碼,重新寫入到新檔案即可。不存在技術難點。
2.生成詞彙表
讀取訓練檔案,提取出所有的單詞,並統計各個單詞出現的次數。為了避免低頻詞的干擾,同時減少模型引數,我們只保留部分高頻詞,比如這裡我只儲存出現次數前9999個,同時將低頻詞識別符號<unkown>
3.藉助詞彙表將影評轉化為詞向量
單詞是沒法直接輸入給模型的,所以我們需要將詞彙表中的每個單詞對應於一個編號,將影評資料轉化成詞向量。方便後面生成詞嵌入矩陣。
4.填充詞向量並轉化為np陣列
因為不同評論的長度是不同的,我們要組成batch進行訓練,就需要先將其長度統一。這裡我選擇以最長的影評為標準,對其他較短的影評的空白部分進行填充。然後將其轉化成numpy的陣列。
5.按比例劃分資料集
按照機器學習的慣例,資料集應被劃分為三份,即訓練集、開發集和測試集。當然,有時也會只劃分兩份,即只包括訓練集和開發集。
這裡我劃分成三份,訓練集、開發集和測試集的佔比為[0.8,0.1,0.1]。劃分的方式為輪盤賭法,在numpy中可以使用cumsum
searchsorted
來簡潔地實現輪盤賭法。
6.打亂資料集,寫入檔案
為了取得更好的訓練效果,將資料集隨機打亂。為了保證在訓練和模型調整的過程中訓練集、開發集、測試集不發生改變,將三個資料集寫入到檔案中,使用的時候從檔案中讀取。
下面貼上資料預處理的程式碼,註釋寫的很細,就不多說了。
# -*- coding: utf-8 -*-
# @Time : 18-3-14 下午2:28
# @Author : AaronJny
# @Email : [email protected]
import sys
reload(sys)
sys.setdefaultencoding('utf8')
import collections
import settings
import utils
import numpy as np
def create_vocab():
"""
建立詞彙表,寫入檔案中
:return:
"""
# 存放出現的所有單詞
word_list = []
# 從檔案中讀取資料,拆分單詞
with open(settings.NEG_TXT, 'r') as f:
f_lines = f.readlines()
for line in f_lines:
words = line.strip().split()
word_list.extend(words)
with open(settings.POS_TXT, 'r') as f:
f_lines = f.readlines()
for line in f_lines:
words = line.strip().split()
word_list.extend(words)
# 統計單詞出現的次數
counter = collections.Counter(word_list)
sorted_words = sorted(counter.items(), key=lambda x: x[1], reverse=True)
# 選取高頻詞
word_list = [word[0] for word in sorted_words]
word_list = ['<unkown>'] + word_list[:settings.VOCAB_SIZE - 1]
# 將詞彙表寫入檔案中
with open(settings.VOCAB_PATH, 'w') as f:
for word in word_list:
f.write(word + '\n')
def create_vec(txt_path, vec_path):
"""
根據詞彙表生成詞向量
:param txt_path: 影評檔案路徑
:param vec_path: 輸出詞向量路徑
:return:
"""
# 獲取單詞到編號的對映
word2id = utils.read_word_to_id_dict()
# 將語句轉化成向量
vec = []
with open(txt_path, 'r') as f:
f_lines = f.readlines()
for line in f_lines:
tmp_vec = [str(utils.get_id_by_word(word, word2id)) for word in line.strip().split()]
vec.append(tmp_vec)
# 寫入檔案中
with open(vec_path, 'w') as f:
for tmp_vec in vec:
f.write(' '.join(tmp_vec) + '\n')
def cut_train_dev_test():
"""
使用輪盤賭法,劃分訓練集、開發集和測試集
打亂,並寫入不同檔案中
:return:
"""
# 三個位置分別存放訓練、開發、測試
data = [[], [], []]
labels = [[], [], []]
# 累加概率 rate [0.8,0.1,0.1] cumsum_rate [0.8,0.9,1.0]
rate = np.array([settings.TRAIN_RATE, settings.DEV_RATE, settings.TEST_RATE])
cumsum_rate = np.cumsum(rate)
# 使用輪盤賭法劃分資料集
with open(settings.POS_VEC, 'r') as f:
f_lines = f.readlines()
for line in f_lines:
tmp_data = [int(word) for word in line.strip().split()]
tmp_label = [1, ]
index = int(np.searchsorted(cumsum_rate, np.random.rand(1) * 1.0))
data[index].append(tmp_data)
labels[index].append(tmp_label)
with open(settings.NEG_VEC, 'r') as f:
f_lines = f.readlines()
for line in f_lines:
tmp_data = [int(word) for word in line.strip().split()]
tmp_label = [0, ]
index = int(np.searchsorted(cumsum_rate, np.random.rand(1) * 1.0))
data[index].append(tmp_data)
labels[index].append(tmp_label)
# 計算一下實際上分割出來的比例
print '最終分割比例', np.array([map(len, data)], dtype=np.float32) / sum(map(len, data))
# 打亂資料,寫入到檔案中
shuffle_data(data[0], labels[0], settings.TRAIN_DATA)
shuffle_data(data[1], labels[1], settings.DEV_DATA)
shuffle_data(data[2], labels[2], settings.TEST_DATA)
def shuffle_data(x, y, path):
"""
填充資料,生成np陣列
打亂資料,寫入檔案中
:param x: 資料
:param y: 標籤
:param path: 儲存路徑
:return:
"""
# 計算影評的最大長度
maxlen = max(map(len, x))
# 填充資料
data = np.zeros([len(x), maxlen], dtype=np.int32)
for row in range(len(x)):
data[row, :len(x[row])] = x[row]
label = np.array(y)
# 打亂資料
state = np.random.get_state()
np.random.shuffle(data)
np.random.set_state(state)
np.random.shuffle(label)
# 儲存資料
np.save(path + '_data', data)
np.save(path + '_labels', label)
def decode_file(infile, outfile):
"""
將檔案的編碼從'Windows-1252'轉為Unicode
:param infile: 輸入檔案路徑
:param outfile: 輸出檔案路徑
:return:
"""
with open(infile, 'r') as f:
txt = f.read().decode('Windows-1252')
with open(outfile, 'w') as f:
f.write(txt)
if __name__ == '__main__':
# 解碼檔案
decode_file(settings.ORIGIN_POS, settings.POS_TXT)
decode_file(settings.ORIGIN_NEG, settings.NEG_TXT)
# 建立詞彙表
create_vocab()
# 生成詞向量
create_vec(settings.NEG_TXT, settings.NEG_VEC)
create_vec(settings.POS_TXT, settings.POS_VEC)
# 劃分資料集
cut_train_dev_test()
二、模型編寫
資料處理好之後,開始模型的編寫。這裡選用迴圈神經網路,建模過程大致如下:
1.使用embedding構建詞嵌入矩陣
在資料預處理中,我們將影評處理成了一個個單詞編號構成的向量,也就是說,一條影評,對應於一個由單詞編號構成的向量。
將這樣的向量進行embedding,即可構建出詞嵌入矩陣。在詞嵌入矩陣中,每個詞由一個向量表示,矩陣中不同向量之間的差異對應於它們表示的詞之間的差異。
2.使用LSTM作為迴圈神經網路的基本單元
長短時記憶網路(LSTM)能夠自動完成前文資訊的“記憶”和“遺忘”,在迴圈神經網路中表現良好,已經成為在迴圈神經網路中大部分人的首選。這裡我選擇使用LSTM作為迴圈神經網路的基本單元。
3.對embedding和LSTM進行隨機失活(dropout)
為了提高模型的泛化能力,並減少引數,我對embedding層和LSTM單元進行dropout。
4.建立深度為2的深度迴圈神經網路
為了提高模型的擬合能力,使用深度迴圈神經網路,我選擇的深度為2。
5.給出二分類概率
對深度迴圈神經網路的最後節點的輸出做邏輯迴歸,通過sigmoid使結果落到0-1之間,代表結果是正類的概率。
損失函式使用交叉熵,優化器選擇Adam。
此部分程式碼如下(注:程式碼中裝飾器的作用為劃分名稱空間以及保證張量運算只被定義一次):
# -*- coding: utf-8 -*-
# @Time : 18-3-14 下午2:57
# @Author : AaronJny
# @Email : [email protected]
import tensorflow as tf
import functools
import settings
HIDDEN_SIZE = 128
NUM_LAYERS = 2
def doublewrap(function):
@functools.wraps(function)
def decorator(*args, **kwargs):
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
return function(args[0])
else:
return lambda wrapee: function(wrapee, *args, **kwargs)
return decorator
@doublewrap
def define_scope(function, scope=None, *args, **kwargs):
attribute = '_cache_' + function.__name__
name = scope or function.__name__
@property
@functools.wraps(function)
def decorator(self):
if not hasattr(self, attribute):
with tf.variable_scope(name, *args, **kwargs):
setattr(self, attribute, function(self))
return getattr(self, attribute)
return decorator
class Model(object):
def __init__(self, data, lables, emb_keep, rnn_keep):
"""
神經網路模型
:param data:資料
:param lables: 標籤
:param emb_keep: emb層保留率
:param rnn_keep: rnn層保留率
"""
self.data = data
self.label = lables
self.emb_keep = emb_keep
self.rnn_keep = rnn_keep
self.predict
self.loss
self.global_step
self.ema
self.optimize
self.acc
@define_scope
def predict(self):
"""
定義前向傳播過程
:return:
"""
# 詞嵌入矩陣權重
embedding = tf.get_variable('embedding', [settings.VOCAB_SIZE, HIDDEN_SIZE])
# 使用dropout的LSTM
lstm_cell = [tf.nn.rnn_cell.DropoutWrapper(tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE), self.rnn_keep) for _ in
range(NUM_LAYERS)]
# 構建迴圈神經網路
cell = tf.nn.rnn_cell.MultiRNNCell(lstm_cell)
# 生成詞嵌入矩陣,並進行dropout
input = tf.nn.embedding_lookup(embedding, self.data)
dropout_input = tf.nn.dropout(input, self.emb_keep)
# 計算rnn的輸出
outputs, last_state = tf.nn.dynamic_rnn(cell, dropout_input, dtype=tf.float32)
# 做二分類問題,這裡只需要最後一個節點的輸出
last_output = outputs[:, -1, :]
# 求最後節點輸出的線性加權和
weights = tf.Variable(tf.truncated_normal([HIDDEN_SIZE, 1]), dtype=tf.float32, name='weights')
bias = tf.Variable(0, dtype=tf.float32, name='bias')
logits = tf.matmul(last_output, weights) + bias
return logits
@define_scope
def ema(self):
"""
定義移動平均
:return:
"""
ema = tf.train.ExponentialMovingAverage(settings.EMA_RATE, self.global_step)
return ema
@define_scope
def loss(self):
"""
定義損失函式,這裡使用交叉熵
:return:
"""
loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=self.label, logits=self.predict)
loss = tf.reduce_mean(loss)
return loss
@define_scope
def global_step(self):
"""
step,沒什麼好說的,注意指定trainable=False
:return:
"""
global_step = tf.Variable(0, trainable=False)
return global_step
@define_scope
def optimize(self):
"""
定義反向傳播過程
:return:
"""
# 學習率衰減
learn_rate = tf.train.exponential_decay(settings.LEARN_RATE, self.global_step, settings.LR_DECAY_STEP,
settings.LR_DECAY)
# 反向傳播優化器
optimizer = tf.train.AdamOptimizer(learn_rate).minimize(self.loss, global_step=self.global_step)
# 移動平均操作
ave_op = self.ema.apply(tf.trainable_variables())
# 組合構成訓練op
with tf.control_dependencies([optimizer, ave_op]):
train_op = tf.no_op('train')
return train_op
@define_scope
def acc(self):
"""
定義模型acc計算過程
:return:
"""
# 對前向傳播的結果求sigmoid
output = tf.nn.sigmoid(self.predict)
# 真負類
ok0 = tf.logical_and(tf.less_equal(output, 0.5), tf.equal(self.label, 0))
# 真正類
ok1 = tf.logical_and(tf.greater(output, 0.5), tf.equal(self.label, 1))
# 一個數組,所有預測正確的都為True,否則False
ok = tf.logical_or(ok0, ok1)
# 先轉化成浮點型,再通過求平均來計算acc
acc = tf.reduce_mean(tf.cast(ok, dtype=tf.float32))
return acc
三、組織資料集
我編寫了一個類用於組織資料,方便訓練和驗證使用。程式碼很簡單,就不多說了,直接貼程式碼:
# -*- coding: utf-8 -*-
# @Time : 18-3-14 下午3:33
# @Author : AaronJny
# @Email : [email protected]
import numpy as np
import settings
class Dataset(object):
def __init__(self, data_kind=0):
"""
生成一個數據集物件
:param data_kind: 決定了使用哪種資料集 0-訓練集 1-開發集 2-測試集
"""
self.data, self.labels = self.read_data(data_kind)
self.start = 0 # 記錄當前batch位置
self.data_size = len(self.data) # 樣例數
def read_data(self, data_kind):
"""
從檔案中載入資料
:param data_kind:資料集種類 0-訓練集 1-開發集 2-測試集
:return:
"""
# 獲取資料集路徑
data_path = [settings.TRAIN_DATA, settings.DEV_DATA, settings.TEST_DATA][data_kind]
# 載入
data = np.load(data_path + '_data.npy')
labels = np.load(data_path + '_labels.npy')
return data, labels
def next_batch(self, batch_size):
"""
獲取一個大小為batch_size的batch
:param batch_size: batch大小
:return:
"""
start = self.start
end = min(start + batch_size, self.data_size)
self.start = end
# 當遍歷完成後回到起點
if self.start >= self.data_size:
self.start = 0
# 返回一個batch的資料和標籤
return self.data[start:end], self.labels[start:end]
四、模型訓練
訓練過程中,額外操作主要有兩個:
1.使用移動平均
我使用移動平均的主要目的是使loss曲線儘量平滑,以及提升模型的泛化能力。
2.使用學習率指數衰減
目的是保證前期學習率足夠大,能夠快速降低loss,後期學習率變小,能更好地逼近最優解。
當然,就是說說而已,這次的訓練資料比較簡單,學習率衰減發揮的作用不大。
訓練過程中,定期儲存模型,以及checkpoint。這樣可以在訓練的同時,在驗證指令碼中讀取最新模型進行驗證。
此部分具體程式碼如下:
# -*- coding: utf-8 -*-
# @Time : 18-3-14 下午4:41
# @Author : AaronJny
# @Email : [email protected]
import settings
import tensorflow as tf
import models
import dataset
import os
BATCH_SIZE = settings.BATCH_SIZE
# 資料
x = tf.placeholder(tf.int32, [None, None])
# 標籤
y = tf.placeholder(tf.float32, [None, 1])
# emb層的dropout保留率
emb_keep = tf.placeholder(tf.float32)
# rnn層的dropout保留率
rnn_keep = tf.placeholder(tf.float32)
# 建立一個模型
model = models.Model(x, y, emb_keep, rnn_keep)
# 建立資料集物件
data = dataset.Dataset(0)
saver = tf.train.Saver()
with tf.Session() as sess:
# 全域性初始化
sess.run(tf.global_variables_initializer())
# 迭代訓練
for step in range(settings.TRAIN_TIMES):
# 獲取一個batch進行訓練
x, y = data.next_batch(BATCH_SIZE)
loss, _ = sess.run([model.loss, model.optimize],
{model.data: x, model.label: y, model.emb_keep: settings.EMB_KEEP_PROB,
model.rnn_keep: settings.RNN_KEEP_PROB})
# 輸出loss
if step % settings.SHOW_STEP == 0:
print 'step {},loss is {}'.format(step, loss)
# 儲存模型
if step % settings.SAVE_STEP == 0:
saver.save(sess, os.path.join(settings.CKPT_PATH, settings.MODEL_NAME), model.global_step)
五、驗證模型
載入最新模型進行驗證,通過修改資料集物件的引數可以制定訓練/開發/測試集進行驗證。
載入模型的時候,使用移動平均的影子變數覆蓋對應變數。
程式碼如下:
# -*- coding: utf-8 -*-
# @Time : 18-3-14 下午5:09
# @Author : AaronJny
# @Email : [email protected]
import settings
import tensorflow as tf
import models
import dataset
import os
import time
# 為了在使用GPU訓練的同時,使用CPU進行驗證
os.environ['CUDA_VISIBLE_DEVICES'] = ''
BATCH_SIZE = settings.BATCH_SIZE
# 資料
x = tf.placeholder(tf.int32, [None, None])
# 標籤
y = tf.placeholder(tf.float32, [None, 1])
# emb層的dropout保留率
emb_keep = tf.placeholder(tf.float32)
# rnn層的dropout保留率
rnn_keep = tf.placeholder(tf.float32)
# 建立一個模型
model = models.Model(x, y, emb_keep, rnn_keep)
# 建立一個數據集物件
data = dataset.Dataset(1) # 0-訓練集 1-開發集 2-測試集
# 移動平均變數
restore_variables = model.ema.variables_to_restore()
# 使用移動平均變數進行覆蓋
saver = tf.train.Saver(restore_variables)
with tf.Session() as sess:
while True:
# 載入最新的模型
ckpt = tf.train.get_checkpoint_state(settings.CKPT_PATH)
saver.restore(sess, ckpt.model_checkpoint_path)
# 計算並輸出acc
acc = sess.run([model.acc],
{model.data: data.data, model.label: data.labels, model.emb_keep: 1.0, model.rnn_keep: 1.0})
print 'acc is ', acc
time.sleep(1)
六、對詞彙表進行操作的幾個方法
把對詞彙表進行操作的幾個方法提取出來了,放到了utils.py
檔案中。
# -*- coding: utf-8 -*-
# @Time : 18-3-14 下午2:44
# @Author : AaronJny
# @Email : [email protected]
import settings
def read_vocab_list():
"""
讀取詞彙表
:return:由詞彙表中所有單片語成的列表
"""
with open(settings.VOCAB_PATH, 'r') as f:
vocab_list = f.read().strip().split('\n')
return vocab_list
def read_word_to_id_dict():
"""
生成一個單詞到編號的對映
:return:單詞到編號的字典
"""
vocab_list = read_vocab_list()
word2id = dict(zip(vocab_list, range(len(vocab_list))))
return word2id
def read_id_to_word_dict():
"""
生成一個編號到單詞的對映
:return:編號到單詞的字典
"""
vocab_list = read_vocab_list()
id2word = dict(zip(range(len(vocab_list)), vocab_list))
return id2word
def get_id_by_word(word, word2id):
"""
給定一個單詞和字典,獲得單詞在字典中的編號
:param word: 給定單詞
:param word2id: 單詞到編號的對映
:return: 若單詞在字典中,返回對應的編號 否則,返回word2id['<unkown>']
"""
if word in word2id:
return word2id[word]
else:
return word2id['<unkown>']
七、對模型進行配置
模型的配置引數大多數都被提取出來,單獨放到了settings.py
檔案中,可以在這裡對模型進行配置。
# -*- coding: utf-8 -*-
# @Time : 18-3-14 下午2:44
# @Author : AaronJny
# @Email : [email protected]
# 源資料路徑
ORIGIN_NEG = 'data/rt-polarity.neg'
ORIGIN_POS = 'data/rt-polarity.pos'
# 轉碼後的資料路徑
NEG_TXT = 'data/neg.txt'
POS_TXT = 'data/pos.txt'
# 詞彙表路徑
VOCAB_PATH = 'data/vocab.txt'
# 詞向量路徑
NEG_VEC = 'data/neg.vec'
POS_VEC = 'data/pos.vec'
# 訓練集路徑
TRAIN_DATA = 'data/train'
# 開發集路徑
DEV_DATA = 'data/dev'
# 測試集路徑
TEST_DATA = 'data/test'
# 模型儲存路徑
CKPT_PATH = 'ckpt'
# 模型名稱
MODEL_NAME = 'model'
# 詞彙表大小
VOCAB_SIZE = 10000
# 初始學習率
LEARN_RATE = 0.0001
# 學習率衰減
LR_DECAY = 0.99
# 衰減頻率
LR_DECAY_STEP = 1000
# 總訓練次數
TRAIN_TIMES = 2000
# 顯示訓練loss的頻率
SHOW_STEP = 10
# 儲存訓練模型的頻率
SAVE_STEP = 100
# 訓練集佔比
TRAIN_RATE = 0.8
# 開發集佔比
DEV_RATE = 0.1
# 測試集佔比
TEST_RATE = 0.1
# BATCH大小
BATCH_SIZE = 64
# emb層dropout保留率
EMB_KEEP_PROB = 0.5
# rnn層dropout保留率
RNN_KEEP_PROB = 0.5
# 移動平均衰減率
EMA_RATE = 0.99
八、執行模型
至此,模型構建完成。模型的執行步驟大致如下:
1.確保資料檔案放在了對應路徑中,執行python process_data
對資料進行預處理。
2.執行python train.py
對模型進行訓練,訓練好的模型會自動儲存到對應的路徑中。
3.執行python eval.py
讀取儲存的最新模型,對訓練/開發/測試集進行驗證。
我簡單跑了一下,由於資料集較小,模型的泛化能力不是很好。
當訓練集、開發集、測試集的分佈為[0.8,0.1,0.1],訓練2000個batch_size=64的mini_batch時,模型在各資料集上的acc表現大致如下:
訓練集 0.95
開發集 0.79
測試集 0.80
更多
轉行做機器學習,要學的還很多,文中如有錯誤紕漏之處,懇請諸位大佬拍磚指教…
補充
2018.5.4
有朋友說希望我發一下網路結構圖,所以就抽個時間隨便畫了一下,比較簡陋,湊合著看吧=。=