1. 程式人生 > >深度有趣 | 13 詞向量的訓練

深度有趣 | 13 詞向量的訓練

簡介

使用TensorFlow實現中文詞向量的訓練,並完成一些簡單的語義任務

回顧

在全棧課程中介紹過如何使用gensim訓練中文詞向量,即詞嵌入(Word Embedding)

如果沒有gensim則安裝

pip install gensim

準備好語料,例如中文維基百科分詞語料

載入庫

# -*- coding: utf-8 -*-

from gensim.models import Word2Vec
from gensim.models.word2vec import LineSentence
import time

訓練模型並儲存,在我的筆記本上訓練共耗時1403秒

t0 = int(time.time())
sentences = LineSentence('wiki.zh.word.text')
model = Word2Vec(sentences, size=128, window=5, min_count=5, workers=4)
print('訓練耗時 %d s' % (int(time.time()) - t0))
model.save('gensim_128')

載入模型並使用

model = Word2Vec.load('gensim_128')
# 相關詞
items = model.wv.most_similar('數學')
for i, item in enumerate(items):
	print(i, item[0], item[1])
# 語義類比
print('=' * 20)
items = model.wv.most_similar(positive=['紐約', '中國'], negative=['北京'])
for i, item in enumerate(items):
	print(i, item[0], item[1])
# 不相關詞
print('=' * 20)
print(model.wv.doesnt_match(['早餐', '午餐', '晚餐', '手機']))
# 計算相關度
print('=' * 20)
print(model.wv.similarity('男人', '女人'))

原理

詞向量是對詞語的一種表示(representation)

  • 有了詞向量之後,就可以將一句話表示成一個向量序列,即一個二維Tensor
  • 如果是多個長度相等的句子,則可以表示為一個三維Tensor

說白了,詞向量就是一個二維矩陣,維度為V*dV是詞的總個數,d是詞向量的維度

One-Hot將每個詞語表示為一個V維向量,僅當前詞語對應的維度為1,其他維度為0

詞嵌入將One-Hot表示的高維稀疏向量,對映為該詞語對應的,低維稠密實值的詞向量

詞向量的訓練主要有兩種方法

  • CBOW(Continuous Bag-of-Words):根據上下文詞語預測當前詞
  • Skip-Gram:根據當前詞預測上下文詞語

CBOW和Skip-gram

這裡我們主要講一下Skip-Gram的原理

輸入為一個詞對應的整數id或One-Hot表示,經過Embedding層後得到對應的詞向量,經過一層對映和softmax處理後,得到每個詞對應的輸出概率

由於詞彙表往往非常大,幾萬、幾十萬甚至幾百萬,因此直接在整個詞彙表上進行多分類將會導致非常大的計算量

一個有效的解決方法是Negative Sampling,即每次隨機取樣一些負樣本

假設詞彙表大小為5W,對於某個輸入詞,已知對應的正確輸出詞,再隨機從詞彙表中選擇N個詞,這N個詞剛好是正確輸出詞的概率非常低,因此可以認為是負樣本

  • 給你一張狗狗圖片,判斷出對應的種類名稱
  • 給你五張狗狗圖片,判斷出每一張是否是哈士奇

這樣一來,就把一個5W分類的多分類問題,變成了N個二分類問題,同樣提供了可學習的梯度,並且大大降低了計算量

在具體實現中,可以使用Noise-Contrastive Estimation(NCE)作為損失函式,在TensorFlow中使用tf.nn.nce_loss()即可

實現

載入庫和語料,一共254419行

# -*- coding: utf-8 -*-

import pickle
import numpy as np
import tensorflow as tf
import collections
from tqdm import tqdm

with open('wiki.zh.word.text', 'rb') as fr:
    lines = fr.readlines()
print('共%d行' % len(lines))
print(lines[0].decode('utf-8'))

一共有148134974個詞

lines = [line.decode('utf-8') for line in lines]
words = ' '.join(lines)
words = words.replace('\n', '').split(' ')
print('共%d個詞' % len(words))

定義詞典

vocab_size = 50000
vocab = collections.Counter(words).most_common(vocab_size - 1)

詞頻統計

count = [['UNK', 0]]
count.extend(vocab)
print(count[:10])

詞和id之間的相互對映

word2id = {}
id2word = {}
for i, w in enumerate(count):
    word2id[w[0]] = i
    id2word[i] = w[0]
print(id2word[100], word2id['數學'])

將語料轉為id序列,一共有22385926個UNK

data = []
for i in tqdm(range(len(lines))):
    line = lines[i].strip('\n').split(' ')
    d = []
    for word in line:
        if word in word2id:
            d.append(word2id[word])
        else:
            d.append(0)
            count[0][1] += 1
    data.append(d)
print('UNK數量%d' % count[0][1])

準備訓練資料

X_train = []
Y_train = []
window = 3
for i in tqdm(range(len(data))):
    d = data[i]
    for j in range(len(d)):
        start = j - window
        end = j + window
        if start < 0:
            start = 0
        if end >= len(d):
            end = len(d) - 1
        
        while start <= end:
            if start == j:
                start += 1
                continue
            else:
                X_train.append(d[j])
                Y_train.append(d[start])
                start += 1
X_train = np.squeeze(np.array(X_train))
Y_train = np.squeeze(np.array(Y_train))
Y_train = np.expand_dims(Y_train, -1)
print(X_train.shape, Y_train.shape)

定義模型引數

batch_size = 128
embedding_size = 128
valid_size = 16
valid_range = 100
valid_examples = np.random.choice(valid_range, valid_size, replace=False)
num_negative_samples = 64

定義模型

