使用 rnn 訓練詞向量模型
阿新 • • 發佈:2018-12-18
詞向量說明如下:
詞向量模型表徵的是詞語與詞語之間的距離和聯絡,詞向量也叫詞嵌入 word embedding CBOW 模型: 根據某個詞的上下文,計算中間詞出現的概率,預測的是中心詞 Skip-Gram 模型: 根據中間詞,分別計算它的上下文概率,與 CBOW 模型相反,預測的是上下文 比如 "我喜歡你" 在Skip-Gram 中,取出其中的一個字當做輸入,將其前面和後面的字分別當做標籤,拆分如下: "喜 我" "喜 歡" "歡 喜" "歡 你" 每一行代表一個樣本,第一列代表輸入,第二列代表標籤。即中間詞取它的上下文為標籤 而 CBOW 模型恰恰相反,上下文取它的中間詞為標籤 "我 喜" "歡 喜" "喜 歡" "你 歡" tf.nn.nce_loss 計算NCE 的損失值,主要說明如下 def nce_loss(weights, biases, inputs, labels, num_sampled, num_classes, num_true=1, sampled_values=None, remove_accidental_hits=False, partition_strategy="mod", name="nce_loss") ''' 假設nce_loss之前的輸入資料是 K 維(也就是詞向量的維度) 的,一共有 N 個類(也就是N個詞),那麼 weight.shape = (N, K) bias.shape = (N) inputs.shape = (batch_size, K) labels.shape = (batch_size, num_true) num_true 就是對應的樣本標籤,也就是詞id num_true : 實際的正樣本個數 num_sampled: 取樣出多少個負樣本 num_classes = N sampled_values: 取樣出的負樣本,如果是None,就會用不同的sampler去取樣。 remove_accidental_hits: 如果取樣時不小心取樣到的負樣本剛好是正樣本,要不要幹掉 ''' nce_loss的實現邏輯如下: _compute_sampled_logits: 通過這個函式計算出正樣本和取樣出的負樣本對應的output和label sigmoid_cross_entropy_with_logits: 通過 sigmoid cross entropy來計算output和label的loss,從而進行反向傳播。 這個函式把最後的問題轉化為了num_sampled+num_real個兩類分類問題,然後每個分類問題用了交叉熵的損失函式,也就是logistic regression常用的損失函式。 TF裡還提供了一個softmax_cross_entropy_with_logits的函式,和這個有所區別 預設情況下,它會用log_uniform_candidate_sampler去取樣。那麼log_uniform_candidate_sampler是怎麼取樣的呢?它的實現在這裡: 1、會在[0, range_max)中取樣出一個整數k 2、P(k) = (log(k + 2) - log(k + 1)) / log(range_max + 1) 可以看到,k越大,被取樣到的概率越小。 TF的word2vec實現裡,詞頻越大,詞的類別編號也就越小。因此,在TF的word2vec裡,負取樣的過程其實就是優先採詞頻高的詞作為負樣本。
batch 資料生成檔案 datas.py 如下
# -*- coding:utf-8 -*- import numpy as np import tensorflow as tf import random import collections from collections import Counter import jieba from sklearn.manifold import TSNE import matplotlib as mpl import matplotlib.pyplot as plt mpl.rcParams['font.sans-serif'] = ['SimHei'] mpl.rcParams['font.family'] = 'STSong' mpl.rcParams['font.size'] = 20 training_file = "人體陰陽與電能.txt" # 中文字 def get_ch_label(text_file): labels = "" with open(text_file,"rb") as f: for label in f :labels += label.decode("gb2312") return labels # 分詞 def fenci(training_data): seg_list = jieba.cut(training_data) training_ci = " ".join(seg_list) training_ci = training_ci.split() # 用空格將字串分開 training_ci = np.array(training_ci) training_ci = np.reshape(training_ci,[-1,]) return training_ci def build_dataset(words,n_words): count = [['UNK',-1]] # Counter 是計數器,統計詞頻,這裡也就是統計前 n_words - 1 個最高的頻率詞 count.extend(collections.Counter(words).most_common(n_words - 1)) dictionary = dict() # 建立詞典id for word,_ in count: dictionary[word] = len(dictionary) data = list() unk_count = 0 for word in words: if word in dictionary: index = dictionary[word] else: index = 0 unk_count += 1 data.append(index) count[0][1] = unk_count reversed_dictionary = dict(zip(dictionary.values(),dictionary.keys())) return data,count,dictionary,reversed_dictionary data_index = 0 def generate_batch(data,batch_size,num_skips,skip_window): global data_index assert batch_size % num_skips == 0 assert num_skips <= 2 * skip_window batch = np.ndarray(shape = (batch_size),dtype = np.int32) labels = np.ndarray(shape = (batch_size,1),dtype = np.int32) # 每一個樣本由 skip_window + 當前 target + 後 skip_window 組成 span = 2 * skip_window + 1 buffer = collections.deque(maxlen = span) if data_index + span > len(data): data_index = 0 buffer.extend(data[data_index:data_index + span]) data_index += span for i in range(batch_size // num_skips ): target = skip_window #target 在 buffer 中的索引為 skip_window targets_to_avoid = [skip_window] for j in range(num_skips): while 
target in targets_to_avoid: target = random.randint(0,span - 1) targets_to_avoid.append(target) batch[i*num_skips + j] = buffer[skip_window] labels[i*num_skips + j,0] = buffer[target] if data_index == len(data): buffer = data[:span] data_index = span else: buffer.append(data[data_index]) data_index += 1 data_index = (data_index + len(data) - span) % len(data) return batch,labels def get_batch(batch_size,num_skips = 2,skip_window = 1): # print (collections.Counter(['a','a','b','b','b','c']).most_common(1)) training_data = get_ch_label(training_file) print "總字數",len(training_data) # 分詞後的一維詞表 training_ci = fenci(training_data) training_label,count,dictionary,words = build_dataset(training_ci,350) words_size = len(dictionary) print "字典詞數",words_size # print('Sample data',training_label[:10],[words[i] for i in training_label[:10]]) # 獲取batch,labels batch,labels = generate_batch(training_label,batch_size = batch_size,num_skips = num_skips,skip_window = skip_window) return batch,labels,words,words_size
詞向量訓練和視覺化如下:
# -*- coding:utf-8 -*- from __future__ import unicode_literals import sys reload(sys) sys.setdefaultencoding("utf-8") from datas import get_batch,np,tf,plt,TSNE batch_inputs,batch_labels,words,words_size = get_batch(batch_size = 200) batch_size = 128 embedding_size = 128 skip_window = 1 num_skips =2 valid_size = 16 valid_window = words_size / 2 valid_examples = np.random.choice(valid_window,valid_size,replace = False) # 0-valid_window 中的資料取 16 個,不能重複 num_sampled = 64 # 負取樣個數 tf.reset_default_graph() train_inputs = tf.placeholder(tf.int32,shape = [None]) train_labels = tf.placeholder(tf.int32,shape = [None,1]) valid_dataset = tf.constant(valid_examples,dtype = tf.int32) with tf.device('/cpu:0'): embeddings = tf.Variable(tf.random_uniform([words_size,embedding_size],-1.0,1.0)) embed = tf.nn.embedding_lookup(embeddings,train_inputs) # 計算 NCE 的loss 值 nce_weights = tf.Variable(tf.truncated_normal([words_size,embedding_size],stddev = 1.0 /tf.sqrt(np.float32(embedding_size)))) nce_biases = tf.Variable(tf.zeros([words_size])) loss = tf.reduce_mean(tf.nn.nce_loss(weights = nce_weights,biases = nce_biases, labels = train_labels,inputs = embed,num_sampled = num_sampled,num_classes = words_size)) optimizer = tf.train.GradientDescentOptimizer(1.0).minimize(loss) # 計算 minibach examples 和所有 embeddings 的 cosine 相似度 norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings),axis = 1,keep_dims = True)) # 按行單位化 normalized_embeddings = embeddings / norm # 單位化 embeddings validate_embeddings = tf.nn.embedding_lookup(normalized_embeddings,valid_dataset) # 就算出的餘弦相似度,是一個矩陣,每一行代表某個 valid example 與每個詞典的相似度 similarity = tf.matmul(validate_embeddings,normalized_embeddings,transpose_b = True) # 餘弦相似度矩陣 if __name__ == "__main__": num_steps = 100001 with tf.Session(graph = tf.get_default_graph()) as sess: tf.initialize_all_variables().run() print ('Initialized') average_loss = 0 for step in range(num_steps): feed_dict = {train_inputs:batch_inputs,train_labels:batch_labels} _,loss_val = 
sess.run([optimizer,loss],feed_dict = feed_dict) average_loss += loss_val emv = sess.run(embed,feed_dict = {train_inputs:[37,18]}) #print "emv----------------------------------",emv[0] if step % 1000 ==0: average_loss /= 1000 print 'Average loss at step ',step,':',average_loss average_loss = 0 sim = similarity.eval(session = sess) for i in range(valid_size): valid_word = words[valid_examples[i]] top_k = 8 nearest = (-sim[i,:]).argsort()[1:top_k + 1] # argsort 返回的是陣列值從小到大的索引值 log_str = 'Nearest to %s:' % valid_word for k in range(top_k): close_word = words[nearest[k]] log_str = '%s,%s' %(log_str,close_word) print log_str final_embeddings = sess.run(normalized_embeddings) # 將詞向量視覺化 def plot_with_labels(low_dim_embs,labels,filename = 'tsne.png'): assert low_dim_embs.shape[0] >= len(labels),'More labels than embeddings' plt.figure(figsize = (18,18)) for i,label in enumerate(labels): x,y = low_dim_embs[i,:] plt.scatter(x,y) plt.annotate(label.decode("utf-8"),xy = (x,y),xytext = (5,2),textcoords ='offset points',ha = 'right',va = 'bottom') plt.savefig(filename) try: tsne = TSNE(perplexity = 30,n_components = 2,init = 'pca',n_iter = 5000) plot_only = 80 # 輸出 100 個詞 low_dim_embs = tsne.fit_transform(final_embeddings[:plot_only,:]) labels = [unicode(words[i]) for i in range(plot_only)] plot_with_labels(low_dim_embs,labels) except: print "Save png Error"
執行結果如下:因為只有一篇 1700 字左右,所以效果不是很理想