(Reproducing the original paper's method, plus a Word2Vec experiment): training a CNN text classifier on 50,000 news articles
1. This post builds on https://blog.csdn.net/m0_38088359/article/details/83004972 and reproduces the method from the original paper behind "Implementing a CNN for Text Classification in TensorFlow"; it also tries removing the embedding layer and feeding Word2Vec vectors in place of the learned word embeddings.
The original paper uses six convolution kernels for feature extraction: two of size 2×embed_size, two of 3×embed_size, and two of 4×embed_size. The paper is reproduced in full here, with slight parameter adjustments.
2. The complete training procedure follows (no class encapsulation):
(1) Read the data and convert each character to an id (char -> id):
with open('./cnews/cnews.vocab.txt', encoding='utf8') as file:
    vocabulary_list = [k.strip() for k in file.readlines()]
with open('./cnews/cnews.train.txt', encoding='utf8') as file:
    line_list = [k.strip() for k in file.readlines()]
    train_label_list = [k.split(maxsplit=1)[0] for k in line_list]
    train_content_list = [k.split(maxsplit=1)[1] for k in line_list]
with open('./cnews/cnews.test.txt', encoding='utf8') as file:
    line_list = [k.strip() for k in file.readlines()]
    test_label_list = [k.split(maxsplit=1)[0] for k in line_list]
    test_content_list = [k.split(maxsplit=1)[1] for k in line_list]
# Map each vocabulary character to an integer id.
word2id_dict = dict((b, a) for a, b in enumerate(vocabulary_list))

def content2vector(content_list):
    # Convert every document (a string of characters) into a list of character ids;
    # characters outside the vocabulary fall back to the id of '<PAD>'.
    content_vector_list = []
    for content in content_list:
        content_vector = []
        for word in content:
            if word in word2id_dict:
                content_vector.append(word2id_dict[word])
            else:
                content_vector.append(word2id_dict['<PAD>'])
        content_vector_list.append(content_vector)
    return content_vector_list

train_vector_list = content2vector(train_content_list)
test_vector_list = content2vector(test_content_list)
(2) Pad the id sequences with zeros and truncate them to a uniform length of 600:
import tensorflow.contrib.keras as kr
train_X = kr.preprocessing.sequence.pad_sequences(train_vector_list,600)
test_X = kr.preprocessing.sequence.pad_sequences(test_vector_list,600)
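As a quick sanity check (not part of the original code), a short made-up string can be pushed through the same two steps; the exact ids depend on cnews.vocab.txt:
sample_vector = content2vector(['體育新聞'])                        # hypothetical 4-character document
sample_X = kr.preprocessing.sequence.pad_sequences(sample_vector, 600)
print(len(sample_vector[0]))   # 4 ids, one per character
print(sample_X.shape)          # (1, 600): zeros are prepended, over-long texts are cut from the front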
(3) Parameter initialization:
vocab_size = 5000
kernel_sizes = [2, 2, 3, 3, 4, 4]   # six kernels: two each of height 2, 3 and 4
dropout_keep_prob = 0.5
num_kernels = 128                   # feature maps per kernel
batch_size = 64
seq_length = 600                    # fixed document length after padding/truncation
embed_size = 128
hidden_dim = 256
num_classes = 10
learning_rate = 1e-3
embedding_dim = 128                 # word-vector dimensionality (same as embed_size)
(4) Placeholders and the embedding layer:
import tensorflow as tf
X_holder = tf.placeholder(tf.int32,[None,seq_length])
Y_holder = tf.placeholder(tf.float32,[None,num_classes])
embedding = tf.get_variable('embedding', [vocab_size, embedding_dim,1])
embedding_inputs = tf.nn.embedding_lookup(embedding, X_holder)
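Note the trailing 1 in the embedding variable's shape: after the lookup, embedding_inputs already carries an explicit channel axis, so it can be fed straight into tf.layers.conv2d. A quick shape check (a sketch, not in the original post):
print(embedding_inputs.get_shape().as_list())   # expected: [None, 600, 128, 1]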
(5) Building the convolution and pooling layers:
def conv_pool_concate(X_holder, kernel_sizes):
    # One conv + max-pool branch per kernel size; the results are concatenated at the end.
    output = []
    for kernel_size in kernel_sizes:
        # Each kernel spans the full embedding width, so it only slides along the sequence axis.
        conv2 = tf.layers.conv2d(inputs=X_holder,
                                 filters=num_kernels,
                                 kernel_size=[kernel_size, embed_size],
                                 use_bias=True,
                                 padding='VALID',
                                 bias_initializer=tf.zeros_initializer(),
                                 activation=tf.nn.relu)  # was activity_regularizer, which is a regularizer, not an activation
        # Max-pool over all valid positions, leaving one value per feature map.
        pool = tf.nn.max_pool(value=conv2,
                              ksize=[1, seq_length - kernel_size + 1, 1, 1],
                              strides=[1, 1, 1, 1],
                              padding='VALID')
        output.append(pool)
    # Concatenate the pooled feature maps along the channel axis.
    concat_ = tf.concat(output, 3)
    return concat_
(6) Feature concatenation after the conv-pooling branches: the branches are concatenated along the channel axis inside the function above, then every dimension of size 1 is squeezed away:
concat = conv_pool_concate(embedding_inputs,kernel_sizes)
squeeze = tf.squeeze(concat,axis=[1,2])
squeeze_dim = squeeze.get_shape().as_list()
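For reference, here is the shape bookkeeping behind the squeeze, assuming the parameter values above (this sketch is not from the original post):
# Shapes for one branch with kernel height k (VALID padding):
#   embedding_inputs : [None, 600, 128, 1]
#   conv2d output    : [None, 600 - k + 1, 1, 128]   (the kernel spans the full embedding width)
#   max_pool output  : [None, 1, 1, 128]             (pooled over all 600 - k + 1 positions)
# Concatenating the six pooled branches on axis 3 gives [None, 1, 1, 6 * 128] = [None, 1, 1, 768];
# squeezing axes 1 and 2 then leaves one flat feature vector per document.
print(squeeze_dim)   # expected: [None, 768]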
(7) The model after the convolutional layers, much like the earlier post; just mind the dimensions:
full_connect1 = tf.layers.dense(squeeze, hidden_dim)
full_connect1_dropout = tf.contrib.layers.dropout(full_connect1, keep_prob=dropout_keep_prob)  # defined above as 0.5
full_connect1_dropout_activate = tf.nn.relu(full_connect1_dropout)
full_connect2 = tf.layers.dense(full_connect1_dropout_activate, num_classes)
predict_y = tf.nn.softmax(full_connect2)
# Softmax cross-entropy against the one-hot labels, averaged over the batch.
cross_entry = tf.nn.softmax_cross_entropy_with_logits_v2(labels=Y_holder, logits=full_connect2)
loss = tf.reduce_mean(cross_entry)
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
train = optimizer.minimize(loss)
correct = tf.equal(tf.argmax(Y_holder, 1), tf.argmax(predict_y, 1))
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
init = tf.global_variables_initializer()
sess = tf.Session()
sess.run(init)
(8) Build the training labels: label-encode first, then one-hot encode.
from sklearn.preprocessing import LabelEncoder
from tensorflow.contrib import keras as kr
label = LabelEncoder()
train_Y = kr.utils.to_categorical(label.fit_transform(train_label_list), num_classes=num_classes)
# Reuse the mapping fitted on the training labels for the test labels.
test_Y = kr.utils.to_categorical(label.transform(test_label_list), num_classes=num_classes)
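As a toy illustration (hypothetical labels, not from the original data) of what this label pipeline produces:
toy = LabelEncoder()
ids = toy.fit_transform(['sports', 'finance', 'sports'])   # classes sorted -> finance=0, sports=1, so ids = [1, 0, 1]
onehot = kr.utils.to_categorical(ids, num_classes=2)
print(onehot)                                               # [[0. 1.], [1. 0.], [0. 1.]]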
(9) Model training:
import random
for i in range(5000):
    # Sample a random mini-batch of size batch_size from the training set.
    train_index = random.sample(list(range(len(train_Y))), k=batch_size)
    X = train_X[train_index]
    Y = train_Y[train_index]
    sess.run(train, feed_dict={X_holder: X, Y_holder: Y})
    step = i + 1
    if step % 100 == 0:
        # Report loss and accuracy on a random batch of test data.
        test_index = random.sample(list(range(len(test_Y))), k=batch_size)
        x = test_X[test_index]
        y = test_Y[test_index]
        loss_value, accuracy_value = sess.run([loss, accuracy], {X_holder: x, Y_holder: y})
        print('step:%d loss:%.4f accuracy:%.4f' % (step, loss_value, accuracy_value))
Part of the training output looks like this:
step:1600 loss:0.3221 accuracy:0.9219
step:1700 loss:0.0952 accuracy:0.9531
step:1800 loss:0.0241 accuracy:1.0000
step:1900 loss:0.0666 accuracy:0.9688
step:2000 loss:0.2998 accuracy:0.9062
step:2100 loss:0.3763 accuracy:0.9062
step:2200 loss:0.0264 accuracy:1.0000
step:2300 loss:0.1595 accuracy:0.9531
step:2400 loss:0.0955 accuracy:0.9688
step:2500 loss:0.2045 accuracy:0.9531
step:2600 loss:0.0479 accuracy:0.9688
step:2700 loss:0.0360 accuracy:0.9844
(10) Confusion matrix and evaluation:
import pandas as pd
from sklearn.metrics import confusion_matrix
import numpy as np
# Predict the test set in chunks of 200 documents to keep memory usage low.
predict_value = []
for i in range(0, len(test_X), 200):
    x_test = test_X[i:i + 200]
    predict = np.array(sess.run(predict_y, feed_dict={X_holder: x_test}))
    predict_ = np.argmax(predict, 1)
    predict_value.extend(predict_)
# Map the predicted class ids back to their label names.
predict_label = label.inverse_transform(predict_value)
df = pd.DataFrame(confusion_matrix(test_label_list, predict_label), columns=label.classes_, index=label.classes_)
print(df)
import numpy as np
from sklearn.metrics import precision_recall_fscore_support

def eval_model(y_true, y_pred, labels):
    # Per-class precision, recall, F1 and support.
    p, r, f1, s = precision_recall_fscore_support(y_true, y_pred)
    # Overall averages, weighted by support.
    tot_p = np.average(p, weights=s)
    tot_r = np.average(r, weights=s)
    tot_f1 = np.average(f1, weights=s)
    tot_s = np.sum(s)
    res1 = pd.DataFrame({
        u'Label': labels,
        u'Precision': p,
        u'Recall': r,
        u'F1': f1,
        u'Support': s
    })
    res2 = pd.DataFrame({
        u'Label': ['總體'],
        u'Precision': [tot_p],
        u'Recall': [tot_r],
        u'F1': [tot_f1],
        u'Support': [tot_s]
    })
    res2.index = [999]
    res = pd.concat([res1, res2])
    return res[['Label', 'Precision', 'Recall', 'F1', 'Support']]
eval_model(test_label_list, predict_label, label.classes_)
The results:
體育 娛樂 家居 房產 教育 時尚 時政 遊戲 科技 財經
體育 990 2 1 0 2 0 0 4 0 1
娛樂 1 973 5 0 4 3 1 8 4 1
家居 1 6 861 1 15 46 26 5 19 20
房產 0 2 0 996 0 0 0 0 0 2
教育 0 34 7 0 821 19 20 60 28 11
時尚 1 8 2 0 3 978 1 3 3 1
時政 0 7 4 1 26 2 931 3 10 16
遊戲 0 10 1 0 0 10 1 974 1 3
科技 0 2 5 0 0 1 4 11 975 2
財經 0 1 0 0 4 0 5 0 4 986
Label Precision Recall F1 Support
0 體育 0.996979 0.9900 0.993477 1000
1 娛樂 0.931100 0.9730 0.951589 1000
2 家居 0.971783 0.8610 0.913043 1000
3 房產 0.997996 0.9960 0.996997 1000
4 教育 0.938286 0.8210 0.875733 1000
5 時尚 0.923513 0.9780 0.949976 1000
6 時政 0.941355 0.9310 0.936149 1000
7 遊戲 0.911985 0.9740 0.941973 1000
8 科技 0.933908 0.9750 0.954012 1000
9 財經 0.945350 0.9860 0.965247 1000
999 總體 0.949226 0.9485 0.947820 10000
Judging from these results, the model trains well: precision and recall both exceed 94%, and so does the F1 score, so it could be put to practical use; with some further tuning the numbers could probably be pushed a bit higher :)
3. Now for the failed attempt: here the embedding layer is replaced with Word2Vec, i.e. the model is fed pre-trained word vectors from the start. The result is dismal :( No matter how long the model trains, the final accuracy never gets past 30%.
The word vectors keep the same dimensionality as before, 128, but are trained in advance with Word2Vec; each sentence is again padded or truncated to a uniform length of 600 as above. The Word2Vec training corpus is the entire training set plus the test set, to keep the vocabulary as complete as possible.
Since much of the code repeats what came before, only the key functions and the training part are shown here:
import numpy as np

vocabulary_set = set(vocabulary_list)  # set membership test instead of scanning the list each time

def contentToList(contents):
    # Replace out-of-vocabulary characters with '<PAD>' and pad/truncate each document
    # to exactly 600 characters (padding is added at the front).
    content_list = []
    for cont in contents:
        cont_i = []
        for word in cont:
            if word not in vocabulary_set:
                cont_i.append('<PAD>')
            else:
                cont_i.append(word)
        if len(cont_i) >= 600:
            cont_i = cont_i[-600:]          # keep the last 600 characters
        else:
            while len(cont_i) < 600:
                cont_i.insert(0, '<PAD>')   # pad at the front
        content_list.append(cont_i)
    return np.array(content_list)
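The `corpus` passed to Word2Vec below is not shown in the original code; a minimal sketch of how it could be assembled, assuming character-level tokens drawn from the full training and test sets as described above:
# Assumed corpus for Word2Vec: every train and test document split into single characters.
corpus = [list(content) for content in train_content_list + test_content_list]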
from gensim.models.word2vec import Word2Vec
model = Word2Vec(corpus,size=128,window=5,min_count=5,workers=4)
output_dir = u'output_word2vec'
import os
if not os.path.exists(output_dir):
os.mkdir(output_dir)
model_file = os.path.join(output_dir, 'model.w2v')
model.save(model_file)
def transform_to_matrix(x, padding_size=600, vec_size=128):
    # Yield one (600, 128, 1) matrix per document: each character is replaced by its
    # Word2Vec vector, '<PAD>' positions by zero vectors.
    res = []  # leftover from the earlier non-generator version kept in the comments below
    for sen in x:
        mat = []
        for i in range(padding_size):
            if sen[i] == '<PAD>':
                mat.append([0] * vec_size)
            else:
                mat.append(model[sen[i]].tolist())
        matrix = np.array(mat)
        matrix = matrix.reshape(matrix.shape[0], matrix.shape[1], 1)
        yield matrix
        # res.append(mat)
    # Earlier version that built the whole batch in memory at once:
    # matrix = np.array(res)
    # matrix = matrix.reshape(matrix.shape[0], matrix.shape[1], matrix.shape[2], 1)
    # return matrix
# The placeholders are redefined: the network is now fed word-vector matrices directly instead of token ids.
X_holder = tf.placeholder(tf.float32, [None, seq_length, embed_size, 1])
Y_holder = tf.placeholder(tf.float32, [None, num_classes])
Converting all of the word vectors in one go consumes a huge amount of memory, so the conversion function is written as a generator that yields one sample at a time, which keeps memory usage bounded. (The author's machine blue-screened three times over this :)
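A quick way to confirm what the generator yields per document (a hypothetical check, not in the original code):
sample_list = contentToList(train_content_list[:1])   # one document, already padded/truncated to 600 characters
one = next(transform_to_matrix(sample_list))
print(one.shape)                                       # expected: (600, 128, 1) -> seq_length x embed_size x 1 channel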
Everything else stays the same; next comes the training:
train_list = contentToList(train_content_list)
test_list = contentToList(test_content_list)
import random
# Build the word-vector matrices batch by batch: converting everything at once is too
# expensive and runs out of memory, so each batch is converted on the fly, which makes
# training slower.
for i in range(5000):
    train_index = random.sample(list(range(len(train_Y))), k=batch_size)
    x_generator = transform_to_matrix(train_list[train_index])
    y_train = train_Y[train_index]
    # for z, x in enumerate(x_generator):
    #     Ys = y_train[z]
    #     Y = Ys.reshape(1, Ys.shape[0])
    #     X = x
    Y = y_train
    X = np.array(list(x_generator))   # stack the yielded matrices into one batch array
    sess.run(train, feed_dict={X_holder: X, Y_holder: Y})
    step = i + 1
    if step % 100 == 0:
        test_index = random.sample(list(range(len(test_Y))), k=batch_size)
        x_generator_test = transform_to_matrix(test_list[test_index])
        y_test = test_Y[test_index]
        # for u, v in enumerate(x_generator_test):
        #     ys = y_test[u]
        #     y = ys.reshape(1, ys.shape[0])
        #     x = v
        y = y_test
        x = np.array(list(x_generator_test))
        loss_value, accuracy_value = sess.run([loss, accuracy], {X_holder: x, Y_holder: y})
        print('step:%d loss:%.4f accuracy:%.4f' % (step, loss_value, accuracy_value))
The results (ps: seeing these numbers, I had to cut the training short...):
step:100 loss:2.3006 accuracy:0.0938
step:200 loss:2.3043 accuracy:0.0312
step:300 loss:2.3036 accuracy:0.1250
step:400 loss:2.3050 accuracy:0.0938
step:500 loss:2.3048 accuracy:0.1250
step:600 loss:2.3015 accuracy:0.1250
step:700 loss:2.2998 accuracy:0.1250
step:800 loss:2.3049 accuracy