基於RNN的文字分類模型(Tensorflow)
阿新 • • 發佈:2019-01-02
基於LSTM(Long-Short Term Memory,長短時記憶人工神經網路,RNN的一種)搭建一個文字意圖分類的深度學習模型(基於Python3和Tensorflow1.2),其結構圖如下:
如圖1所示,整個模型包括兩部分
第一部分:句子特徵提取
Step1 讀取資料(這裡是經過結巴分詞後的句子),按比例劃分訓練集和驗證集。每個句子都會生成相應的mask向量,用以標記該句的實際長度(後續模型會根據mask向量,把padding部分所對應的隱藏層輸出置為0,使其不參與mean pooling)。這裡有幾個可選項:
1. reverse: 考慮到句子中越靠後的詞重要程度越高,因此可對句子進行逆序輸入;
2. enhance: 樣本數較小的時候可選擇資料增強,即打亂每個句子內部的詞序來構建新樣本(標籤保持不變);
3. sort_by_len: 對句子按照長短進行排序
4. shuffle:打亂樣本順序,隨機取樣
import random
import sys

import numpy as np

# Kept from the original: make modules in the parent directory importable.
sys.path.append("..")

# file path
# dataset_path = '/data/PycharmProjects/question_matching_framework/work_space/example/dataset/aaa'


def load_cn_data_from_files(classify_files):
    """Read one text file per class and build one-hot labels.

    Every line of ``classify_files[i]`` becomes one sample whose label is a
    one-hot vector of length ``len(classify_files)`` with a 1 at position
    ``i``. Each sentence is cleaned with :func:`clean_str_cn`.

    Args:
        classify_files: sequence of file paths, one file per class.

    Returns:
        A two-element list ``[x_text, y]``. ``x_text`` is a list of cleaned
        sentences. ``y`` is a 2-D integer ``numpy`` array of shape
        ``(num_samples, num_classes)`` when more than one file is given, and a
        plain list of one-hot lists otherwise (the original return types).
    """
    count = len(classify_files)
    x_text = []
    label_rows = []
    for index, classify_file in enumerate(classify_files):
        # The context manager closes the handle; the original leaked it.
        with open(classify_file, "r") as handle:
            lines = handle.readlines()
        label = [0] * count
        label[index] = 1
        x_text.extend(lines)
        label_rows.extend(label for _ in lines)

    if count > 1:
        # Build the array in one step. The original concatenated file by file
        # and raised on an empty file (1-D vs 2-D mismatch); the reshape keeps
        # the result 2-D even when there are no samples at all.
        y = np.array(label_rows, dtype=int).reshape(-1, count)
    else:
        y = label_rows

    x_text = [clean_str_cn(sent) for sent in x_text]
    return [x_text, y]


def clean_str_cn(string):
    """Return ``string`` with surrounding whitespace removed and lower-cased."""
    return string.strip().lower()


def _len_argsort(seq):
    """Return the indices that sort ``seq`` by ``len`` of each item (stable)."""
    return sorted(range(len(seq)), key=lambda i: len(seq[i]))


def _generate_mask(data_set, max_len):
    """Append a length mask to an ``(x, y)`` pair.

    The mask has shape ``[max_len, num_sentences]``; column ``i`` holds ones
    for the first ``min(word_count_i, max_len)`` rows and zeros elsewhere,
    where the word count is the number of space-separated tokens.
    """
    set_x = data_set[0]
    mask_x = np.zeros([max_len, len(set_x)])
    for i, sentence in enumerate(set_x):
        real_len = min(len(sentence.split(' ')), max_len)
        mask_x[:real_len, i] = 1
    return (set_x, data_set[1], mask_x)


