
Training a Word Vector Model with RNN

Notes on word vectors:

A word vector model captures the distances and relationships between words; word vectors are also known as word embeddings.
CBOW model: uses the context of a word to compute the probability of the middle word, i.e. it predicts the center word.
Skip-Gram model: uses the middle word to compute the probability of each of its context words; it is the opposite of CBOW and predicts the context.

比如 "我喜歡你" 在Skip-Gram 中,取出其中的一個字當做輸入,將其前面和後面的子分別當做標籤,拆分如下:
"喜 我"
"喜 歡"
"歡 喜"
"歡 你"
每一行代表一個樣本,第一列代表輸入,第二列代表標籤。即中間詞取它的上下文為標籤

而 CBOW 模型恰恰相反,上下文取它的中間詞為標籤
"我 喜"
"歡 喜"
"喜 歡"
"你 歡"

tf.nn.nce_loss computes the NCE loss; its main arguments are described below:
def nce_loss(weights, biases, inputs, labels, num_sampled, num_classes,
             num_true=1,
             sampled_values=None,
             remove_accidental_hits=False,
             partition_strategy="mod",
             name="nce_loss")
'''
Suppose the input to nce_loss is K-dimensional (K is the word vector dimension) and there are N classes (i.e. N words). Then:
weights.shape = (N, K)
biases.shape = (N,)
inputs.shape = (batch_size, K)
labels.shape = (batch_size, num_true)  the labels are the ids of the true (positive) words
num_true: the number of positive classes per example
num_sampled: how many negative samples to draw
num_classes = N
sampled_values: the sampled negative classes; if None, a candidate sampler is used to draw them.
remove_accidental_hits: whether to discard negative samples that happen to equal a positive label.
'''

The implementation logic of nce_loss is as follows:
_compute_sampled_logits: this function computes the outputs and labels for the positive samples and the sampled negative samples.
sigmoid_cross_entropy_with_logits: computes the loss between those outputs and labels with sigmoid cross entropy, which is then used for back-propagation.
In other words, the function turns the problem into num_sampled + num_true binary classification problems, each of which uses the cross-entropy loss commonly used in logistic regression.
TF also provides softmax_cross_entropy_with_logits, which differs from this function.
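
As a rough illustration of that idea (an assumption about the internals, not the actual TF source), the numpy sketch below shows how one example with one true label and num_sampled negatives becomes 1 + num_sampled binary classification problems; the sampler's log-probability correction and accidental-hit handling are omitted, and all names and sizes are hypothetical:

import numpy as np

def sampled_sigmoid_loss(weights, biases, inp, true_id, sampled_ids):
    # gather the output-weight rows for the true class and the sampled negatives
    ids = np.concatenate(([true_id], sampled_ids))        # (1 + num_sampled,)
    logits = weights[ids].dot(inp) + biases[ids]          # (1 + num_sampled,)
    # the true class gets target 1, every sampled negative gets target 0
    targets = np.zeros_like(logits)
    targets[0] = 1.0
    # per-class sigmoid cross entropy: (1 - z) * x + log(1 + exp(-x)),
    # the same loss used in logistic regression
    return np.sum((1.0 - targets) * logits + np.log1p(np.exp(-logits)))

# hypothetical sizes: N = 350 words, K = 128 dimensions, 64 negatives
N, K = 350, 128
rng = np.random.RandomState(0)
loss = sampled_sigmoid_loss(rng.randn(N, K), np.zeros(N),
                            rng.randn(K), true_id=5,
                            sampled_ids=rng.randint(0, N, 64))
print(loss)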

By default, nce_loss uses log_uniform_candidate_sampler to draw the negatives. How does log_uniform_candidate_sampler sample? It works as follows:
1. It samples an integer k from [0, range_max).
2. P(k) = (log(k + 2) - log(k + 1)) / log(range_max + 1)
As you can see, the larger k is, the smaller its probability of being sampled.
In TF's word2vec implementation, the higher a word's frequency, the smaller its class id. So in TF's word2vec, negative sampling in effect prefers high-frequency words as negative samples.
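
To make that concrete, here is a small sketch (not part of the original post) that evaluates P(k) for a few class ids; range_max = 350 is an assumption chosen to match the vocabulary size used in the code below:

import math

range_max = 350  # assumed vocabulary size, matching build_dataset(words, 350) below
def p(k):
    return (math.log(k + 2) - math.log(k + 1)) / math.log(range_max + 1)

for k in (0, 1, 10, 100, 349):
    print("P(%d) = %.5f" % (k, p(k)))
# P(0) is about 0.118 while P(349) is about 0.00049: small ids
# (high-frequency words) are far more likely to be drawn as negatives.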

The batch-data generation file datas.py is shown below:

# -*- coding:utf-8 -*-
import numpy as np
import tensorflow as tf
import random
import collections
from collections import Counter
import jieba

from sklearn.manifold import TSNE
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rcParams['font.sans-serif'] = ['SimHei']
mpl.rcParams['font.family'] = 'STSong'
mpl.rcParams['font.size'] = 20

training_file = "人體陰陽與電能.txt"

# read the Chinese text (the source file is GB2312-encoded)
def get_ch_label(text_file):
    labels = ""
    with open(text_file,"rb") as f:
        for label in f:
            labels += label.decode("gb2312")
    return labels
    
# segment the text into words with jieba
def fenci(training_data):
    seg_list = jieba.cut(training_data)
    # join with spaces, then split to get a flat word list
    training_ci = " ".join(seg_list)
    training_ci = training_ci.split()
    training_ci = np.array(training_ci)
    training_ci = np.reshape(training_ci,[-1,])
    return training_ci
    
def build_dataset(words,n_words):
    count = [['UNK',-1]]
    # Counter counts word frequencies; keep the n_words - 1 most frequent words
    count.extend(collections.Counter(words).most_common(n_words - 1))
    dictionary = dict()
    # build the word-to-id dictionary
    for word,_ in count:
        dictionary[word] = len(dictionary)
    data = list()
    unk_count = 0
    for word in words:
        if word in dictionary:
            index = dictionary[word]
        else:
            index = 0
            unk_count += 1
        data.append(index)
    count[0][1] = unk_count
    reversed_dictionary = dict(zip(dictionary.values(),dictionary.keys()))
    return data,count,dictionary,reversed_dictionary

data_index = 0
def generate_batch(data,batch_size,num_skips,skip_window):
    global data_index
    assert batch_size % num_skips == 0
    assert num_skips <= 2 * skip_window
    
    batch = np.ndarray(shape = (batch_size),dtype = np.int32)
    labels = np.ndarray(shape = (batch_size,1),dtype = np.int32)
    # each sample window consists of the skip_window words before the target, the target itself, and the skip_window words after it
    span = 2 * skip_window + 1
    buffer = collections.deque(maxlen = span)
    if data_index + span > len(data):
        data_index = 0
    
    buffer.extend(data[data_index:data_index + span])
    data_index +=  span
    
    for i in range(batch_size // num_skips ):
        target = skip_window # the index of the target inside buffer is skip_window
        targets_to_avoid = [skip_window]
        for j in range(num_skips):
            while target in targets_to_avoid:
                target = random.randint(0,span - 1)
            targets_to_avoid.append(target)
            batch[i*num_skips + j] = buffer[skip_window]
            labels[i*num_skips + j,0] = buffer[target]
            
        if data_index == len(data):
            # wrap around: refill the sliding-window deque with the first span words
            buffer.extend(data[:span])
            data_index = span
        else:
            buffer.append(data[data_index])
            data_index += 1
    data_index = (data_index + len(data) - span) % len(data)
    return batch,labels
        
def get_batch(batch_size,num_skips = 2,skip_window = 1):
    # print (collections.Counter(['a','a','b','b','b','c']).most_common(1))
    training_data = get_ch_label(training_file)
    print "total number of characters:",len(training_data)
    # one-dimensional word list after segmentation
    training_ci = fenci(training_data)
    training_label,count,dictionary,words = build_dataset(training_ci,350)
    words_size = len(dictionary)
    print "vocabulary size:",words_size
    # print('Sample data',training_label[:10],[words[i] for i in training_label[:10]])

    # generate the batch and labels
    batch,labels = generate_batch(training_label,batch_size = batch_size,num_skips = num_skips,skip_window = skip_window)
    return batch,labels,words,words_size
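
As a quick sanity check (not in the original file), the pairs produced by get_batch can be printed back as words; the batch size of 8 used here is arbitrary:

# -*- coding:utf-8 -*-
# hypothetical check script: print the generated (input, label) pairs as words
from datas import get_batch

if __name__ == "__main__":
    batch, labels, words, words_size = get_batch(batch_size = 8)
    for inp, lab in zip(batch, labels[:,0]):
        print words[inp], "->", words[lab]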

The word-vector training and visualization code is shown below:

# -*- coding:utf-8 -*-
from __future__ import unicode_literals
import sys
reload(sys)
sys.setdefaultencoding("utf-8")

from datas import get_batch,np,tf,plt,TSNE
batch_inputs,batch_labels,words,words_size =  get_batch(batch_size = 200)

batch_size = 128
embedding_size = 128
skip_window = 1
num_skips = 2

valid_size = 16
valid_window = words_size / 2
valid_examples = np.random.choice(valid_window,valid_size,replace = False) # draw 16 distinct ids from [0, valid_window), without replacement
num_sampled = 64 # number of negative samples

tf.reset_default_graph()
train_inputs = tf.placeholder(tf.int32,shape = [None])
train_labels = tf.placeholder(tf.int32,shape = [None,1])
valid_dataset = tf.constant(valid_examples,dtype = tf.int32)

with tf.device('/cpu:0'):
    embeddings = tf.Variable(tf.random_uniform([words_size,embedding_size],-1.0,1.0))
    embed = tf.nn.embedding_lookup(embeddings,train_inputs)
    # compute the NCE loss
    nce_weights = tf.Variable(tf.truncated_normal([words_size,embedding_size],stddev = 1.0 /tf.sqrt(np.float32(embedding_size))))
    nce_biases = tf.Variable(tf.zeros([words_size]))

    loss = tf.reduce_mean(tf.nn.nce_loss(weights = nce_weights,biases = nce_biases,
                            labels = train_labels,inputs = embed,num_sampled = num_sampled,num_classes = words_size))
    optimizer = tf.train.GradientDescentOptimizer(1.0).minimize(loss)

    # compute the cosine similarity between the minibatch examples and all embeddings
    norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings),axis = 1,keep_dims = True)) # row-wise norm
    normalized_embeddings = embeddings / norm # normalize the embeddings to unit length
    validate_embeddings = tf.nn.embedding_lookup(normalized_embeddings,valid_dataset)
    # the result is a matrix of cosine similarities; each row is the similarity of one valid example to every word in the vocabulary
    similarity = tf.matmul(validate_embeddings,normalized_embeddings,transpose_b = True) # cosine similarity matrix

if __name__ == "__main__":
    num_steps = 100001
    with tf.Session(graph = tf.get_default_graph()) as sess:
        tf.global_variables_initializer().run()
        print ('Initialized')
        average_loss = 0
        for step in range(num_steps):
            feed_dict = {train_inputs:batch_inputs,train_labels:batch_labels}
            _,loss_val = sess.run([optimizer,loss],feed_dict = feed_dict)
            average_loss += loss_val
            emv = sess.run(embed,feed_dict = {train_inputs:[37,18]})
            #print "emv----------------------------------",emv[0]
            if step % 1000 ==0:
                average_loss /= 1000
                print 'Average loss at step ',step,':',average_loss
                average_loss = 0
            
                sim = similarity.eval(session = sess)
                for i in range(valid_size):
                    valid_word = words[valid_examples[i]]
                    top_k = 8
                    nearest = (-sim[i,:]).argsort()[1:top_k + 1] # argsort returns the indices that sort the array in ascending order
                    log_str = 'Nearest to %s:' % valid_word
                    for k in range(top_k):
                        close_word = words[nearest[k]]
                        log_str = '%s,%s' %(log_str,close_word)
                    print log_str
        
        final_embeddings = sess.run(normalized_embeddings)
    
    # visualize the word vectors with t-SNE
    def plot_with_labels(low_dim_embs,labels,filename = 'tsne.png'):
        assert low_dim_embs.shape[0] >= len(labels),'More labels than embeddings'
        plt.figure(figsize = (18,18))
        for i,label in enumerate(labels):
            x,y = low_dim_embs[i,:]
            plt.scatter(x,y)
            plt.annotate(label.decode("utf-8"),xy = (x,y),xytext = (5,2),textcoords ='offset points',ha = 'right',va = 'bottom')
        plt.savefig(filename)

    try:
        tsne = TSNE(perplexity = 30,n_components = 2,init = 'pca',n_iter = 5000)
        plot_only = 80 # visualize only the first 80 words
        low_dim_embs = tsne.fit_transform(final_embeddings[:plot_only,:])
        labels = [unicode(words[i]) for i in range(plot_only)]
        plot_with_labels(low_dim_embs,labels)
    except:
        print "Save png Error"

The result of running the code is as follows. Since the corpus is only a single article of about 1,700 characters, the result is not very good.