1. 程式人生 > >TensorFlow實現word2vec 詳細程式碼解釋

TensorFlow實現word2vec 詳細程式碼解釋

參考1:http://blog.csdn.net/mylove0414/article/details/69789203

參考2:《TensorFlow實戰》

參考3:http://www.jianshu.com/p/f682066f0586

程式碼配合參考3的圖形說明,可以更容易理解。本例子,打算實現中文的word2vec,但最後顯示都是一群亂碼,對應的中文程式碼,也沒有更改。如果有人找到了解決方法,請告知,謝謝。

實驗用到了 python3.5,TensorFlow的環境。

試驗用到的文字可以去參考一,實驗結果如下圖。


實驗結束的程式碼畫圖展示:



由上圖,可知道兩個單詞相似度越高,在壓縮的2位空間圖中就越近,如上個圖的幾個畫圈的點,日常中使用相似度很高。

實驗程式碼詳細解釋:

# coding=utf-8

import collections
import sys
import re  
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import jieba
from sklearn.manifold import TSNE
'''
讀取原始檔,並轉為list輸出
@param filename:檔名
@return: list of words
'''
# # 讀中文的
# def wseg_sentence(input):
#     with open(input, 'r',encoding='utf-8') as fp:
#         ret=[]
#         for line in fp.readlines():
#             word = line.strip()
#             if word !=None and len(word)>0 :
#                 u_ret = jieba.cut(word, cut_all = False) #精準切分模式
#                 ret.append(u_ret)
#                 # print(ret)
#         print(ret)
#         return ret
#讀英文的。
def read_file(filename):
    f=open(filename,'r')
    file_read=f.read()
    words_=re.sub("[^a-zA-Z]+", " ",file_read).lower() #正則匹配,只留下單詞,且大寫改小寫
    words=list(words_.split())  #length of words:1121985

    # #將寫出的格式,儲存在某個文本里
    # m=0
    # for  i in words:
    #     m=m+1
    #     if m<10:
    #         file_object.write(i)
    return words
# words= wseg_sentence('../dataset/hong.txt')
words=read_file('../dataset/heli.txt')
file_object = open('../dataset/1111', 'w')

vocabulary_size=2000  #預定義頻繁單詞庫的長度
count=[['UNK',-1]]    #初始化單詞頻數統計集合

'''
1.給@param “words”中出現過的單詞做頻數統計,取top 1999頻數的單詞放入dictionary中,以便快速查詢。
2.給哈利波特這本“單詞庫”@param“words”編碼,出現在top 1999之外的單詞,統一令其為“UNK”(未知),編號為0,並統計這些單詞的數量。
@return: 哈利波特這本書的編碼data,每個單詞的頻數統計count,詞彙表dictionary及其反轉形式reverse_dictionary
'''
def build_dataset(words):
    #most_common方法: 去top2000的頻數的單詞,建立一個dict,放進去。以詞頻排序
    counter=collections.Counter(words).most_common(vocabulary_size-1) #length of all counter:22159 取top1999頻數的單詞作為vocabulary,其他的作為unknown
    # print('123  :', type(count))  #list型別
    # print(len(counter)) #1999個
    # print(counter)
    #[('the', 51897), ('and', 27525), ('to', 26996), ('he', 22203), ('of', 21851), ('a', 21043), ('harry', 18165), ('was', 15637), ('you', 14627), ('it', 14489),。 ('member', 49), ('fake', 49)。]
    count.extend(counter)
    #搭建dictionary
    dictionary={}
    for word,_ in count:
        dictionary[word]=len(dictionary)
    # print('234   :',dictionary.get('the'))   輸出為1
    data=[]
    #全部單詞轉為編號
    #先判斷這個單詞是否出現在dictionary,如果是,就轉成編號,如果不是,則轉為編號0(代表UNK)
    unk_count=0
    for word in words:
        if word in dictionary:
            index=dictionary[word]
        else:
            index=0
            unk_count+=1
        data.append(index)

    count[0][1]=unk_count
    reverse_dictionary=dict(zip(dictionary.values(),dictionary.keys()))
    # print('aaaaa   ',data)                 #根據文章,將全文每個數,對應的排名標記, 比如 the 出現最多,則對應的數為1,不在前1999的標記為0  【959, 18, 7, 6, 1728, 0, 306, 13, 450, 1175, 32, 180, 265, 3, 9, 205,】
    # print('00000   ',count)               #前1999,個對應的出現的次數,第一個數是 總共多少不在前1999的字數   比如: the:51897
    # print('bbbbb   ',dictionary)          #前1999的數的dict  ['the':1]這種
    # print('1111   :',reverse_dictionary)  #按照dict值來排序,變得有規律起來。{0: 'UNK', 1: 'the', 2: 'and', 3: 'to', 4: 'he', 5: 'of', 6: 'a', 7: 'harry', 8: 'was', 9:}
    return data,count,dictionary,reverse_dictionary

