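TextCNN Code Walkthrough (with Test Dataset and GitHub Link)
Foreword: this is the third post in the TextCNN series and shares experience with optimizing TextCNN.
The previous two posts can be found here:
Text Classification with TextCNN: Principles Explained (Part 1)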
I. Overall TextCNN framework
1. Model architecture
Figure 1: TextCNN model structure
2. Code layout
Figure 2: Code layout
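- text_cnn.py defines the TextCNN network structure.
- model.py defines the training code.
- data.py defines the data preprocessing steps.
- data_set holds the test dataset: polarity.neg contains negative-sentiment text and polarity.pos contains positive-sentiment text.
- train-eval.sh is the launch script.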
3. Code location
Project link
Parts of the code draw on this code
4. Training results:
Figure 3: Training results
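II. TextCNN model code
2.1 Word embedding
Figure 4: Word embedding example
In brief:
vocab_size: vocabulary size, 18758.
embedding_dim: dimension of each word vector, 128.
seq_length: sentence length; the maximum is set to 56.
embedding_lookup: a table lookup. Each word's id is used to fetch the corresponding row of the initialized matrix W, which yields a tensor of shape [batch_size, seq_length, embedding_dim], i.e. [?, 56, 128]. The ? stands for the batch dimension, whose size is not known in advance.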
# embedding layer
with tf.name_scope("embedding"):
    self.W = tf.Variable(
        tf.random_uniform([self._config.vocab_size, self._config.embedding_dim], -1.0, 1.0),
        name="W")
    self.char_emb = tf.nn.embedding_lookup(self.W, self.input_x)
    self.char_emb_expanded = tf.expand_dims(self.char_emb, -1)
    tf.logging.info("Shape of embedding_chars:{}".format(str(self.char_emb_expanded.shape)))
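Example: suppose the vocabulary has three words, “今天” (today), “天氣” (weather) and “很好” (very good), and W = [[0,0,0,1],[0,0,1,0],[0,1,0,0]].
Take two sentences. The first, “今天天氣”, becomes [0, 1] after preprocessing. embedding_lookup uses id 0 to fetch the first row of W, [0,0,0,1], and id 1 to fetch the second row, [0,0,1,0], so char_emb is [[0,0,0,1],[0,0,1,0]].
Likewise, “天氣很好” becomes [1, 2] after preprocessing, and embedding_lookup gives char_emb = [[0,0,1,0],[0,1,0,0]].
Because conv2d expects a four-dimensional tensor, char_emb is expanded by one dimension, from [?, 56, 128] to [?, 56, 128, 1].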
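The lookup and the extra channel dimension can be reproduced without TensorFlow. The following is a minimal pure-Python sketch, not the project's code: the helper names embedding_lookup and expand_last_dim are hypothetical stand-ins for tf.nn.embedding_lookup and tf.expand_dims(..., -1), and W is the toy matrix from the example.

```python
# Toy embedding matrix from the example: one row per vocabulary id.
W = [
    [0, 0, 0, 1],  # id 0: "今天"
    [0, 0, 1, 0],  # id 1: "天氣"
    [0, 1, 0, 0],  # id 2: "很好"
]


def embedding_lookup(table, ids):
    """Hypothetical stand-in for tf.nn.embedding_lookup: pick one row per id."""
    return [table[i] for i in ids]


def expand_last_dim(matrix):
    """Hypothetical stand-in for tf.expand_dims(x, -1): wrap each scalar in a list."""
    return [[[value] for value in row] for row in matrix]


sentence_1 = embedding_lookup(W, [0, 1])  # "今天天氣"
sentence_2 = embedding_lookup(W, [1, 2])  # "天氣很好"
print(sentence_1)  # [[0, 0, 0, 1], [0, 0, 1, 0]]
print(sentence_2)  # [[0, 0, 1, 0], [0, 1, 0, 0]]

expanded = expand_last_dim(sentence_1)
# Shape goes from [seq_length, embedding_dim] to [seq_length, embedding_dim, 1].
print(len(expanded), len(expanded[0]), len(expanded[0][0]))  # 2 4 1
```

In the real model the same two steps run on a whole batch at once, which is where the leading ? dimension comes from.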
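2.2 Convolution + max-pooling
Figure 5: Convolution example
In brief:
filter_size = 3, 4, 5. Every filter is as wide as the word vector, so it can only slide along one dimension (down the sequence).
For each filter size, the convolution outputs a tensor of shape [batch_size, seq_length - filter_size + 1, 1, num_filter].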
# convolution + pooling layer
pooled_outputs = []
for i, filter_size in enumerate(self._config.filter_sizes):
    with tf.variable_scope("conv-maxpool-%s" % filter_size):
        # convolution layer
        filter_width = self._config.embedding_dim
        input_channel_num = 1
        output_channel_num = self._config.num_filters
        filter_shape = [filter_size, filter_width, input_channel_num, output_channel_num]

        n = filter_size * filter_width * input_channel_num
        kernal = tf.get_variable(name="kernal",
                                 shape=filter_shape,
                                 dtype=tf.float32,
                                 initializer=tf.random_normal_initializer(stddev=np.sqrt(2.0 / n)))
        bias = tf.get_variable(name="bias",
                               shape=[output_channel_num],
                               dtype=tf.float32,
                               initializer=tf.zeros_initializer)
        # apply convolution process
        # conv shape: [batch_size, max_seq_len - filter_size + 1, 1, output_channel_num]
        conv = tf.nn.conv2d(
            input=self.char_emb_expanded,
            filter=kernal,
            strides=[1, 1, 1, 1],
            padding="VALID",
            name="cov")
        tf.logging.info("Shape of Conv:{}".format(str(conv.shape)))

        # apply non-linearity
        h = tf.nn.relu(tf.nn.bias_add(conv, bias), name="relu")
        tf.logging.info("Shape of h:{}".format(str(h.shape)))

        # max-pooling over the outputs
        pooled = tf.nn.max_pool(
            value=h,
            ksize=[1, self._config.max_seq_length - filter_size + 1, 1, 1],
            strides=[1, 1, 1, 1],
            padding="VALID",
            name="pool"
        )
        tf.logging.info("Shape of pooled:{}".format(str(pooled.shape)))
        pooled_outputs.append(pooled)
        tf.logging.info("Shape of pooled_outputs:{}".format(str(np.array(pooled_outputs).shape)))

# concatenate all filter's output
total_filter_num = self._config.num_filters * len(self._config.filter_sizes)
all_features = tf.reshape(tf.concat(pooled_outputs, axis=-1), [-1, total_filter_num])
tf.logging.info("Shape of all_features:{}".format(str(all_features.shape)))
With three filter sizes we get three tensors:
Tensor 1, after filter_size = 3: [?, 56-3+1, 1, 128] -> [?, 54, 1, 128]
Tensor 2, after filter_size = 4: [?, 56-4+1, 1, 128] -> [?, 53, 1, 128]
Tensor 3, after filter_size = 5: [?, 56-5+1, 1, 128] -> [?, 52, 1, 128]
Max-pooling with ksize = [1, seq_length - filter_size + 1, 1, 1] then reduces each of them to a tensor of shape [?, 1, 1, num_filter]:
Tensor 1: [?, 54, 1, 128] -> [?, 1, 1, 128]
Tensor 2: [?, 53, 1, 128] -> [?, 1, 1, 128]
Tensor 3: [?, 52, 1, 128] -> [?, 1, 1, 128]
The three results are concatenated into a tensor of shape [?, 1, 1, num_filter*3], which is then reshaped to [-1, num_filter*3] so that it can feed the fully connected layer below.
[?, 1, 1, 128], [?, 1, 1, 128], [?, 1, 1, 128] -> [?, 384]
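The shape arithmetic above can be checked with a small pure-Python sketch. It is an illustration under simplifying assumptions, not the model code: the helper conv_valid is hypothetical, only one all-ones filter is applied per filter size (the model uses num_filters of them), and the bias and ReLU are left out.

```python
def conv_valid(sentence, kernel):
    """Slide a kernel that spans the full embedding width down the sentence.

    sentence: seq_length rows of embedding_dim numbers.
    kernel:   filter_size rows of embedding_dim numbers.
    Returns seq_length - filter_size + 1 values (VALID padding, stride 1).
    """
    filter_size = len(kernel)
    feature_map = []
    for start in range(len(sentence) - filter_size + 1):
        window = sentence[start:start + filter_size]
        total = sum(x * k
                    for row_x, row_k in zip(window, kernel)
                    for x, k in zip(row_x, row_k))
        feature_map.append(total)
    return feature_map


seq_length, embedding_dim, num_filters = 56, 128, 128
filter_sizes = (3, 4, 5)

# Dummy sentence: every component of word t equals t, so later windows sum to more.
sentence = [[t] * embedding_dim for t in range(seq_length)]

conv_lengths = {}
pooled_values = {}
for filter_size in filter_sizes:
    kernel = [[1] * embedding_dim for _ in range(filter_size)]  # one all-ones filter
    feature_map = conv_valid(sentence, kernel)
    conv_lengths[filter_size] = len(feature_map)
    pooled_values[filter_size] = max(feature_map)  # max over time: one value per filter

total_filter_num = num_filters * len(filter_sizes)
print(conv_lengths)      # {3: 54, 4: 53, 5: 52}
print(total_filter_num)  # 384
```

Each filter contributes exactly one number after max-pooling, regardless of its height, which is why the three branches can be concatenated into a single vector of length 384.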
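2.3 k-class classification with softmax
Figure 6: Softmax illustration
In brief:
label_size is the number of text classes; here the task is binary classification. The layer produces the output scores, as well as predictions, the index of the predicted class in the label dictionary. The loss is the cross-entropy.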
with tf.name_scope("output"):
    W = tf.get_variable(
        name="W",
        shape=[total_filter_num, self._config.label_size],
        initializer=tf.contrib.layers.xavier_initializer())
    b = tf.Variable(tf.constant(0.1, shape=[self._config.label_size]), name="b")
    # l2_loss is accumulated here, so it must already be defined earlier in the model
    l2_loss += tf.nn.l2_loss(W)
    l2_loss += tf.nn.l2_loss(b)
    self.scores = tf.nn.xw_plus_b(all_features, W, b, name="scores")
    self.predictions = tf.argmax(self.scores, 1, name="predictions")

# compute loss
with tf.name_scope("loss"):
    losses = tf.nn.softmax_cross_entropy_with_logits(logits=self.scores, labels=self.input_y)
    self.loss = tf.reduce_mean(losses) + self._config.l2_reg_lambda * l2_loss
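To see what the loss above computes, here is a minimal pure-Python sketch of softmax cross-entropy with an L2 penalty. The scores, labels, weights and the l2_reg_lambda value are all made up for illustration; only the formulas mirror the TensorFlow calls.

```python
import math


def softmax(logits):
    """Numerically stable softmax over one list of logits."""
    shift = max(logits)
    exps = [math.exp(z - shift) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]


def softmax_cross_entropy(logits, one_hot_label):
    """Cross-entropy between a one-hot label and softmax(logits)."""
    probs = softmax(logits)
    return -sum(y * math.log(p) for y, p in zip(one_hot_label, probs))


def l2_loss(values):
    """Same definition as tf.nn.l2_loss: sum(v ** 2) / 2."""
    return sum(v * v for v in values) / 2.0


# Two hypothetical examples: scores for [negative, positive] and one-hot labels.
scores = [[2.0, 0.0], [0.5, 1.5]]
labels = [[1, 0], [0, 1]]

predictions = [row.index(max(row)) for row in scores]  # argmax per example
losses = [softmax_cross_entropy(s, y) for s, y in zip(scores, labels)]

l2_reg_lambda = 0.01   # hypothetical regularization strength
weights = [0.3, -0.4]  # hypothetical flattened W and b
loss = sum(losses) / len(losses) + l2_reg_lambda * l2_loss(weights)

print(predictions)  # [0, 1]
print(loss)
```

The label order follows the preprocessing code, where [1, 0] marks a negative sentence and [0, 1] a positive one.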
III. TextCNN training module
In brief: the data is loaded through the preprocessing module, Adam is the optimizer, and training runs in batches of 64.
import datetime
import os

import tensorflow as tf

import data  # data.py from this project
from text_cnn import TextCNNModel  # assumed import path, based on the file layout

# FLAGS is expected to be the module-level flags object defined in model.py.


def train(x_train, y_train, vocab_processor, x_dev, y_dev, model_config):
    with tf.Graph().as_default():
        sess = tf.Session()
        with sess.as_default():
            cnn = TextCNNModel(
                config=model_config,
                is_training=FLAGS.is_train
            )
            # Define training procedure
            global_step = tf.Variable(0, name="global_step", trainable=False)
            optimizer = tf.train.AdamOptimizer(1e-3)
            grads_and_vars = optimizer.compute_gradients(cnn.loss)
            train_op = optimizer.apply_gradients(grads_and_vars, global_step=global_step)

            # Checkpoint directory. TensorFlow assumes this directory already exists, so we need to create it
            checkpoint_dir = os.path.abspath(os.path.join(FLAGS.output_dir, "checkpoints"))
            checkpoint_prefix = os.path.join(checkpoint_dir, "model")
            if not os.path.exists(checkpoint_dir):
                os.makedirs(checkpoint_dir)
            saver = tf.train.Saver(tf.global_variables(), max_to_keep=FLAGS.keep_checkpoint_max)

            # Write vocabulary
            vocab_processor.save(os.path.join(FLAGS.output_dir, "vocab"))

            # Initialize all variables
            sess.run(tf.global_variables_initializer())

            def train_step(x_batch, y_batch):
                """A single training step."""
                feed_dict = {
                    cnn.input_x: x_batch,
                    cnn.input_y: y_batch
                }
                _, step, loss, accuracy = sess.run(
                    [train_op, global_step, cnn.loss, cnn.accuracy],
                    feed_dict)
                time_str = datetime.datetime.now().isoformat()
                tf.logging.info("{}: step {}, loss {:g}, acc {:g}".format(time_str, step, loss, accuracy))

            def dev_step(x_batch, y_batch, writer=None):
                """Evaluates the model on a dev set."""
                feed_dict = {
                    cnn.input_x: x_batch,
                    cnn.input_y: y_batch
                }
                step, loss, accuracy = sess.run(
                    [global_step, cnn.loss, cnn.accuracy],
                    feed_dict)
                time_str = datetime.datetime.now().isoformat()
                tf.logging.info("{}: step {}, loss {:g}, acc {:g}".format(time_str, step, loss, accuracy))

            # Generate batches
            batches = data.DataSet.batch_iter(list(zip(x_train, y_train)), FLAGS.batch_size, FLAGS.num_epochs)

            # Training loop: for each batch, train, then periodically evaluate and save
            for batch in batches:
                x_batch, y_batch = zip(*batch)
                train_step(x_batch, y_batch)
                current_step = tf.train.global_step(sess, global_step)
                if current_step % FLAGS.save_checkpoints_steps == 0:
                    tf.logging.info("\nEvaluation:")
                    dev_step(x_dev, y_dev)
                    path = saver.save(sess, checkpoint_prefix, global_step=current_step)
                    tf.logging.info("Saved model checkpoint to {}\n".format(path))
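The loop evaluates and saves whenever the global step is a multiple of save_checkpoints_steps. The sketch below works out when that happens; the helper names and the dataset size of 1000 sentences are hypothetical, and the batches-per-epoch formula is the one used by batch_iter in data.py.

```python
def batches_per_epoch(num_examples, batch_size):
    """Same formula as batch_iter: the last batch may be smaller than batch_size."""
    return (num_examples - 1) // batch_size + 1


def checkpoint_steps(num_examples, batch_size, num_epochs, save_checkpoints_steps):
    """Global steps at which the loop evaluates on the dev set and saves a checkpoint."""
    total_steps = batches_per_epoch(num_examples, batch_size) * num_epochs
    return [step for step in range(1, total_steps + 1)
            if step % save_checkpoints_steps == 0]


# Hypothetical run: 1000 training sentences and 20 epochs, with batch_size=64
# and save_checkpoints_steps=100 as in the launch script.
steps = checkpoint_steps(num_examples=1000, batch_size=64,
                         num_epochs=20, save_checkpoints_steps=100)
print(batches_per_epoch(1000, 64))  # 16
print(steps)                        # [100, 200, 300]
```

Because keep_checkpoint_max bounds how many checkpoints the Saver retains, a long run keeps only the most recent ones from this list.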
IV. TextCNN data preprocessing
In brief: processes the input data.
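To make the cleaning step concrete, the sketch below condenses the substitution rules of clean_str into a table and runs them on two made-up sentences. It is a restatement for illustration: the RULES list and the sample sentences are not part of the project, while the patterns and their order are taken from the class that follows.

```python
import re

# (pattern, replacement) pairs in the order clean_str applies them.
RULES = [
    (r"[^A-Za-z0-9(),!?\'\`]", " "),  # drop everything outside the allowed set
    (r"\'s", " 's"),
    (r"\'ve", " 've"),
    (r"n\'t", " n't"),
    (r"\'re", " 're"),
    (r"\'d", " 'd"),
    (r"\'ll", " 'll"),
    (r",", " , "),
    (r"!", " ! "),
    (r"\(", r" \( "),
    (r"\)", r" \) "),
    (r"\?", r" \? "),
    (r"\s{2,}", " "),                 # collapse runs of whitespace
]


def clean_str(text):
    """Apply the rules in order, then trim and lowercase."""
    for pattern, replacement in RULES:
        text = re.sub(pattern, replacement, text)
    return text.strip().lower()


print(clean_str("I can't believe it's so good!"))
# i ca n't believe it 's so good !
print(clean_str("The movie's plot, sadly, wasn't great"))
# the movie 's plot , sadly , was n't great
```

Splitting contractions and punctuation into separate tokens keeps the vocabulary smaller, since "wasn't" and "was" then share a token.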
import re

import numpy as np


class DataSet(object):
    def __init__(self, positive_data_file, negative_data_file):
        self.x_text, self.y = self.load_data_and_labels(positive_data_file, negative_data_file)

    def load_data_and_labels(self, positive_data_file, negative_data_file):
        # load data from files
        with open(positive_data_file, "r", encoding="utf-8") as f:
            positive_data = [s.strip() for s in f]
        with open(negative_data_file, "r", encoding="utf-8") as f:
            negative_data = [s.strip() for s in f]
        # clean and tokenize each sentence
        x_text = positive_data + negative_data
        x_text = [self.clean_str(sent) for sent in x_text]
        # generate one-hot labels: [0, 1] = positive, [1, 0] = negative
        positive_labels = [[0, 1] for _ in positive_data]
        negative_labels = [[1, 0] for _ in negative_data]
        y = np.concatenate([positive_labels, negative_labels], 0)
        return [x_text, y]

    def clean_str(self, string):
        """
        Tokenization/string cleaning for all datasets except for SST.
        Original taken from https://github.com/yoonkim/CNN_sentence/blob/master/process_data.py
        """
        string = re.sub(r"[^A-Za-z0-9(),!?\'\`]", " ", string)
        string = re.sub(r"\'s", " 's", string)
        string = re.sub(r"\'ve", " 've", string)
        string = re.sub(r"n\'t", " n't", string)
        string = re.sub(r"\'re", " 're", string)
        string = re.sub(r"\'d", " 'd", string)
        string = re.sub(r"\'ll", " 'll", string)
        string = re.sub(r",", " , ", string)
        string = re.sub(r"!", " ! ", string)
        # raw replacement strings keep the original behavior without invalid-escape warnings
        string = re.sub(r"\(", r" \( ", string)
        string = re.sub(r"\)", r" \) ", string)
        string = re.sub(r"\?", r" \? ", string)
        string = re.sub(r"\s{2,}", " ", string)
        return string.strip().lower()

    @staticmethod
    def batch_iter(data, batch_size, num_epochs, shuffle=True):
        """
        Generates a batch iterator for a dataset.
        """
        data = np.array(data)
        data_size = len(data)
        num_batches_per_epoch = int((len(data) - 1) / batch_size) + 1
        for epoch in range(num_epochs):
            # Shuffle the data at each epoch
            if shuffle:
                shuffle_indices = np.random.permutation(np.arange(data_size))
                shuffled_data = data[shuffle_indices]
            else:
                shuffled_data = data
            for batch_num in range(num_batches_per_epoch):
                start_index = batch_num * batch_size
                end_index = min((batch_num + 1) * batch_size, data_size)
                yield shuffled_data[start_index:end_index]
V. Training the model
In brief: edit CODE_DIR, then run train-eval.sh to start training.
#!/bin/bash
export CUDA_VISIBLE_DEVICES=0
# Change CODE_DIR to your own directory before running
CODE_DIR="/home/work/work/modifyAI/textCNN"
MODEL_DIR=$CODE_DIR/model
TRAIN_DATA_DIR=$CODE_DIR/data_set
nohup python3 $CODE_DIR/model.py \
    --is_train=true \
    --num_epochs=200 \
    --save_checkpoints_steps=100 \
    --keep_checkpoint_max=50 \
    --batch_size=64 \
    --positive_data_file=$TRAIN_DATA_DIR/polarity.pos \
    --negative_data_file=$TRAIN_DATA_DIR/polarity.neg \
    --model_dir=$MODEL_DIR > $CODE_DIR/train_log.txt 2>&1 &
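VI. Summary
- Introduced the basic TextCNN architecture, the code layout, the project link and the training results.
- Explained in detail how TextCNN is implemented in TensorFlow.
- Covered the TextCNN training code and the data preprocessing module.
- Explained how to run the project.
- The next post will cover how to tune the TextCNN model.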