Text Classification with CNN
阿新 • Published: 2019-02-19
1. For NLP text classification, fully connected neural networks and RNNs can both be used; CNNs have also developed rapidly for this task.
2. This example uses TensorFlow to build a CNN text classifier. As with scikit-learn, using it takes only three steps: instantiate the model, train it, and predict (a short usage sketch follows the class code below).
The corresponding code is as follows (filename: TextCNNClassifier.py):
# coding: utf-8
import tensorflow as tf
import numpy as np
import os


class NN_config(object):
    def __init__(self, vocab_size, num_filters, filter_steps, num_seqs=1000,
                 num_classes=2, embedding_size=200):
        self.vocab_size = vocab_size
        self.num_filters = num_filters
        self.filter_steps = filter_steps
        self.num_seqs = num_seqs
        self.num_classes = num_classes
        self.embedding_size = embedding_size


class CALC_config(object):
    def __init__(self, learning_rate=0.0075, batch_size=64, num_epoches=20, l2_ratio=0.0):
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.num_epoches = num_epoches
        self.l2_ratio = l2_ratio


class TextCNNClassifier(object):
    '''
    A text classifier built on a convolutional network,
    wrapped in a keras/scikit-learn style interface.
    '''
    def __init__(self, config_nn, config_calc):
        self.num_seqs = config_nn.num_seqs
        self.num_classes = config_nn.num_classes
        self.embedding_size = config_nn.embedding_size
        self.vocab_size = config_nn.vocab_size
        self.num_filters = config_nn.num_filters
        self.filter_steps = config_nn.filter_steps

        self.learning_rate = config_calc.learning_rate
        self.batch_size = config_calc.batch_size
        self.num_epoches = config_calc.num_epoches
        self.l2_ratio = config_calc.l2_ratio

        #tf.reset_default_graph()
        self.build_placeholder()
        self.build_embedding_layer()
        self.build_nn()
        self.build_cost()
        self.build_optimizer()
        self.saver = tf.train.Saver()

    def build_placeholder(self):
        with tf.name_scope('inputs_to_data'):
            self.inputs = tf.placeholder(tf.int32, shape=[None, self.num_seqs], name='inputs')
            self.targets = tf.placeholder(tf.float32, shape=[None, self.num_classes], name='targets')
            self.keep_prob = tf.placeholder(tf.float32, name='nn_keep_prob')
            print('self.inputs.shape:', self.inputs.shape)

    def build_embedding_layer(self):
        with tf.device('/cpu:0'), tf.name_scope('embeddings'):
            embeddings = tf.Variable(
                tf.truncated_normal(shape=[self.vocab_size, self.embedding_size], stddev=0.1),
                name='embeddings')
            x = tf.nn.embedding_lookup(embeddings, self.inputs)
            # add a channel dimension so conv2d sees batch * num_seqs * embedding_size * 1
            x = tf.expand_dims(x, axis=-1)
            self.x = tf.cast(x, tf.float32)
            print('x shape is:', self.x.get_shape())

    def build_nn(self):
        conv_out = []
        for i, filter_step in enumerate(self.filter_steps):
            with tf.name_scope("conv-network-%s" % filter_step):
                filter_shape = [filter_step, self.embedding_size, 1, self.num_filters]
                filters = tf.Variable(tf.truncated_normal(shape=filter_shape, stddev=0.1),
                                      name='filters')
                bias = tf.Variable(tf.constant(0.0, shape=[self.num_filters]), name='bias')
                # h_conv shape: batch_size * (num_seqs - filter_step + 1) * 1 * num_filters
                h_conv = tf.nn.conv2d(self.x, filter=filters, strides=[1, 1, 1, 1],
                                      padding='VALID', name='hidden_conv')
                h_relu = tf.nn.relu(tf.nn.bias_add(h_conv, bias), name='relu')
                ksize = [1, self.num_seqs - filter_step + 1, 1, 1]
                # h_pooling shape: batch_size * 1 * 1 * num_filters (max over time)
                h_pooling = tf.nn.max_pool(h_relu, ksize=ksize, strides=[1, 1, 1, 1],
                                           padding='VALID', name='pooling')
                conv_out.append(h_pooling)

        # concatenate the pooled features of all filter sizes and flatten
        self.tot_filters_units = self.num_filters * len(self.filter_steps)
        self.h_pool = tf.concat(conv_out, axis=3)
        self.h_pool_flatten = tf.reshape(self.h_pool, shape=[-1, self.tot_filters_units])

        with tf.name_scope('dropout'):
            self.h_pool_drop = tf.nn.dropout(self.h_pool_flatten, self.keep_prob)

    def build_cost(self):
        with tf.name_scope('cost'):
            W = tf.get_variable(shape=[self.tot_filters_units, self.num_classes], name='W',
                                initializer=tf.contrib.layers.xavier_initializer())
            bias = tf.Variable(tf.constant(0.1, shape=[self.num_classes]), name='bias')
            self.scores = tf.nn.xw_plus_b(self.h_pool_drop, W, bias, name='scores')
            self.predictions = tf.argmax(self.scores, axis=1, name='predictions')
            # L2 regularisation on the output layer only
            l2_loss = tf.constant(0.0, name='l2_loss')
            l2_loss += tf.nn.l2_loss(W)
            l2_loss += tf.nn.l2_loss(bias)
            losses = tf.nn.softmax_cross_entropy_with_logits(logits=self.scores, labels=self.targets)
            self.loss = tf.reduce_mean(losses) + self.l2_ratio * l2_loss

        with tf.name_scope('accuracy'):
            pred = tf.equal(self.predictions, tf.argmax(self.targets, axis=1))
            self.accuracy = tf.reduce_mean(tf.cast(pred, tf.float32))

    def build_optimizer(self):
        with tf.name_scope('optimizer'):
            optimizer = tf.train.AdamOptimizer(self.learning_rate)
            grad_and_vars = optimizer.compute_gradients(self.loss)
            self.train_op = optimizer.apply_gradients(grad_and_vars)

    def random_batches(self, data, shuffle=True):
        data = np.array(data)
        data_size = len(data)
        num_batches_per_epoch = int((data_size - 1) / self.batch_size) + 1
        if shuffle:
            shuffle_index = np.random.permutation(np.arange(data_size))
            shuffled_data = data[shuffle_index]
        else:
            shuffled_data = data
        for epoch in range(self.num_epoches):
            for batch_num in range(num_batches_per_epoch):
                start = batch_num * self.batch_size
                end = min(start + self.batch_size, data_size)
                yield shuffled_data[start:end]

    def fit(self, data):
        self.session = tf.Session()
        with self.session as sess:
            sess.run(tf.global_variables_initializer())
            batches = self.random_batches(list(data))
            accuracy_list = []
            loss_list = []
            iterations = 0
            # directory for checkpoints
            save_path = os.path.abspath(os.path.join(os.path.curdir, 'models'))
            if not os.path.exists(save_path):
                os.makedirs(save_path)
            for batch in batches:
                iterations += 1
                x_batch, y_batch = zip(*batch)
                x_batch = np.array(x_batch)
                y_batch = np.array(y_batch)
                feed = {self.inputs: x_batch,
                        self.targets: y_batch,
                        self.keep_prob: 0.5}
                batch_pred, batch_accuracy, batch_cost, _ = sess.run(
                    [self.predictions, self.accuracy, self.loss, self.train_op], feed_dict=feed)
                accuracy_list.append(batch_accuracy)
                loss_list.append(batch_cost)
                if iterations % 10 == 0:
                    print('The training step is {0}'.format(iterations),
                          'training_loss: {:.3f}'.format(loss_list[-1]),
                          'training_accuracy: {:.3f}'.format(accuracy_list[-1]))
                if iterations % 100 == 0:
                    self.saver.save(sess, os.path.join(save_path, 'model'), global_step=iterations)
            self.saver.save(sess, os.path.join(save_path, 'model'), global_step=iterations)

    def load_model(self, start_path=None):
        if start_path is None:
            start_path = os.path.abspath(os.path.join(os.path.curdir, 'models'))
            print('default start_path is', start_path)
            ckpt = tf.train.get_checkpoint_state(start_path)
            print('This is our check of ckpt:', ckpt.model_checkpoint_path)
            self.session = tf.Session()
            self.saver.restore(self.session, ckpt.model_checkpoint_path)
            print('Restored from {} completed'.format(ckpt.model_checkpoint_path))
        else:
            self.session = tf.Session()
            self.saver.restore(self.session, start_path)
            print('Restored from {} completed'.format(start_path))

    def predict_accuracy(self, data, test=True):
        # restore the latest checkpoint before evaluating
        self.load_model()
        sess = self.session
        iterations = 0
        accuracy_list = []
        predictions = []
        self.num_epoches = 1
        batches = self.random_batches(data, shuffle=False)
        for batch in batches:
            iterations += 1
            x_inputs, y_inputs = zip(*batch)
            x_inputs = np.array(x_inputs)
            y_inputs = np.array(y_inputs)
            feed = {self.inputs: x_inputs,
                    self.targets: y_inputs,
                    self.keep_prob: 1.0}
            batch_pred, batch_accuracy, batch_loss = sess.run(
                [self.predictions, self.accuracy, self.loss], feed_dict=feed)
            accuracy_list.append(batch_accuracy)
            predictions.append(batch_pred)
            print('The evaluation step is {0}'.format(iterations),
                  'accuracy: {:.3f}'.format(accuracy_list[-1]))
        accuracy = np.mean(accuracy_list)
        predictions = [list(pred) for pred in predictions]
        predictions = [p for pred in predictions for p in pred]
        predictions = np.array(predictions)
        if test:
            return predictions, accuracy
        else:
            return accuracy

    def predict(self, data):
        # restore the latest checkpoint before predicting
        self.load_model()
        sess = self.session
        predictions_list = []
        self.num_epoches = 1
        # keep shuffle=False so predictions stay aligned with the input order
        batches = self.random_batches(data, shuffle=False)
        for batch in batches:
            x_inputs = batch
            feed = {self.inputs: x_inputs, self.keep_prob: 1.0}
            batch_pred = sess.run(self.predictions, feed_dict=feed)
            predictions_list.append(batch_pred)
        predictions = [list(pred) for pred in predictions_list]
        predictions = [p for pred in predictions for p in pred]
        predictions = np.array(predictions).reshape(-1, 1)
        print(predictions)
        return predictions
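To make the three-step workflow concrete, here is a minimal usage sketch. The hyper-parameter values and the names x_train, y_train, x_test are placeholders for illustration; the inputs are assumed to already be padded word-id sequences with one-hot labels:

# Hypothetical configuration; vocab_size and num_seqs must match the prepared data.
config_nn = NN_config(vocab_size=20000, num_filters=128, filter_steps=[3, 4, 5],
                      num_seqs=56, num_classes=2, embedding_size=200)
config_calc = CALC_config(learning_rate=0.001, batch_size=64,
                          num_epoches=20, l2_ratio=0.0)

model = TextCNNClassifier(config_nn, config_calc)   # 1. instantiate the model
model.fit(list(zip(x_train, y_train)))              # 2. train; checkpoints go to ./models
y_pred = model.predict(x_test)                      # 3. predict; restores the latest checkpoint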
3. The implementation also needs the data-processing code from the original source, kept in a file named data_helpers.py (its regular-expression cleaning routine is especially worth recommending; it is so well written that I left it untouched — a brief demonstration follows the code). The corresponding code is as follows:
import numpy as np
import re
import itertools
from collections import Counter
import os


def clean_str(string):
    """
    Tokenization/string cleaning for all datasets except for SST.
    Original taken from https://github.com/yoonkim/CNN_sentence/blob/master/process_data.py
    """
    string = re.sub(r"[^A-Za-z0-9(),!?\'\`]", " ", string)
    string = re.sub(r"\'s", " \'s", string)
    string = re.sub(r"\'ve", " \'ve", string)
    string = re.sub(r"n\'t", " n\'t", string)
    string = re.sub(r"\'re", " \'re", string)
    string = re.sub(r"\'d", " \'d", string)
    string = re.sub(r"\'ll", " \'ll", string)
    string = re.sub(r",", " , ", string)
    string = re.sub(r"!", " ! ", string)
    string = re.sub(r"\(", " \( ", string)
    string = re.sub(r"\)", " \) ", string)
    string = re.sub(r"\?", " \? ", string)
    string = re.sub(r"\s{2,}", " ", string)
    return string.strip().lower()


def load_data_and_labels(positive_data_file, negative_data_file):
    """
    Loads MR polarity data from files, splits the data into words and generates labels.
    Returns split sentences and labels.
    """
    # Load data from files
    positive_examples = list(open(positive_data_file, "r", encoding='utf8').readlines())
    positive_examples = [s.strip() for s in positive_examples]
    negative_examples = list(open(negative_data_file, "r", encoding='utf8').readlines())
    negative_examples = [s.strip() for s in negative_examples]
    # Split by words
    x_text = positive_examples + negative_examples
    x_text = [clean_str(sent) for sent in x_text]
    # Generate labels
    positive_labels = [[0, 1] for _ in positive_examples]
    negative_labels = [[1, 0] for _ in negative_examples]
    y = np.concatenate([positive_labels, negative_labels], 0)
    return [x_text, y]
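As a quick illustration of what clean_str does (the sample sentence below is made up): it pads punctuation and clitics such as 's, 'd, n't with spaces and lower-cases the result:

from data_helpers import clean_str

print(clean_str("I'd say it's GREAT, really!"))
# expected output: i 'd say it 's great , really !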
4. The model training script, saved for example as data_trainning.py, is as follows:
import tensorflow as tf
import numpy as np
import os
from tensorflow.contrib import learn

import data_helpers
from TextCNNClassifier import NN_config, CALC_config, TextCNNClassifier

# Data Preparation
# ==================================================
positive_data_file = "./data/rt-polaritydata/rt-polarity.pos"
negative_data_file = "./data/rt-polaritydata/rt-polarity.neg"
dev_sample_percentage = 0.1

# Load data
print("Loading data...")
x_text, y = data_helpers.load_data_and_labels(positive_data_file, negative_data_file)

# Build vocabulary
max_document_length = max([len(x.split(" ")) for x in x_text])
vocab_processor = learn.preprocessing.VocabularyProcessor(max_document_length)
x = np.array(list(vocab_processor.fit_transform(x_text)))
print('vocabulary length is:', len(vocab_processor.vocabulary_))

# Randomly shuffle data
np.random.seed(10)
shuffle_indices = np.random.permutation(np.arange(len(y)))
x_shuffled = x[shuffle_indices]
y_shuffled = y[shuffle_indices]

# Split train/test set
# TODO: This is very crude, should use cross-validation
dev_sample_index = -1 * int(dev_sample_percentage * float(len(y)))
x_train, x_dev = x_shuffled[:dev_sample_index], x_shuffled[dev_sample_index:]
y_train, y_dev = y_shuffled[:dev_sample_index], y_shuffled[dev_sample_index:]
print('The length of x_train is {}'.format(len(x_train)))
print('The length of x_dev is {}'.format(len(x_dev)))

# ------------------------------------------------------------------------------
# ---------------- model processing --------------------------------------------
# ------------------------------------------------------------------------------
num_seqs = max_document_length
num_classes = 2
num_filters = 128
filter_steps = [5, 6, 7]
embedding_size = 200
vocab_size = len(vocab_processor.vocabulary_)

learning_rate = 0.001
batch_size = 128
num_epoches = 20
l2_ratio = 0.0

trains = list(zip(x_train, y_train))
devs = list(zip(x_dev, y_dev))

config_nn = NN_config(num_seqs=num_seqs,
                      num_classes=num_classes,
                      num_filters=num_filters,
                      filter_steps=filter_steps,
                      embedding_size=embedding_size,
                      vocab_size=vocab_size)
config_calc = CALC_config(learning_rate=learning_rate,
                          batch_size=batch_size,
                          num_epoches=num_epoches,
                          l2_ratio=l2_ratio)

print('this is the network checking list:\n',
      'num_seqs: {}\n'.format(num_seqs),
      'num_classes: {}\n'.format(num_classes),
      'embedding_size: {}\n'.format(embedding_size),
      'num_filters: {}\n'.format(num_filters),
      'vocab_size: {}\n'.format(vocab_size),
      'filter_steps:', filter_steps)
print('this is the training checking list:\n',
      'learning_rate: {}\n'.format(learning_rate),
      'num_epoches: {}\n'.format(num_epoches),
      'batch_size: {}\n'.format(batch_size),
      'l2_ratio: {}\n'.format(l2_ratio))

text_model = TextCNNClassifier(config_nn, config_calc)
text_model.fit(trains)
accuracy = text_model.predict_accuracy(devs, test=False)
print('the dev accuracy is:', accuracy)

predictions = text_model.predict(x_dev)
#print(predictions)
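After training, the checkpoint in ./models and the fitted vocabulary can be reused to classify new sentences in a separate script. A rough sketch, under these assumptions: the vocabulary was saved with vocab_processor.save('vocab') at the end of training, the same hyper-parameters are rebuilt, and the example sentence is made up:

import numpy as np
from tensorflow.contrib import learn

import data_helpers
from TextCNNClassifier import NN_config, CALC_config, TextCNNClassifier

# Reload the vocabulary fitted at training time (assumes vocab_processor.save('vocab') was called).
vocab_processor = learn.preprocessing.VocabularyProcessor.restore('vocab')

# Clean a new sentence and map it to the same padded id sequence used for training.
new_text = [data_helpers.clean_str("a thoughtful and moving film")]
x_new = np.array(list(vocab_processor.transform(new_text)))

# Rebuild the graph with the same hyper-parameters as training; predict() then
# calls load_model() internally and restores the checkpoint from ./models.
config_nn = NN_config(num_seqs=vocab_processor.max_document_length, num_classes=2,
                      num_filters=128, filter_steps=[5, 6, 7], embedding_size=200,
                      vocab_size=len(vocab_processor.vocabulary_))
config_calc = CALC_config(learning_rate=0.001, batch_size=128, num_epoches=20, l2_ratio=0.0)
model = TextCNNClassifier(config_nn, config_calc)
print(model.predict(x_new))   # column vector of predicted class indices (1 = positive)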
5. Training results:
Accuracy on the training data reaches 1.0. On the dev set, accuracy varies somewhat between training runs: the best run reached 74.03% and the worst 72.503%.
Source material collected from the Internet.