def load_data(classify_files, config, sort_by_len=True, enhance=True, reverse=True):
    """Load the corpus and split it into masked train and validation sets.

    Args:
        classify_files: sequence of file paths, one file per class.
        config: object providing ``valid_portion`` (fraction of samples used
            for validation) and ``num_step`` (number of mask rows).
        sort_by_len: if true, sort each split by sentence length. The length
            used is the character length of the sentence string.
        enhance: if true, add one extra sample per original sentence whose
            words are randomly shuffled (same label).
        reverse: if true, reverse the word order of every sentence first.

    Returns:
        ``(train_data, valid_data)``; each is a tuple
        ``(sentences, labels, mask)`` where ``mask`` has shape
        ``[config.num_step, len(sentences)]``.

    Note:
        Randomness comes from both ``random`` and ``numpy.random``; the calls
        are made in the same order as the original implementation.
    """
    x_text, y = load_cn_data_from_files(classify_files)

    if reverse:
        x_text = [' '.join(reversed(text.strip().split(' '))) for text in x_text]

    y = list(y)

    if enhance:
        # Snapshot the originals so the appended copies are not re-augmented.
        originals = list(zip(x_text, y))
        for text, label in originals:
            words = text.split(' ')
            random.shuffle(words)
            x_text.append(' '.join(words))
            y.append(label)

    # Randomly shuffle data
    order = list(range(len(y)))
    random.shuffle(order)
    x_shuffled = [x_text[i] for i in order]
    y_shuffled = np.array([y[i] for i in order])

    # shuffle and generate train and valid data set
    n_samples = len(x_shuffled)
    sidx = np.random.permutation(n_samples)
    n_train = int(np.round(n_samples * (1. - config.valid_portion)))
    print("Train/Test split: {:d}/{:d}".format(n_train, (n_samples - n_train)))

    train_idx = sidx[:n_train]
    valid_idx = sidx[n_train:]
    train_set_x = [x_shuffled[s] for s in train_idx]
    train_set_y = [y_shuffled[s] for s in train_idx]
    valid_set_x = [x_shuffled[s] for s in valid_idx]
    valid_set_y = [y_shuffled[s] for s in valid_idx]

    if sort_by_len:
        sorted_index = _len_argsort(valid_set_x)
        valid_set_x = [valid_set_x[i] for i in sorted_index]
        valid_set_y = [valid_set_y[i] for i in sorted_index]

        sorted_index = _len_argsort(train_set_x)
        train_set_x = [train_set_x[i] for i in sorted_index]
        train_set_y = [train_set_y[i] for i in sorted_index]

    max_len = config.num_step
    train_data = _generate_mask((train_set_x, train_set_y), max_len)
    valid_data = _generate_mask((valid_set_x, valid_set_y), max_len)
    return train_data, valid_data


# return batch data set
def batch_iter(data, batch_size, shuffle=True):
    """Yield mini-batches ``(x, y, mask_x)`` from a data set.

    Args:
        data: tuple ``(x, y, mask_x)`` as produced by :func:`load_data`;
            ``mask_x`` is indexed as ``[step, sample]``.
        batch_size: number of samples per batch.
        shuffle: if true, the samples are visited in a random order (drawn
            with ``random.shuffle``).

    Yields:
        ``(batch_x, batch_y, batch_mask)`` as ``numpy`` arrays, where
        ``batch_mask`` keeps the ``[step, sample]`` layout. Only full batches
        are produced: the trailing ``len(x) % batch_size`` samples are
        dropped.
    """
    x, y, mask_x = data
    data_size = len(x)

    if shuffle:
        order = list(range(data_size))
        random.shuffle(order)
        # Reorder the samples; mask rows are regrouped per sample so the
        # sentence/label/mask pairing is preserved.
        mask_rows = np.asarray(mask_x).T.tolist()
        ordered_x = [x[i] for i in order]
        ordered_y = [y[i] for i in order]
        ordered_mask = np.asarray([mask_rows[i] for i in order]).T
    else:
        ordered_x = x
        ordered_y = y
        ordered_mask = np.asarray(mask_x)

    ordered_x = np.array(ordered_x)
    ordered_y = np.array(ordered_y)
    ordered_mask = np.array(ordered_mask)

    # Floor division: a final partial batch is intentionally not emitted.
    num_batches_per_epoch = data_size // batch_size
    for batch_index in range(num_batches_per_epoch):
        start_index = batch_index * batch_size
        end_index = start_index + batch_size
        yield (ordered_x[start_index:end_index],
               ordered_y[start_index:end_index],
               ordered_mask[:, start_index:end_index])
