
word2vec in TensorFlow (annotated version)

import tensorflow as tf
import numpy as np
import time
import random
from collections import Counter
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # silence TensorFlow INFO and WARNING logs

import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
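
text8 is a roughly 100 MB cleaned extract of English Wikipedia. If ./data/text8 is not already on disk, a helper along these lines can fetch and unpack it first (my addition; the download URL and paths are assumptions, not part of the original post):

import zipfile
import urllib.request

def maybe_download_text8(data_dir='./data'):
    # hypothetical helper: download and extract text8 if it is missing
    os.makedirs(data_dir, exist_ok=True)
    target = os.path.join(data_dir, 'text8')
    if not os.path.exists(target):
        zip_path = os.path.join(data_dir, 'text8.zip')
        urllib.request.urlretrieve('http://mattmahoney.net/dc/text8.zip', zip_path)
        with zipfile.ZipFile(zip_path) as z:
            z.extractall(data_dir)
    return target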

# read the data
with open('./data/text8') as f:
    text = f.read()


def preprocess(text, freq=5):
    '''
    :param text: raw corpus string
    :param freq: minimum occurrence count; rarer words are dropped
    :return: list of cleaned tokens
    '''
    text = text.lower()
    text = text.replace('.', ' <PERIOD> ')
    text = text.replace(',', ' <COMMA> ')
    text = text.replace('"', ' <QUOTATION_MARK> ')
    text = text.replace(';', ' <SEMICOLON> ')
    text = text.replace('!', ' <EXCLAMATION_MARK> ')
    text = text.replace('?', ' <QUESTION_MARK> ')
    text = text.replace('(', ' <LEFT_PAREN> ')
    text = text.replace(')', ' <RIGHT_PAREN> ')
    text = text.replace('--', ' <HYPHENS> ')
    text = text.replace(':', ' <COLON> ')
    words = text.split()
    # drop low-frequency words
    word_counts = Counter(words)
    trimmed_words = [word for word in words if word_counts[word] > freq]
    return trimmed_words

# clean and tokenize
words = preprocess(text)
print(words[:10])

# build the vocabulary mappings
vocab = set(words)
vocab_to_id = {w: c for c, w in enumerate(vocab)}
id_to_vocab = {c: w for c, w in enumerate(vocab)}

## convert the original text from words to ids
id_words = [vocab_to_id[w] for w in words]

## subsampling; this differs from the description in the paper
# here p(wi) = 1 - sqrt(t / freq(wi))
# p(wi) is the probability that word wi is dropped, and t is a hyperparameter
t = 1e-5
threshold = 0.8  # drop-probability threshold

# count word occurrences
id_word_counts = Counter(id_words)
total_count = len(id_words)
# word frequencies
word_freqs = {w: c / total_count for w, c in id_word_counts.items()}
# drop probabilities
drop_prob = {w: 1 - np.sqrt(t / word_freqs[w]) for w in id_word_counts}
# subsample: keep only words whose drop probability is below the threshold
train_words = [w for w in id_words if drop_prob[w] < threshold]
print(len(train_words))
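
A note on the subsampling step (my addition, not in the original post): because train_words keeps every occurrence of any word whose drop probability is below the fixed threshold, the filtering here is deterministic per word type, whereas the paper discards each occurrence at random with probability p(wi). The small check below, using two hypothetical word frequencies, shows how the threshold separates very frequent words from rarer ones.

# illustrative only: drop probabilities for two hypothetical word frequencies
t_demo = 1e-5
for freq_demo in (1e-2, 2e-5):              # roughly a "the"-like word vs. a rare word
    p_drop = 1 - np.sqrt(t_demo / freq_demo)
    print(freq_demo, round(p_drop, 3), p_drop < 0.8)
# frequency 1e-2 -> p_drop ~ 0.968, above the 0.8 threshold, so the word is removed
# frequency 2e-5 -> p_drop ~ 0.293, below the threshold, so the word is kept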

## batch construction
def get_targets(words, idx, window_size=2):
    '''
    :param words: list of word ids
    :param idx: index of the input word
    :param window_size: maximum window size
    :return: ids of the context words around the input word
    '''
    target_window = np.random.randint(1, window_size + 1)
    # handle the case where there are not enough words before the input word
    start_point = idx - target_window if (idx - target_window) > 0 else 0
    end_point = idx + target_window
    # context words inside the window
    targets = set(words[start_point: idx] + words[idx + 1: end_point + 1])
    return list(targets)


def get_batches(word, batch_size, window_size=2):
    '''
    :param word: list of word ids
    :param batch_size: number of input words per batch
    :param window_size: maximum window size
    :return: yields (x, y) pairs of input ids and context ids
    '''
    # how many full batches fit
    n_batches = len(word) // batch_size
    # keep only a whole number of batches
    words = word[:n_batches * batch_size]
    for idx in range(0, len(words), batch_size):  # start at 0, step by batch_size
        x, y = [], []
        batch = words[idx: idx + batch_size]
        for i in range(len(batch)):
            batch_x = batch[i]
            batch_y = get_targets(batch, i, window_size)
            # one input word maps to several output words, so the lengths must match
            x.extend([batch_x] * len(batch_y))
            y.extend(batch_y)
        yield x, y
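
As a quick sanity check (my addition, not in the original code), peeking at one small batch shows that every input id in x is repeated once for each of its context ids in y, so the two lists stay aligned:

# illustrative only: inspect a single tiny batch
example_x, example_y = next(get_batches(train_words, batch_size=8, window_size=2))
print(example_x)  # e.g. [w0, w0, w1, w1, w1, ...] -- each input repeated per context word
print(example_y)  # matching context ids, len(example_y) == len(example_x)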

# build the network
# input layer
train_graph = tf.Graph()
with train_graph.as_default():
    inputs = tf.placeholder(tf.int32, shape=[None], name='inputs')
    labels = tf.placeholder(tf.int32, shape=[None, None], name='labels')

# embedding layer
vocab_size = len(id_to_vocab)
embedding_size = 128
with train_graph.as_default():
    embedding = tf.Variable(tf.random_uniform([vocab_size, embedding_size], -1, 1))
    # embedding lookup
    embed = tf.nn.embedding_lookup(embedding, inputs)

## negative sampling
n_sampled = 100
with train_graph.as_default():
    w = tf.Variable(tf.truncated_normal([vocab_size, embedding_size], stddev=0.1))
    b = tf.Variable(tf.zeros(vocab_size))
    loss = tf.nn.sampled_softmax_loss(w, b, labels, embed, n_sampled, vocab_size)
    cost = tf.reduce_mean(loss)
    optimizer = tf.train.AdamOptimizer().minimize(cost)

## validate word similarity
with train_graph.as_default():
    # pick validation words at random
    valid_size = 16
    valid_window = 100
    # pick 8 words from each of two regions of the id range
    valid_examples = np.array(random.sample(range(valid_window), valid_size // 2))
    valid_examples = np.append(valid_examples,
                               random.sample(range(1000, 1000 + valid_window), valid_size // 2))
    valid_size = len(valid_examples)
    valid_dataset = tf.constant(valid_examples, dtype=tf.int32)

    # normalize the embedding vectors to unit length
    norm = tf.sqrt(tf.reduce_sum(tf.square(embedding), 1, keepdims=True))
    normalized = embedding / norm
    valid_embedding = tf.nn.embedding_lookup(normalized, valid_dataset)
    # cosine similarity
    similarity = tf.matmul(valid_embedding, normalized, transpose_b=True)

# training hyperparameters
epochs = 10        # number of epochs
batch_size = 1000  # batch size
window_size = 2    # window size

with train_graph.as_default():
    saver = tf.train.Saver()  # checkpoint saver
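
Because every row of normalized has unit length, the matmul in similarity is exactly a cosine similarity between each validation vector and every embedding. A small numpy sketch of the same computation (illustrative, not from the original post):

# illustrative only: cosine similarity via row normalization plus a dot product
demo = np.random.randn(5, 128)                     # stand-in embedding matrix
demo_norm = demo / np.linalg.norm(demo, axis=1, keepdims=True)
cos_sim = demo_norm @ demo_norm.T                  # cos_sim[i, j] = cosine(demo[i], demo[j])
print(np.allclose(np.diag(cos_sim), 1.0))          # each vector is perfectly similar to itself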

# training
with tf.Session(graph=train_graph) as sess:
    iteration = 1
    loss = 0
    sess.run(tf.global_variables_initializer())

    for e in range(1, epochs + 1):
        batches = get_batches(train_words, batch_size, window_size)
        start = time.time()
        for x, y in batches:
            # sampled_softmax_loss expects labels of shape [batch_size, 1]
            feed = {inputs: x, labels: np.array(y)[:, None]}
            train_loss, _ = sess.run([cost, optimizer], feed_dict=feed)
            loss += train_loss

            if iteration % 100 == 0:
                end = time.time()
                print("Epoch {}/{}".format(e, epochs),
                      "Iteration: {}".format(iteration),
                      "Avg. Training loss: {:.4f}".format(loss / 100),
                      "{:.4f} sec/batch".format((end - start) / 100))
                loss = 0
                start = time.time()

            # periodically print the most similar words for the validation set
            if iteration % 1000 == 0:
                sim = similarity.eval()
                for i in range(valid_size):
                    valid_word = id_to_vocab[valid_examples[i]]
                    top_k = 8  # take the 8 most similar words
                    nearest = (-sim[i, :]).argsort()[1:top_k + 1]
                    log = 'Nearest to [%s]:' % valid_word
                    for k in range(top_k):
                        close_word = id_to_vocab[nearest[k]]
                        log = '%s %s,' % (log, close_word)
                    print(log)

            iteration += 1

    save_path = saver.save(sess, "checkpoints/text8.ckpt")
    embed_mat = sess.run(normalized)

# visualize the first 500 word vectors with t-SNE
viz_words = 500
tsne = TSNE()
embed_tsne = tsne.fit_transform(embed_mat[:viz_words, :])
fig, ax = plt.subplots(figsize=(14, 14))
for idx in range(viz_words):
    plt.scatter(*embed_tsne[idx, :], color='steelblue')
    plt.annotate(id_to_vocab[idx], (embed_tsne[idx, 0], embed_tsne[idx, 1]), alpha=0.7)
plt.show()
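
Once training has finished, the checkpoint can be restored later to query nearest neighbours without retraining. A minimal sketch, assuming the same graph, vocab_to_id mapping, and checkpoint path as above; the query word 'king' is only an example and must exist in the vocabulary:

# illustrative only: restore the checkpoint and look up neighbours of one word
with tf.Session(graph=train_graph) as sess:
    saver.restore(sess, "checkpoints/text8.ckpt")
    embed_mat = sess.run(normalized)

query = 'king'                            # hypothetical query word; must be in vocab_to_id
vec = embed_mat[vocab_to_id[query]]
scores = embed_mat @ vec                  # cosine similarity against every word
top = (-scores).argsort()[1:9]            # skip the query word itself
print([id_to_vocab[i] for i in top])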