TensorFlow使用CNN實現中文文字分類
讀研期間使用過TensorFlow實現過簡單的CNN情感分析(分類),當然這是比較low的二分類情況,後來進行多分類情況。但之前的學習基本上都是在英文詞庫上訓練的。斷斷續續,想整理一下手頭的專案資料,於是就拾起讀研期間的文字分類的小專案,花了一點時間,把原來英文文字分類的專案,應用在中文文字分類,效果還不錯,在THUCNews中文資料集上,準確率93.9%左右,老規矩,先上原始碼地址
Github專案原始碼:https://github.com/PanJinquan/nlp-learning-tutorials/tree/master/THUCNews, 記得給個“Star”哈
目錄
TensorFlow使用CNN實現中文文字分類
一、專案介紹
1.1 目錄結構
1.2 THUCNews資料集
二、CNN模型結構
三、文字預處理
1、jieba中文分詞
2、gensim訓練word2vec模型
四、訓練過程
五、測試過程
一、專案介紹
1.1 目錄結構
Github專案原始碼:https://github.com/PanJinquan/nlp-learning-tutorials/tree/master/THUCNews, 記得給個“Star”哈
其他資源地址:
1.THUCTC官方資料集,連結: http://thuctc.thunlp.org/message
2.THUCTC百度網盤,連結: https://pan.baidu.com/s/1DT5xY9m2yfu1YGaGxpWiBQ 提取碼: bbpe
3.已經訓練好的word2vec模型:連結: https://pan.baidu.com/s/1n4ZgiF0gbY0zsK0706wZiw 提取碼: mtrj
4.使用詞向量處理的THUCNews資料:連結: https://pan.baidu.com/s/12Hdf36QafQ3y6KgV_vLTsw 提取碼: m9dx
1.2 THUCNews資料集
THUCNews是根據新浪新聞RSS訂閱頻道2005~2011年間的歷史資料篩選過濾生成,包含74萬篇新聞文件(2.19 GB),均為UTF-8純文字格式。我們在原始新浪新聞分類體系的基礎上,重新整合劃分出14個候選分類類別:財經、彩票、房產、股票、家居、教育、科技、社會、時尚、時政、體育、星座、遊戲、娛樂。相關介紹,可以看這裡http://thuctc.thunlp.org/
下載地址:
1.官方資料集下載連結: http://thuctc.thunlp.org/message
2.百度網盤下載連結: https://pan.baidu.com/s/1DT5xY9m2yfu1YGaGxpWiBQ 提取碼: bbpe
二、CNN模型結構
CNN文字分類的網路結,如下:
下面是使用TensorFlow實現的CNN文字分類網路:TextCNN,
max_sentence_length = 300 # 最大句子長度,也就是說文字樣本中字詞的最大長度,不足補零,多餘的截斷
embedding_dim = 128 #詞向量長度,即每個字詞的維度
filter_sizes = [3, 4, 5, 6] #卷積核大小
num_filters = 200 # Number of filters per filter size 卷價個數
base_lr=0.001 # 學習率
dropout_keep_prob = 0.5
l2_reg_lambda = 0.0 # "L2 regularization lambda (default: 0.0)
import tensorflow as tf
import numpy as np
class TextCNN(object):
'''
A CNN for text classification
Uses and embedding layer, followed by a convolutional, max-pooling and softmax layer.
'''
def __init__(
self, sequence_length, num_classes,
embedding_size, filter_sizes, num_filters, l2_reg_lambda=0.0):
# Placeholders for input, output, dropout
self.input_x = tf.placeholder(tf.float32, [None, sequence_length, embedding_size], name = "input_x")
self.input_y = tf.placeholder(tf.float32, [None, num_classes], name = "input_y")
self.dropout_keep_prob = tf.placeholder(tf.float32, name = "dropout_keep_prob")
# Keeping track of l2 regularization loss (optional)
l2_loss = tf.constant(0.0)
# Embedding layer
# self.embedded_chars = [None(batch_size), sequence_size, embedding_size]
# self.embedded_chars = [None(batch_size), sequence_size, embedding_size, 1(num_channels)]
self.embedded_chars = self.input_x
self.embedded_chars_expended = tf.expand_dims(self.embedded_chars, -1)
# Create a convolution + maxpool layer for each filter size
pooled_outputs = []
for i, filter_size in enumerate(filter_sizes):# "filter_sizes", "3,4,5",
with tf.name_scope("conv-maxpool-%s" % filter_size):
# Convolution layer
filter_shape = [filter_size, embedding_size, 1, num_filters] # num_filters= 200
W = tf.Variable(tf.truncated_normal(filter_shape, stddev=0.1), name="W")
b = tf.Variable(tf.constant(0.1, shape=[num_filters]), name="b")
conv = tf.nn.conv2d(self.embedded_chars_expended,
W,
strides=[1,1,1,1],
padding="VALID",
name="conv")
# Apply nonlinearity
h = tf.nn.relu(tf.nn.bias_add(conv, b), name = "relu")
# Maxpooling over the outputs
pooled = tf.nn.max_pool(
h,
ksize=[1, sequence_length - filter_size + 1, 1, 1],
strides=[1,1,1,1],
padding="VALID",
name="pool")
pooled_outputs.append(pooled)
# Combine all the pooled features
num_filters_total = num_filters * len(filter_sizes)
self.h_pool = tf.concat(pooled_outputs, 3)
# self.h_pool = tf.concat(3, pooled_outputs)
self.h_pool_flat = tf.reshape(self.h_pool, [-1, num_filters_total])
# Add dropout
with tf.name_scope("dropout"):
self.h_drop = tf.nn.dropout(self.h_pool_flat, self.dropout_keep_prob)
# Final (unnomalized) scores and predictions
with tf.name_scope("output"):
W = tf.get_variable(
"W",
shape = [num_filters_total, num_classes],
initializer = tf.contrib.layers.xavier_initializer())
b = tf.Variable(tf.constant(0.1, shape=[num_classes], name = "b"))
l2_loss += tf.nn.l2_loss(W)
l2_loss += tf.nn.l2_loss(b)
self.scores = tf.nn.xw_plus_b(self.h_drop, W, b, name = "scores")
self.predictions = tf.argmax(self.scores, 1, name = "predictions")
# Calculate Mean cross-entropy loss
with tf.name_scope("loss"):
losses = tf.nn.softmax_cross_entropy_with_logits(logits = self.scores, labels = self.input_y)
self.loss = tf.reduce_mean(losses) + l2_reg_lambda * l2_loss
# Accuracy
with tf.name_scope("accuracy"):
correct_predictions = tf.equal(self.predictions, tf.argmax(self.input_y, 1))
self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name = "accuracy")
三、文字預處理
本部落格使用jieba工具進行中文分詞,使用詞進行訓練會比使用字進行訓練,效果更好。
這部分:已經在《使用gensim訓練中文語料word2vec》https://blog.csdn.net/guyuealian/article/details/84072158,詳解講解,自己看吧!
1、jieba中文分詞
這個需要自己安裝:pip install jieba 或者pip3 install jieba
2、gensim訓練word2vec模型
這裡使用jieba工具對THUCNews資料集進行分詞,並利用gensim訓練基於THUCNews的word2vec模型,這裡提供已經訓練好的word2vec模型:連結: https://pan.baidu.com/s/1n4ZgiF0gbY0zsK0706wZiw 提取碼: mtrj
2、THUCNews資料處理
有了word2vec模型,我就可以用word2vec詞向量處理THUCNews資料:先使用jieba工具將中文句子轉為字詞,再將字詞根據word2vec模型轉為embadding 的索引,有了索引就可以獲得詞向量embadding 。這裡並把這些索引資料儲存為npy檔案。後續訓練時,CNN網路只需要讀取這些npy檔案,並將索引轉為embadding,就可以進行訓練了。
處理好的THUCNews資料下載地址:連結: https://pan.baidu.com/s/12Hdf36QafQ3y6KgV_vLTsw 提取碼: m9dx
下面的程式碼實現的功能:使用jieba工具將中文句子轉為字詞,再將字詞根據word2vec模型轉為embadding 的索引矩陣,然後把這些索引矩陣儲存下來(*.npy檔案),原始碼中batchSize=20000表示:將20000中文TXT檔案處理成字詞,轉為索引矩陣並儲存為一個*.npy檔案,相當於將20000中文TXT檔案儲存為一個*.npy檔案,主要是為了壓縮資料,避免單個檔案過大的情況。
# -*-coding: utf-8 -*-
"""
@Project: nlp-learning-tutorials
@File : create_word2vec.py
@Author : panjq
@E-mail : [email protected]
@Date : 2018-11-08 17:37:21
"""
from gensim.models import Word2Vec
import random
import numpy as np
import os
import math
from utils import files_processing,segment
def info_npy(file_list):
sizes=0
for file in file_list:
data = np.load(file)
print("data.shape:{}".format(data.shape))
size = data.shape[0]
sizes+=size
print("files nums:{}, data nums:{}".format(len(file_list), sizes))
return sizes
def save_multi_file(files_list,labels_list,word2vec_path,out_dir,prefix,batchSize,max_sentence_length,labels_set=None,shuffle=False):
'''
將檔案內容對映為索引矩陣,並且將資料儲存為多個檔案
:param files_list:
:param labels_list:
:param word2vec_path: word2vec模型的位置
:param out_dir: 檔案儲存的目錄
:param prefix: 儲存檔案的字首名
:param batchSize: 將多個檔案內容儲存為一個檔案
:param labels_set: labels集合
:return:
'''
if not os.path.exists(out_dir):
os.mkdir(out_dir)
# 把該目錄下的所有檔案都刪除
files_processing.delete_dir_file(out_dir)
if shuffle:
random.seed(100)
random.shuffle(files_list)
random.seed(100)
random.shuffle(labels_list)
sample_num = len(files_list)
w2vModel=load_wordVectors(word2vec_path)
if labels_set is None:
labels_set= files_processing.get_labels_set(label_list)
labels_list, labels_set = files_processing.labels_encoding(labels_list, labels_set)
labels_list=labels_list.tolist()
batchNum = int(math.ceil(1.0 * sample_num / batchSize))
for i in range(batchNum):
start = i * batchSize
end = min((i + 1) * batchSize, sample_num)
batch_files = files_list[start:end]
batch_labels = labels_list[start:end]
# 讀取檔案內容,字詞分割
batch_content = files_processing.read_files_list_to_segment(batch_files,
max_sentence_length,
padding_token='<PAD>',
segment_type='word')
# 將字詞轉為索引矩陣
batch_indexMat = word2indexMat(w2vModel, batch_content, max_sentence_length)
batch_labels=np.asarray(batch_labels)
batch_labels = batch_labels.reshape([len(batch_labels), 1])
# 儲存*.npy檔案
filename = os.path.join(out_dir,prefix + '{0}.npy'.format(i))
labels_indexMat = cat_labels_indexMat(batch_labels, batch_indexMat)
np.save(filename, labels_indexMat)
print('step:{}/{}, save:{}, data.shape{}'.format(i,batchNum,filename,labels_indexMat.shape))
def cat_labels_indexMat(labels,indexMat):
indexMat_labels = np.concatenate([labels,indexMat], axis=1)
return indexMat_labels
def split_labels_indexMat(indexMat_labels,label_index=0):
labels = indexMat_labels[:, 0:label_index+1] # 第一列是labels
indexMat = indexMat_labels[:, label_index+1:] # 其餘是indexMat
return labels, indexMat
def load_wordVectors(word2vec_path):
w2vModel = Word2Vec.load(word2vec_path)
return w2vModel
def word2vector_lookup(w2vModel, sentences):
'''
將字詞轉換為詞向量
:param w2vModel: word2vector模型
:param sentences: type->list[list[str]]
:return: sentences對應的詞向量,type->list[list[ndarray[list]]
'''
all_vectors = []
embeddingDim = w2vModel.vector_size
embeddingUnknown = [0 for i in range(embeddingDim)]
for sentence in sentences:
this_vector = []
for word in sentence:
if word in w2vModel.wv.vocab:
v=w2vModel[word]
this_vector.append(v)
else:
this_vector.append(embeddingUnknown)
all_vectors.append(this_vector)
all_vectors=np.array(all_vectors)
return all_vectors
def word2indexMat(w2vModel, sentences, max_sentence_length):
'''
將字詞word轉為索引矩陣
:param w2vModel:
:param sentences:
:param max_sentence_length:
:return:
'''
nums_sample=len(sentences)
indexMat = np.zeros((nums_sample, max_sentence_length), dtype='int32')
rows = 0
for sentence in sentences:
indexCounter = 0
for word in sentence:
try:
index = w2vModel.wv.vocab[word].index # 獲得單詞word的下標
indexMat[rows][indexCounter] = index
except :
indexMat[rows][indexCounter] = 0 # Vector for unkown words
indexCounter = indexCounter + 1
if indexCounter >= max_sentence_length:
break
rows+=1
return indexMat
def indexMat2word(w2vModel, indexMat, max_sentence_length=None):
'''
將索引矩陣轉為字詞word
:param w2vModel:
:param indexMat:
:param max_sentence_length:
:return:
'''
if max_sentence_length is None:
row,col =indexMat.shape
max_sentence_length=col
sentences=[]
for Mat in indexMat:
indexCounter = 0
sentence=[]
for index in Mat:
try:
word = w2vModel.wv.index2word[index] # 獲得單詞word的下標
sentence+=[word]
except :
sentence+=['<PAD>']
indexCounter = indexCounter + 1
if indexCounter >= max_sentence_length:
break
sentences.append(sentence)
return sentences
def save_indexMat(indexMat,path):
np.save(path, indexMat)
def load_indexMat(path):
indexMat = np.load(path)
return indexMat
def indexMat2vector_lookup(w2vModel,indexMat):
'''
將索引矩陣轉為詞向量
:param w2vModel:
:param indexMat:
:return: 詞向量
'''
all_vectors = w2vModel.wv.vectors[indexMat]
return all_vectors
def pos_neg_test():
positive_data_file = "./data/ham_5000.utf8"
negative_data_file = './data/spam_5000.utf8'
word2vec_path = 'out/trained_word2vec.model'
sentences, labels = files_processing.load_pos_neg_files(positive_data_file, negative_data_file)
# embedding_test(positive_data_file,negative_data_file)
sentences, max_document_length = segment.padding_sentences(sentences, '<PADDING>', padding_sentence_length=190)
# train_wordVectors(sentences,embedding_size=128,word2vec_path=word2vec_path) # 訓練word2vec,並儲存word2vec_path
w2vModel=load_wordVectors(word2vec_path) #載入訓練好的word2vec模型
'''
轉換詞向量提供有兩種方法:
[1]直接轉換:根據字詞直接對映到詞向量:word2vector_lookup
[2]間接轉換:先將字詞轉為索引矩陣,再由索引矩陣對映到詞向量:word2indexMat->indexMat2vector_lookup
'''
# [1]根據字詞直接對映到詞向量
x1=word2vector_lookup(w2vModel, sentences)
# [2]先將字詞轉為索引矩陣,再由索引矩陣對映到詞向量
indexMat_path = 'out/indexMat.npy'
indexMat=word2indexMat(w2vModel, sentences, max_sentence_length=190) # 將字詞轉為索引矩陣
save_indexMat(indexMat, indexMat_path)
x2=indexMat2vector_lookup(w2vModel, indexMat) # 索引矩陣對映到詞向量
print("x.shape = {}".format(x2.shape))# shape=(10000, 190, 128)->(樣本個數10000,每個樣本的字詞個數190,每個字詞的向量長度128)
if __name__=='__main__':
# THUCNews_path='/home/ubuntu/project/tfTest/THUCNews/test'
# THUCNews_path='/home/ubuntu/project/tfTest/THUCNews/spam'
THUCNews_path='/home/ubuntu/project/tfTest/THUCNews/THUCNews'
# 讀取所有檔案列表
files_list, label_list = files_processing.gen_files_labels(THUCNews_path)
max_sentence_length=300
word2vec_path="../../word2vec/models/THUCNews_word2Vec/THUCNews_word2Vec_128.model"
# 獲得標籤集合,並儲存在本地
# labels_set=['星座','財經','教育']
# labels_set = files_processing.get_labels_set(label_list)
labels_file='../data/THUCNews_labels.txt'
# files_processing.write_txt(labels_file, labels_set)
# 將資料劃分為train val資料集
train_files, train_label, val_files, val_label= files_processing.split_train_val_list(files_list, label_list, facror=0.9, shuffle=True)
# contents, labels=files_processing.read_files_labels(files_list,label_list)
# word2vec_path = 'out/trained_word2vec.model'
train_out_dir='../data/train_data'
prefix='train_data'
batchSize=20000
labels_set=files_processing.read_txt(labels_file)
# labels_set2 = files_processing.read_txt(labels_file)
save_multi_file(files_list=train_files,
labels_list=train_label,
word2vec_path=word2vec_path,
out_dir=train_out_dir,
prefix=prefix,
batchSize=batchSize,
max_sentence_length=max_sentence_length,
labels_set=labels_set,
shuffle=True)
print("*******************************************************")
val_out_dir='../data/val_data'
prefix='val_data'
save_multi_file(files_list=val_files,
labels_list=val_label,
word2vec_path=word2vec_path,
out_dir=val_out_dir,
prefix=prefix,
batchSize=batchSize,
max_sentence_length=max_sentence_length,
labels_set=labels_set,
shuffle=True)
四、訓練過程
訓練程式碼如下,注意,Github上不能上傳大檔案,所以你需要把上面提供的檔案都下載下來,並放在對應的檔案目錄,就可以訓練了。
訓練中需要讀取訓練資料,即*.npy檔案,*.npy檔案儲存的是索引資料,因此需要轉為CNN的embadding資料:這個過程由函式:indexMat2vector_lookup完成:train_batch_data = create_word2vec.indexMat2vector_lookup(w2vModel, train_batch_data)
#! /usr/bin/env python
# encoding: utf-8
import tensorflow as tf
import numpy as np
import os
from text_cnn import TextCNN
from utils import create_batch_data, create_word2vec, files_processing
def train(train_dir,val_dir,labels_file,word2vec_path,batch_size,max_steps,log_step,val_step,snapshot,out_dir):
'''
訓練...
:param train_dir: 訓練資料目錄
:param val_dir: val資料目錄
:param labels_file: labels檔案目錄
:param word2vec_path: 詞向量模型檔案
:param batch_size: batch size
:param max_steps: 最大迭代次數
:param log_step: log顯示間隔
:param val_step: 測試間隔
:param snapshot: 儲存模型間隔
:param out_dir: 模型ckpt和summaries輸出的目錄
:return:
'''
max_sentence_length = 300
embedding_dim = 128
filter_sizes = [3, 4, 5, 6]
num_filters = 200 # Number of filters per filter size
base_lr=0.001# 學習率
dropout_keep_prob = 0.5
l2_reg_lambda = 0.0 # "L2 regularization lambda (default: 0.0)
allow_soft_placement = True # 如果你指定的裝置不存在,允許TF自動分配裝置
log_device_placement = False # 是否列印裝置分配日誌
print("Loading data...")
w2vModel = create_word2vec.load_wordVectors(word2vec_path)
labels_set = files_processing.read_txt(labels_file)
labels_nums = len(labels_set)
train_file_list = create_batch_data.get_file_list(file_dir=train_dir, postfix='*.npy')
train_batch = create_batch_data.get_data_batch(train_file_list, labels_nums=labels_nums, batch_size=batch_size,
shuffle=False, one_hot=True)
val_file_list = create_batch_data.get_file_list(file_dir=val_dir, postfix='*.npy')
val_batch = create_batch_data.get_data_batch(val_file_list, labels_nums=labels_nums, batch_size=batch_size,
shuffle=False, one_hot=True)
print("train data info *****************************")
train_nums=create_word2vec.info_npy(train_file_list)
print("val data info *****************************")
val_nums = create_word2vec.info_npy(val_file_list)
print("labels_set info *****************************")
files_processing.info_labels_set(labels_set)
# Training
with tf.Graph().as_default():
session_conf = tf.ConfigProto(allow_soft_placement = allow_soft_placement,log_device_placement = log_device_placement)
sess = tf.Session(config = session_conf)
with sess.as_default():
cnn = TextCNN(sequence_length = max_sentence_length,
num_classes = labels_nums,
embedding_size = embedding_dim,
filter_sizes = filter_sizes,
num_filters = num_filters,
l2_reg_lambda = l2_reg_lambda)
# Define Training procedure
global_step = tf.Variable(0, name="global_step", trainable=False)
optimizer = tf.train.AdamOptimizer(learning_rate=base_lr)
# optimizer = tf.train.MomentumOptimizer(learning_rate=0.01, momentum=0.9)
grads_and_vars = optimizer.compute_gradients(cnn.loss)
train_op = optimizer.apply_gradients(grads_and_vars, global_step=global_step)
# Keep track of gradient values and sparsity (optional)
grad_summaries = []
for g, v in grads_and_vars:
if g is not None:
grad_hist_summary = tf.summary.histogram("{}/grad/hist".format(v.name), g)
sparsity_summary = tf.summary.scalar("{}/grad/sparsity".format(v.name), tf.nn.zero_fraction(g))
grad_summaries.append(grad_hist_summary)
grad_summaries.append(sparsity_summary)
grad_summaries_merged = tf.summary.merge(grad_summaries)
# Output directory for models and summaries
print("Writing to {}\n".format(out_dir))
# Summaries for loss and accuracy
loss_summary = tf.summary.scalar("loss", cnn.loss)
acc_summary = tf.summary.scalar("accuracy", cnn.accuracy)
# Train Summaries
train_summary_op = tf.summary.merge([loss_summary, acc_summary, grad_summaries_merged])
train_summary_dir = os.path.join(out_dir, "summaries", "train")
train_summary_writer = tf.summary.FileWriter(train_summary_dir, sess.graph)
# Dev summaries
dev_summary_op = tf.summary.merge([loss_summary, acc_summary])
dev_summary_dir = os.path.join(out_dir, "summaries", "dev")
dev_summary_writer = tf.summary.FileWriter(dev_summary_dir, sess.graph)
# Checkpoint directory. Tensorflow assumes this directory already exists so we need to create it
checkpoint_dir = os.path.abspath(os.path.join(out_dir, "checkpoints"))
checkpoint_prefix = os.path.join(checkpoint_dir, "model")
if not os.path.exists(checkpoint_dir):
os.makedirs(checkpoint_dir)
saver = tf.train.Saver(tf.global_variables(), max_to_keep=5)
# Initialize all variables
sess.run(tf.global_variables_initializer())
def train_step(x_batch, y_batch):
"""
A single training step
"""
feed_dict = {
cnn.input_x: x_batch,
cnn.input_y: y_batch,
cnn.dropout_keep_prob: dropout_keep_prob
}
_, step, summaries, loss, accuracy = sess.run(
[train_op, global_step, train_summary_op, cnn.loss, cnn.accuracy],
feed_dict)
if step % log_step==0:
print("training: step {}, loss {:g}, acc {:g}".format(step, loss, accuracy))
train_summary_writer.add_summary(summaries, step)
def dev_step(x_batch, y_batch, writer=None):
"""
Evaluates model on a dev set
"""
feed_dict = {
cnn.input_x: x_batch,
cnn.input_y: y_batch,
cnn.dropout_keep_prob: 1.0
}
step, summaries, loss, accuracy = sess.run(
[global_step, dev_summary_op, cnn.loss, cnn.accuracy],
feed_dict)
if writer:
writer.add_summary(summaries, step)
return loss, accuracy
for i in range(max_steps):
train_batch_data, train_batch_label = create_batch_data.get_next_batch(train_batch)
train_batch_data = create_word2vec.indexMat2vector_lookup(w2vModel, train_batch_data)
train_step(train_batch_data, train_batch_label)
current_step = tf.train.global_step(sess, global_step)
if current_step % val_step == 0:
val_losses = []
val_accs = []
# for k in range(int(val_nums/batch_size)):
for k in range(100):
val_batch_data, val_batch_label = create_batch_data.get_next_batch(val_batch)
val_batch_data = create_word2vec.indexMat2vector_lookup(w2vModel, val_batch_data)
val_loss, val_acc=dev_step(val_batch_data, val_batch_label, writer=dev_summary_writer)
val_losses.append(val_loss)
val_accs.append(val_acc)
mean_loss = np.array(val_losses, dtype=np.float32).mean()
mean_acc = np.array(val_accs, dtype=np.float32).mean()
print("--------Evaluation:step {}, loss {:g}, acc {:g}".format(current_step, mean_loss, mean_acc))
if current_step % snapshot == 0:
path = saver.save(sess, checkpoint_prefix, global_step=current_step)
print("Saved model checkpoint to {}\n".format(path))
def main():
# Data preprocess
labels_file = 'data/THUCNews_labels.txt'
word2vec_path = "../word2vec/models/THUCNews_word2Vec/THUCNews_word2Vec_128.model"
max_steps = 100000 # 迭代次數
batch_size = 128
out_dir = "./models" # 模型ckpt和summaries輸出的目錄
train_dir = './data/train_data'
val_dir = './data/val_data'
train(train_dir=train_dir,
val_dir=val_dir,
labels_file=labels_file,
word2vec_path=word2vec_path,
batch_size=batch_size,
max_steps=max_steps,
log_step=50,
val_step=500,
snapshot=1000,
out_dir=out_dir)
if __name__=="__main__":
main()
五、測試過程
這裡提供兩種測試方法:
(1):text_predict(files_list, labels_file, models_path, word2vec_path, batch_size)
該方法,可以直接測試待分類的中文文字
(2):batch_predict(val_dir,labels_file,models_path,word2vec_path,batch_size)
該方法,用於批量測試,val_dir目錄儲存的是測試資料的npy檔案,這些檔案都是上面用word2vec詞向量處理THUCNews資料檔案。
#! /usr/bin/env python
# encoding: utf-8
import tensorflow as tf
import numpy as np
import os
from text_cnn import TextCNN
from utils import create_batch_data, create_word2vec, files_processing
import math
def text_predict(files_list, labels_file, models_path, word2vec_path, batch_size):
'''
預測...
:param val_dir: val資料目錄
:param labels_file: labels檔案目錄
:param models_path: 模型檔案
:param word2vec_path: 詞向量模型檔案
:param batch_size: batch size
:return:
'''
max_sentence_length = 300
embedding_dim = 128
filter_sizes = [3, 4, 5, 6]
num_filters = 200 # Number of filters per filter size
l2_reg_lambda = 0.0 # "L2 regularization lambda (default: 0.0)
print("Loading data...")
w2vModel = create_word2vec.load_wordVectors(word2vec_path)
labels_set = files_processing.read_txt(labels_file)
labels_nums = len(labels_set)
sample_num=len(files_list)
labels_list=[-1]
labels_list=labels_list*sample_num
with tf.Graph().as_default():
sess = tf.Session()
with sess.as_default():
cnn = TextCNN(sequence_length = max_sentence_length,
num_classes = labels_nums,
embedding_size = embedding_dim,
filter_sizes = filter_sizes,
num_filters = num_filters,
l2_reg_lambda = l2_reg_lambda)
# Initialize all variables
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver()
saver.restore(sess, models_path)
def pred_step(x_batch):
"""
predictions model on a dev set
"""
feed_dict = {
cnn.input_x: x_batch,
cnn.dropout_keep_prob: 1.0
}
pred = sess.run([cnn.predictions],feed_dict)
return pred
batchNum = int(math.ceil(1.0 * sample_num / batch_size))
for i in range(batchNum):
start = i * batch_size
end = min((i + 1) * batch_size, sample_num)
batch_files = files_list[start:end]
# 讀取檔案內容,字詞分割
batch_content= files_processing.read_files_list_to_segment(batch_files,
max_sentence_length,
padding_token='<PAD>')
# [1]將字詞轉為索引矩陣,再對映為詞向量
batch_indexMat = create_word2vec.word2indexMat(w2vModel, batch_content, max_sentence_length)
val_batch_data = create_word2vec.indexMat2vector_lookup(w2vModel, batch_indexMat)
# [2]直接將字詞對映為詞向量
# val_batch_data = create_word2vec.word2vector_lookup(w2vModel,batch_content)
pred=pred_step(val_batch_data)
pred=pred[0].tolist()
pred=files_processing.labels_decoding(pred,labels_set)
for k,file in enumerate(batch_files):
print("{}, pred:{}".format(file,pred[k]))
def batch_predict(val_dir,labels_file,models_path,word2vec_path,batch_size):
'''
預測...
:param val_dir: val資料目錄
:param labels_file: labels檔案目錄
:param models_path: 模型檔案
:param word2vec_path: 詞向量模型檔案
:param batch_size: batch size
:return:
'''
max_sentence_length = 300
embedding_dim = 128
filter_sizes = [3, 4, 5, 6]
num_filters = 200 # Number of filters per filter size
l2_reg_lambda = 0.0 # "L2 regularization lambda (default: 0.0)
print("Loading data...")
w2vModel = create_word2vec.load_wordVectors(word2vec_path)
labels_set = files_processing.read_txt(labels_file)
labels_nums = len(labels_set)
val_file_list = create_batch_data.get_file_list(file_dir=val_dir, postfix='*.npy')
val_batch = create_batch_data.get_data_batch(val_file_list, labels_nums=labels_nums, batch_size=batch_size,
shuffle=False, one_hot=True)
print("val data info *****************************")
val_nums = create_word2vec.info_npy(val_file_list)
print("labels_set info *****************************")
files_processing.info_labels_set(labels_set)
# Training
with tf.Graph().as_default():
sess = tf.Session()
with sess.as_default():
cnn = TextCNN(sequence_length = max_sentence_length,
num_classes = labels_nums,
embedding_size = embedding_dim,
filter_sizes = filter_sizes,
num_filters = num_filters,
l2_reg_lambda = l2_reg_lambda)
# Initialize all variables
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver()
saver.restore(sess, models_path)
def dev_step(x_batch, y_batch):
"""
Evaluates model on a dev set
"""
feed_dict = {
cnn.input_x: x_batch,
cnn.input_y: y_batch,
cnn.dropout_keep_prob: 1.0
}
loss, accuracy = sess.run(
[cnn.loss, cnn.accuracy],
feed_dict)
return loss, accuracy
val_losses = []
val_accs = []
for k in range(int(val_nums/batch_size)):
# for k in range(int(10)):
val_batch_data, val_batch_label = create_batch_data.get_next_batch(val_batch)
val_batch_data = create_word2vec.indexMat2vector_lookup(w2vModel, val_batch_data)
val_loss, val_acc=dev_step(val_batch_data, val_batch_label)
val_losses.append(val_loss)
val_accs.append(val_acc)
print("--------Evaluation:step {}, loss {:g}, acc {:g}".format(k, val_loss, val_acc))
mean_loss = np.array(val_losses, dtype=np.float32).mean()
mean_acc = np.array(val_accs, dtype=np.float32).mean()
print("--------Evaluation:step {}, mean loss {:g}, mean acc {:g}".format(k, mean_loss, mean_acc))
def main():
# Data preprocess
labels_file = 'data/THUCNews_labels.txt'
# word2vec_path = 'word2vec/THUCNews_word2vec300.model'
word2vec_path = "../word2vec/models/THUCNews_word2Vec/THUCNews_word2Vec_128.model"
models_path='models/checkpoints/model-30000'
batch_size = 128
val_dir = './data/val_data'
batch_predict(val_dir=val_dir,
labels_file=labels_file,
models_path=models_path,
word2vec_path=word2vec_path,
batch_size=batch_size)
test_path='/home/ubuntu/project/tfTest/THUCNews/my_test'
files_list = files_processing.get_files_list(test_path,postfix='*.txt')
text_predict(files_list, labels_file, models_path, word2vec_path, batch_size)
if __name__=="__main__":
main()