Step2 對輸入到模型中的句子進行Word Embedding,將每個詞表示成一個數值型的詞向量。這個過程中對於不同長度的問題文字,pad和截斷成一樣長度的。太短的就補空格,太長的就截斷。從而構建維數一致的模型句向量輸入。(這裡呼叫了別人訓練好的詞向量模型word2vec.bin)
# Turn the batch of sentences `x` into embedded inputs for the model.
# The call passes: the number of sentences, config.num_step, config.embed_dim,
# the sentence list itself, and a literal 0.
# NOTE(review): `wv` is defined outside this snippet; presumably it wraps the
# pretrained word2vec model and pads/truncates each sentence to config.num_step
# tokens, with the trailing 0 being a padding value or mode flag — confirm
# against the `wv` module.
x_embedded = wv.embedding_lookup(len(list(x)), config.num_step, config.embed_dim, list(x), 0)
第二部分:基於RNN的分類器模型
每個詞經過embedding之後進入LSTM層(這裡用的是標準的LSTM),沿著時間序列依次得到n個時間步的LSTM隱藏層輸出向量;這些向量經過mean pooling層(依據mask只對句子實際長度內的輸出取平均)之後,可以得到一個向量h,緊接著是一個Softmax層,得到一個類別機率分佈向量,取機率值最大的類別作為最終預測結果。
import inspect
import tensorflow as tf
class RNN_Model(object):
    """LSTM text classifier: stacked LSTM -> masked mean pooling -> softmax.

    Placeholders (fed by the caller):
        embedded_x: float32 ``[batch, num_step, embed_dim]`` embedded sentences.
        target:     int64 ``[batch, num_classes]`` labels (arg-maxed for accuracy).
        mask_x:     float32 ``[num_step, batch]``; multiplies the per-step
                    outputs so masked steps do not contribute to the mean.

    Outputs: ``scores`` (logits), ``probability``, ``prediction``, ``cost``,
    ``accuracy`` and ``correct_num``. When ``is_training`` is true the model
    also exposes ``train_op``, ``summary``, ``lr`` and ``global_step``.

    Note:
        The initial LSTM state is created for ``config.batch_size``, so the
        batches fed at run time are expected to contain exactly that many
        samples.
    """

    def __init__(self, config, num_classes, is_training=True):
        """Build the graph.

        Args:
            config: object providing ``keep_prob``, ``batch_size``,
                ``num_step``, ``embed_dim``, ``hidden_neural_size``,
                ``hidden_layer_num`` and (for training) ``max_grad_norm``.
            num_classes: number of output classes.
            is_training: when false, no dropout is applied and no
                optimizer / gradient summaries are created.
        """
        keep_prob = config.keep_prob
        batch_size = config.batch_size
        num_step = config.num_step
        embed_dim = config.embed_dim
        hidden_neural_size = config.hidden_neural_size
        hidden_layer_num = config.hidden_layer_num

        # Dropout is a training-time regulariser only.
        use_dropout = is_training and keep_prob < 1

        self.embedded_x = tf.placeholder(tf.float32, [None, num_step, embed_dim], name="embedded_chars")
        self.target = tf.placeholder(tf.int64, [None, num_classes], name='target')
        self.mask_x = tf.placeholder(tf.float32, [num_step, None], name="mask_x")

        # build LSTM network
        def lstm_cell():
            # Pass `reuse` only when this TensorFlow version's BasicLSTMCell
            # accepts it.
            if 'reuse' in inspect.signature(tf.contrib.rnn.BasicLSTMCell.__init__).parameters:
                return tf.contrib.rnn.BasicLSTMCell(
                    hidden_neural_size, forget_bias=0.0, state_is_tuple=True,
                    reuse=tf.get_variable_scope().reuse)
            return tf.contrib.rnn.BasicLSTMCell(
                hidden_neural_size, forget_bias=0.0, state_is_tuple=True)

        attn_cell = lstm_cell
        if use_dropout:
            def attn_cell():
                return tf.contrib.rnn.DropoutWrapper(
                    lstm_cell(), output_keep_prob=config.keep_prob)

        cell = tf.contrib.rnn.MultiRNNCell(
            [attn_cell() for _ in range(hidden_layer_num)], state_is_tuple=True)

        self._initial_state = cell.zero_state(batch_size, dtype=tf.float32)

        inputs = self.embedded_x
        # Fix: the original applied input dropout whenever keep_prob < 1, even
        # with is_training=False, so evaluation inputs were randomly dropped.
        # It is now gated exactly like the cell dropout above.
        if use_dropout:
            inputs = tf.nn.dropout(inputs, keep_prob)

        step_outputs = []
        state = self._initial_state
        with tf.variable_scope("LSTM_layer"):
            for time_step in range(num_step):
                # Share the LSTM weights across all time steps.
                if time_step > 0:
                    tf.get_variable_scope().reuse_variables()
                (cell_output, state) = cell(inputs[:, time_step, :], state)
                step_outputs.append(cell_output)

        # [num_step, batch, hidden]; the original relied on the list being
        # packed implicitly by the multiplication.
        out_put = tf.stack(step_outputs) * self.mask_x[:, :, None]

        with tf.name_scope("mean_pooling_layer"):
            # Sum over time, divided by the per-sample mask sum.
            out_put = tf.reduce_sum(out_put, 0) / (tf.reduce_sum(self.mask_x, 0)[:, None])

        with tf.name_scope("Softmax_layer_and_output"):
            softmax_w = tf.get_variable("softmax_w", [hidden_neural_size, num_classes], dtype=tf.float32)
            softmax_b = tf.get_variable("softmax_b", [num_classes], dtype=tf.float32)
            self.scores = tf.nn.xw_plus_b(out_put, softmax_w, softmax_b, name="scores")

        with tf.name_scope("loss"):
            # The original added 1e-10 to the logits; softmax is invariant to a
            # constant shift, so the offset had no effect and was dropped.
            self.loss = tf.nn.softmax_cross_entropy_with_logits(labels=self.target, logits=self.scores)
            self.cost = tf.reduce_mean(self.loss)

        with tf.name_scope("accuracy"):
            self.prediction = tf.argmax(self.scores, 1, name="prediction")
            correct_prediction = tf.equal(self.prediction, tf.argmax(self.target, 1))
            self.correct_num = tf.reduce_sum(tf.cast(correct_prediction, tf.float32))
            self.accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy")
            self.probability = tf.nn.softmax(self.scores, name="probability")

        # add summary
        loss_summary = tf.summary.scalar("loss", self.cost)
        accuracy_summary = tf.summary.scalar("accuracy_summary", self.accuracy)

        if not is_training:
            return

        # NOTE(review): global_step is created but not passed to the optimizer
        # below, so nothing in this class increments it — confirm whether the
        # training script relies on it.
        self.global_step = tf.Variable(0, name="global_step", trainable=False)
        # The learning rate starts at 0.0; set it with assign_new_lr().
        self.lr = tf.Variable(0.0, trainable=False)

        tvars = tf.trainable_variables()
        grads, _ = tf.clip_by_global_norm(tf.gradients(self.cost, tvars), config.max_grad_norm)

        # Keep track of gradient values and sparsity (optional)
        grad_summaries = []
        for g, v in zip(grads, tvars):
            if g is not None:
                grad_hist_summary = tf.summary.histogram("{}/grad/hist".format(v.name), g)
                sparsity_summary = tf.summary.scalar("{}/grad/sparsity".format(v.name), tf.nn.zero_fraction(g))
                grad_summaries.append(grad_hist_summary)
                grad_summaries.append(sparsity_summary)
        self.grad_summaries_merged = tf.summary.merge(grad_summaries)

        self.summary = tf.summary.merge([loss_summary, accuracy_summary, self.grad_summaries_merged])

        optimizer = tf.train.GradientDescentOptimizer(self.lr)
        # Build the update op once; the original called apply_gradients twice
        # and discarded the first result, leaving a duplicate op in the graph.
        self.train_op = optimizer.apply_gradients(list(zip(grads, tvars)))

        self.new_lr = tf.placeholder(tf.float32, shape=[], name="new_learning_rate")
        self._lr_update = tf.assign(self.lr, self.new_lr)

    def assign_new_lr(self, session, lr_value):
        """Set the learning-rate variable to ``lr_value`` by running the assign op in ``session``."""
        session.run(self._lr_update, feed_dict={self.new_lr: lr_value})
舉例(QA中的問題意圖分類):
輸入:你好 呀
意圖類別:greeting
具體程式碼參見程式碼