X = tf.placeholder(tf.int32, shape=[batch_size], name='X')
Y = tf.placeholder(tf.int32, shape=[batch_size, 1], name='Y')
valid = tf.placeholder(tf.int32, shape=[None], name='valid')

embeddings = tf.Variable(tf.random_uniform([vocab_size, embedding_size], -1.0, 1.0))
embed = tf.nn.embedding_lookup(embeddings, X)

nce_weights = tf.Variable(tf.truncated_normal([vocab_size, embedding_size], stddev=1.0 / np.sqrt(embedding_size)))
nce_biases = tf.Variable(tf.zeros([vocab_size]))

loss = tf.reduce_mean(tf.nn.nce_loss(weights=nce_weights, biases=nce_biases, labels=Y, inputs=embed, num_sampled=num_negative_samples, num_classes=vocab_size))

optimizer = tf.train.AdamOptimizer().minimize(loss)

將詞向量歸一化,並計算和給定詞之間的相似度

norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings), axis=1, keep_dims=True))
normalized_embeddings = embeddings / norm

valid_embeddings = tf.nn.embedding_lookup(normalized_embeddings, valid)
similarity = tf.matmul(valid_embeddings, normalized_embeddings, transpose_b=True)

訓練模型

sess = tf.Session()
sess.run(tf.global_variables_initializer())

offset = 0
losses = []
for i in tqdm(range(1000000)):
    if offset + batch_size >= X_train.shape[0]:
        offset = (offset + batch_size) % X_train.shape[0]
        
    X_batch = X_train[offset: offset + batch_size]
    Y_batch = Y_train[offset: offset + batch_size]
    
    _, loss_ = sess.run([optimizer, loss], feed_dict={X: X_batch, Y: Y_batch})
    losses.append(loss_)
    
    if i % 2000 == 0 and i > 0:
        print('Iteration %d Average Loss %f' % (i, np.mean(losses)))
        losses = []
        
    if i % 10000 == 0:
        sim = sess.run(similarity, feed_dict={valid: valid_examples})
        for j in range(valid_size):
            valid_word = id2word[valid_examples[j]]
            top_k = 5
            nearests = (-sim[j, :]).argsort()[1: top_k + 1]
            s = 'Nearest to %s:' % valid_word
            for k in range(top_k):
                s += ' ' + id2word[nearests[k]]
            print(s)
            
    offset += batch_size

儲存模型、最終詞向量、對映字典

saver = tf.train.Saver()
saver.save(sess, './tf_128')

final_embeddings = sess.run(normalized_embeddings)
with open('tf_128.pkl', 'wb') as fw:
    pickle.dump({'embeddings': final_embeddings, 'word2id': word2id, 'id2word': id2word}, fw, protocol=4)

在單機上使用訓練好的模型和詞向量

載入庫和得到的詞向量、對映字典

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import pickle

with open('tf_128.pkl', 'rb') as fr:
    data = pickle.load(fr)
    final_embeddings = data['embeddings']
    word2id = data['word2id']
    id2word = data['id2word']

獲取頻次最高的前200個非單字詞,對其詞向量進行tSNE降維視覺化

word_indexs = []
count = 0
plot_only = 200
for i in range(1, len(id2word)):
    if len(id2word[i]) > 1:
        word_indexs.append(i)
        count += 1
        if count == plot_only:
            break

tsne = TSNE(perplexity=30, n_components=2, init='pca', n_iter=5000)
two_d_embeddings = tsne.fit_transform(final_embeddings[word_indexs, :])
labels = [id2word[i] for i in word_indexs]

plt.figure(figsize=(15, 12))
for i, label in enumerate(labels):
    x, y = two_d_embeddings[i, :]
    plt.scatter(x, y)
    plt.annotate(label, (x, y), ha='center', va='top', fontproperties='Microsoft YaHei')
plt.savefig('詞向量降維視覺化.png')

可以看到,語義相關的詞確實都處於相近的位置

詞向量降維視覺化

可以載入TensorFlow模型,給valid指定一些詞對應的id以獲取相似詞

sess = tf.Session()
sess.run(tf.global_variables_initializer())

saver = tf.train.import_meta_graph('tf_128.meta')
saver.restore(sess, tf.train.latest_checkpoint('.'))
graph = tf.get_default_graph()
valid = graph.get_tensor_by_name('valid:0')
similarity = graph.get_tensor_by_name('MatMul_1:0')

word = '數學'
sim = sess.run(similarity, feed_dict={valid: [word2id[word]]})
top_k = 10
nearests = (-sim[0, :]).argsort()[1: top_k + 1]
s = 'Nearest to %s:' % word
for k in range(top_k):
    s += ' ' + id2word[nearests[k]]
print(s)

和數學最相關的10個詞

Nearest to 數學: 理論 物理學 應用 物理 科學 化學 定義 哲學 生物學 天文學

使用詞向量完成其他語義任務

# 計算相關度
def cal_sim(w1, w2):
    return np.dot(final_embeddings[word2id[w1]], final_embeddings[word2id[w2]])
print(cal_sim('男人', '女人'))

# 相關詞
word = '數學'
sim = [[id2word[i], cal_sim(word, id2word[i])] for i in range(len(id2word))]
sim.sort(key=lambda x:x[1], reverse=True)
top_k = 10
for i in range(top_k):
    print(sim[i + 1])

# 不相關詞
def find_mismatch(words):
    vectors = [final_embeddings[word2id[word]] for word in words]
    scores = {word: np.mean([cal_sim(word, w) for w in words]) for word in words}
    scores = sorted(scores.items(), key=lambda x:x[1])
    return scores[0][0]
print(find_mismatch(['早餐', '午餐', '晚餐', '手機']))

參考

視訊講解課程