data,count,dictionary,reverse_dictionary=build_dataset(words)   
del words #刪除原始單詞列表,節約記憶體



data_index=0


'''
採用Skip-Gram模式,  生成我們需要的 input label 的那種格式。,可以參看第二個小例子。
生成word2vec訓練樣本
@param batch_size:每個批次訓練多少樣本
@param num_skips: 為每個單詞生成多少樣本(本次實驗是2個),batch_size必須是num_skips的整數倍,這樣可以確保由一個目標詞彙生成的樣本在同一個批次中。
@param skip_window:單詞最遠可以聯絡的距離(本次實驗設為1,即目標單詞只能和相鄰的兩個單詞生成樣本),2*skip_window>=num_skips
'''
def generate_batch(batch_size,num_skips,skip_window):

    global data_index
    assert batch_size%num_skips==0
    assert num_skips<=2*skip_window

    batch=np.ndarray(shape=(batch_size),dtype=np.int32)
    # print(batch)  128個一維陣列
    labels=np.ndarray(shape=(batch_size,1),dtype=np.int32)
    # print(labels)  128個二維陣列
    span=2*skip_window+1   #入隊長度
    # print(span)
    buffer=collections.deque(maxlen=span)
    
    for _ in range(span):  #雙向佇列填入初始值

        buffer.append(data[data_index])
        data_index=(data_index+1)%len(data)  
    # print('cishu :000', batch_size//num_skips)
    for i in range(batch_size//num_skips):  #第一次迴圈,i表示第幾次入雙向佇列deque
        for j in range(span):  #內部迴圈,處理deque
            if j>skip_window:
                batch[i*num_skips+j-1]=buffer[skip_window]
                labels[i*num_skips+j-1,0]=buffer[j]
            elif j==skip_window:
                continue
            else:
                batch[i*num_skips+j]=buffer[skip_window]
                labels[i*num_skips+j,0]=buffer[j]
        buffer.append(data[data_index])  #入隊一個單詞,出隊一個單詞
        data_index=(data_index+1)%len(data)
    # print('batch    :',batch)
    # print('label    :',labels)
    return batch,labels    


#開始訓練
batch_size=128   
embedding_size=128
skip_window=1
num_skips=2
num_sampled=64  #訓練時用來做負樣本的噪聲單詞的數量
#驗證資料
valid_size=16 #抽取的驗證單詞數
valid_window=100 #驗證單詞只從頻數最高的100個單詞中抽取
valid_examples=np.random.choice(valid_window,valid_size,replace=False)#不重複在0——10l裡取16個


graph=tf.Graph()
with graph.as_default():
    train_inputs=tf.placeholder(tf.int32, shape=[batch_size])
    train_labels=tf.placeholder(tf.int32, shape=[batch_size,1])
    valid_dataset=tf.constant(valid_examples,dtype=tf.int32)
    #單詞維度為 2000單詞大小,128向量維度
    embeddings=tf.Variable(tf.random_uniform([vocabulary_size,embedding_size], -1, 1))   #初始化embedding vector
    #使用tf.nn.embedding_lookup查詢輸入train_inputs 對應的向量embed
    embed=tf.nn.embedding_lookup(embeddings, train_inputs) 
    
    #用NCE loss作為優化訓練的目標
    #tf.truncated_normal初始化nce loss中的權重引數 nce_weights,並將nce_biases初始化為0
    nce_weights=tf.Variable(tf.truncated_normal([vocabulary_size,embedding_size], stddev=1.0/np.math.sqrt(embedding_size)))
    nce_bias=tf.Variable(tf.zeros([vocabulary_size]))
    #計算學習出的詞向量embedding在訓練資料上的loss,並使用tf.reduce_mean進行魂彙總
    loss=tf.reduce_mean(tf.nn.nce_loss(nce_weights, nce_bias, embed, train_labels, num_sampled, num_classes=vocabulary_size))
    #學習率為1.0,L2正規化標準化後的enormalized_embedding。
    #通過cos方式來測試  兩個之間的相似性,與向量的長度沒有關係。
    optimizer=tf.train.GradientDescentOptimizer(1.0).minimize(loss)
    norm=tf.sqrt(tf.reduce_sum(tf.square(embeddings), axis=1, keep_dims=True))
    normalized_embeddings=embeddings/norm   #除以其L2範數後得到標準化後的normalized_embeddings
    
    valid_embeddings=tf.nn.embedding_lookup(normalized_embeddings,valid_dataset)    #如果輸入的是64,那麼對應的embedding是normalized_embeddings第64行的vector
    similarity=tf.matmul(valid_embeddings,normalized_embeddings,transpose_b=True)   #計算驗證單詞的嵌入向量與詞彙表中所有單詞的相似性
    print('相似性:',similarity)
    init=tf.global_variables_initializer()
    
num_steps=100001
with tf.Session(graph=graph) as session:
    init.run()
    print("Initialized")
    avg_loss=0
    for step in range(num_steps):
        batch_inputs,batch_labels=generate_batch(batch_size, num_skips, skip_window)  #產生批次訓練樣本
        feed_dict={train_inputs:batch_inputs,train_labels:batch_labels}   #賦值
        _,loss_val=session.run([optimizer,loss],feed_dict=feed_dict)
        avg_loss+=loss_val
        #每2000次,計算一下平均loss並顯示出來。
        if step % 2000 ==0:
            if step>0:
                avg_loss/=2000
            print("Avg loss at step ",step,": ",avg_loss)
            avg_loss=0
            #每10000次,驗證單詞與全部單詞的相似度,並將與每個驗證單詞最相似的8個找出來。
        if step%10000==0:
            sim=similarity.eval()
            for i in range(valid_size):
                valid_word=reverse_dictionary[valid_examples[i]]  #得到驗證單詞
                top_k=8  
                nearest=(-sim[i,:]).argsort()[1:top_k+1]     #每一個valid_example相似度最高的top-k個單詞
                log_str="Nearest to %s:" % valid_word
                for k in range(top_k):
                    close_word=reverse_dictionary[nearest[k]]
                    log_str="%s %s," %(log_str,close_word)
                print(log_str)
    final_embedding=normalized_embeddings.eval()
'''
視覺化Word2Vec散點圖並儲存
'''
def plot_with_labels(low_dim_embs,labels,filename):
    #low_dim_embs 降維到2維的單詞的空間向量
    assert low_dim_embs.shape[0]>=len(labels),"more labels than embedding"
    plt.figure(figsize=(18,18))
    for i,label in enumerate(labels):
        x,y=low_dim_embs[i,:]
        plt.scatter(x, y)
        #展示單詞本身
        plt.annotate(label,xy=(x,y),xytext=(5,2),textcoords='offset points',ha='right',va='bottom')
    plt.savefig(filename)

'''
tsne實現降維,將原始的128維的嵌入向量降到2維
'''
print('123213123')
tsne=TSNE(perplexity=30,n_components=2,init='pca',n_iter=5000)
plot_number=100
low_dim_embs=tsne.fit_transform(final_embedding[:plot_number,:])
labels=[reverse_dictionary[i] for i in range(plot_number)]
plot_with_labels(low_dim_embs, labels, './plot.png')
print('1231231')