An annotated TensorFlow implementation of word2vec
阿新 · Published: 2018-11-19
import tensorflow as tf
import numpy as np
import time
import random
from collections import Counter
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
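# The text8 corpus is assumed to sit at ./data/text8. As a convenience (this guard is an
# addition, not part of the original post), fetch and unzip it if it is missing; the URL
# below is the commonly used mirror on mattmahoney.net.
if not os.path.isfile('./data/text8'):
    import urllib.request, zipfile
    os.makedirs('./data', exist_ok=True)
    urllib.request.urlretrieve('http://mattmahoney.net/dc/text8.zip', './data/text8.zip')
    with zipfile.ZipFile('./data/text8.zip') as zf:
        zf.extractall('./data')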
# read data
with open('./data/text8') as f:
    text = f.read()
def preprocess(text, freq=5):
    '''
    Tokenize the corpus and replace punctuation with special tokens.
    :param text: raw corpus string
    :param freq: words occurring no more than `freq` times are dropped
    :return: list of cleaned tokens
    '''
    text = text.lower()
    text = text.replace('.', ' <PERIOD> ')
    text = text.replace(',', ' <COMMA> ')
    text = text.replace('"', ' <QUOTATION_MARK> ')
    text = text.replace(';', ' <SEMICOLON> ')
    text = text.replace('!', ' <EXCLAMATION_MARK> ')
    text = text.replace('?', ' <QUESTION_MARK> ')
    text = text.replace('(', ' <LEFT_PAREN> ')
    text = text.replace(')', ' <RIGHT_PAREN> ')
    text = text.replace('--', ' <HYPHENS> ')
    text = text.replace(':', ' <COLON> ')
    words = text.split()
    # remove low-frequency words
    word_counts = Counter(words)
    trimmed_words = [word for word in words if word_counts[word] > freq]
    return trimmed_words
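# A quick sanity check on a toy sentence (not from the corpus): punctuation becomes
# special tokens, and anything occurring no more than `freq` times is trimmed away,
# so only 'the' and 'cat' survive here.
assert preprocess('the cat, the cat. the cat', freq=1) == ['the', 'cat'] * 3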
# clean and tokenize the corpus
words = preprocess(text)
print(words[:10])
# build the mapping tables
vocab = set(words)
vocab_to_id = {w: c for c, w in enumerate(vocab)}
id_to_vocab = {c: w for c, w in enumerate(vocab)}
## convert the original text from words to ids
id_words = [vocab_to_id[w] for w in words]
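# Sanity check: the word -> id -> word mapping is lossless.
assert [id_to_vocab[i] for i in id_words[:10]] == words[:10]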
## The subsampling here differs slightly from the formula described in the paper.
# Here we use p(w_i) = 1 - sqrt(t / freq(w_i)),
# i.e. p(w_i) is the probability that word w_i is discarded, and t is a hyperparameter.
t = 1e-5
threshold = 0.8  # discard-probability threshold
# count word occurrences
id_word_counts = Counter(id_words)
total_count = len(id_words)
# compute word frequencies
word_freqs = {w: c/total_count for w, c in id_word_counts.items()}
# compute the discard probability for each word
drop_prob = {w: 1 - np.sqrt(t/word_freqs[w]) for w in id_word_counts}
# subsample: keep only words whose discard probability is below the threshold
train_words = [w for w in id_words if drop_prob[w] < threshold]
print(len(train_words))
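# Illustration of the subsampling rule: a very frequent word such as 'the' gets a
# discard probability close to 1, while rare words get a low (even negative) one,
# i.e. they are essentially always kept.
the_id = vocab_to_id['the']
print('freq("the") = %.6f, drop_prob("the") = %.4f' % (word_freqs[the_id], drop_prob[the_id]))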
## build training batches
def get_targets(words, idx, window_size=2):
    '''
    Return the context words of the word at position `idx`.
    :param words: list of word ids
    :param idx: index of the input word
    :param window_size: maximum window size on each side
    :return: list of context word ids (duplicates removed)
    '''
    # sample an actual window width in [1, window_size]
    target_window = np.random.randint(1, window_size+1)
    # handle the case where there are not enough words before the input word
    start_point = idx - target_window if (idx - target_window) > 0 else 0
    end_point = idx + target_window
    # context words inside the window
    targets = set(words[start_point: idx] + words[idx+1: end_point+1])
    return list(targets)
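# Toy example (the numbers are made up): with window_size=2 the context of position 2 is a
# randomly sized window of 1 or 2 words on each side, returned without duplicates.
print(get_targets([10, 20, 30, 40, 50], idx=2, window_size=2))
# -> either [20, 40] or [10, 20, 40, 50] (in arbitrary order), depending on the draw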
def get_batches(words, batch_size, window_size=2):
    '''
    Yield (input, target) batches for skip-gram training.
    :param words: list of word ids
    :param batch_size: number of input words per batch
    :param window_size: maximum context window size
    :return: generator of (x, y) lists
    '''
    # how many full batches fit
    n_batches = len(words) // batch_size
    # keep only a whole number of batches
    words = words[:n_batches*batch_size]
    for idx in range(0, len(words), batch_size):  # start at 0, step by batch_size
        x, y = [], []
        batch = words[idx: idx+batch_size]
        for i in range(len(batch)):
            batch_x = batch[i]
            batch_y = get_targets(batch, i, window_size)
            # one input word maps to several context words, so repeat x to keep the lists aligned
            x.extend([batch_x]*len(batch_y))
            y.extend(batch_y)
        yield x, y
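# Peek at one small batch (for inspection only; training below builds its own generator).
# Each input word in x is repeated once per context word in y, so the lists stay aligned.
x_demo, y_demo = next(get_batches(train_words, batch_size=8, window_size=2))
print(len(x_demo) == len(y_demo), x_demo[:5], y_demo[:5])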
# build the network
# input layer
train_graph = tf.Graph()
with train_graph.as_default():
    inputs = tf.placeholder(tf.int32, shape=[None], name='inputs')
    labels = tf.placeholder(tf.int32, shape=[None, None], name='labels')
# embedding layer
vocab_size = len(id_to_vocab)
embedding_size = 128
with train_graph.as_default():
    embedding = tf.Variable(tf.random_uniform([vocab_size, embedding_size], -1, 1))
    # look up the embedding vectors of the input words
    embed = tf.nn.embedding_lookup(embedding, inputs)
## negative sampling (sampled softmax)
n_sampled = 100
with train_graph.as_default():
    w = tf.Variable(tf.truncated_normal([vocab_size, embedding_size], stddev=0.1))
    b = tf.Variable(tf.zeros(vocab_size))
    # only n_sampled negative classes are evaluated per step instead of the full vocabulary
    loss = tf.nn.sampled_softmax_loss(weights=w, biases=b, labels=labels, inputs=embed,
                                      num_sampled=n_sampled, num_classes=vocab_size)
    cost = tf.reduce_mean(loss)
    optimizer = tf.train.AdamOptimizer().minimize(cost)
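# Side note (an addition, not from the original post): sampled_softmax_loss is a
# training-time approximation. For exact scoring one would use the full output layer,
# roughly like this (the op is built here only for illustration and never run):
with train_graph.as_default():
    full_logits = tf.matmul(embed, w, transpose_b=True) + b  # shape [batch, vocab_size]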
## monitor word similarity during training
with train_graph.as_default():
    # pick validation words at random
    valid_size = 16
    valid_window = 100
    # half from ids [0, valid_window) and half from ids [1000, 1000+valid_window)
    valid_examples = np.array(random.sample(range(valid_window), valid_size//2))
    valid_examples = np.append(valid_examples,
                               random.sample(range(1000, 1000+valid_window), valid_size//2))
    valid_size = len(valid_examples)
    valid_dataset = tf.constant(valid_examples, dtype=tf.int32)
    # normalize the embedding vectors to unit length
    norm = tf.sqrt(tf.reduce_sum(tf.square(embedding), 1, keepdims=True))
    normalized = embedding / norm
    valid_embedding = tf.nn.embedding_lookup(normalized, valid_dataset)
    # cosine similarity between the validation words and every word in the vocabulary
    similarity = tf.matmul(valid_embedding, normalized, transpose_b=True)
# training
epochs = 10          # number of training epochs
batch_size = 1000    # batch size
window_size = 2      # window size
with train_graph.as_default():
    saver = tf.train.Saver()  # for saving checkpoints
with tf.Session(graph=train_graph) as sess:
    iteration = 1
    running_loss = 0  # running sum of batch losses (avoids shadowing the `loss` op above)
    sess.run(tf.global_variables_initializer())
    for e in range(1, epochs+1):
        batches = get_batches(train_words, batch_size, window_size)
        start = time.time()
        for x, y in batches:
            feed = {inputs: x,
                    labels: np.array(y)[:, None]}
            train_loss, _ = sess.run([cost, optimizer], feed_dict=feed)
            running_loss += train_loss
            if iteration % 100 == 0:
                end = time.time()
                print("Epoch {}/{}".format(e, epochs),
                      "Iteration: {}".format(iteration),
                      "Avg. Training loss: {:.4f}".format(running_loss/100),
                      "{:.4f} sec/batch".format((end-start)/100))
                running_loss = 0
                start = time.time()
            # periodically print the most similar words for the validation set
            if iteration % 1000 == 0:
                sim = similarity.eval()
                for i in range(valid_size):
                    valid_word = id_to_vocab[valid_examples[i]]
                    top_k = 8  # number of nearest neighbours to show
                    # skip position 0, which is the validation word itself
                    nearest = (-sim[i, :]).argsort()[1:top_k+1]
                    log = 'Nearest to [%s]:' % valid_word
                    for k in range(top_k):
                        close_word = id_to_vocab[nearest[k]]
                        log = '%s %s,' % (log, close_word)
                    print(log)
            iteration += 1
    os.makedirs('checkpoints', exist_ok=True)  # saver.save needs the directory to exist
    save_path = saver.save(sess, "checkpoints/text8.ckpt")
    embed_mat = sess.run(normalized)
# visualize the first 500 word vectors with t-SNE
viz_words = 500
tsne = TSNE()
embed_tsne = tsne.fit_transform(embed_mat[:viz_words, :])
fig, ax = plt.subplots(figsize=(14, 14))
for idx in range(viz_words):
    plt.scatter(*embed_tsne[idx, :], color='steelblue')
    plt.annotate(id_to_vocab[idx], (embed_tsne[idx, 0], embed_tsne[idx, 1]), alpha=0.7)
plt.show()