基於gibbs取樣的topic over time
程式碼參考:https://github.com/ahmaurya/topics_over_time,如有侵權,請告知刪除~
吉布斯取樣(Gibbs sampling)是統計學中用於馬爾科夫蒙特卡洛(MCMC)的一種演算法,用於在難以直接取樣時從某一多變數概率分佈中近似抽取樣本序列。該序列可用於近似聯合分佈、部分變數的邊緣分佈或計算積分(如某一變數的期望值)。某些變數可能為已知變數,故對這些變數並不需要取樣。
吉布斯取樣常用於統計推斷(尤其是貝葉斯推斷)之中。這是一種隨機化演算法,與最大期望演算法等統計推斷中的確定性演算法相區別。與其他MCMC演算法一樣,吉布斯取樣從馬爾科夫鏈中抽取樣本,可以看作是Metropolis–Hastings演算法的特例。
Gibbs Samping 是MCMC中最常用的方法,基本的原理就是通過隨機模擬, 採集期望數量的 目標分佈的樣本,這些樣本構造了一條馬爾可夫鏈,而由這些樣本集,基本可以推斷出目標分佈的引數以及其它的想了解的後驗分佈。但通常如何採集 樣本成為關鍵,應用它的原因是目標分佈的分佈函式未知,但是構成目標分佈的變數的條件分佈是知道的,那麼就可以用隨機模擬的思想,利用貝葉斯公式的特性,從條件概率依次對構成目標分佈的每個變數進行取樣,做跳轉,每次只針對一個維度做迭代,當所有維度都跳轉一次之後所的樣本即可則看作是目標分佈的一次跳轉。
通常若按數學公式是可以直接求出聯合概率的,但隨著變數數量的增大,公式求解,變得異常複雜,遂通過取樣的方式求得聯合概率分佈。Gibbs Samping的基本過程:比如我們已知變數A,B,C,並知p(A|B,C),p(B|A,C),p(C|A,B)。
step1:給ABC隨機賦值,即隨機一個樣本,如(A0,B0,C0);
step2:根據p(A|B0,C0)得A1;
step3:根據p(B|A1,C0)得B1;
step4:根據p(C|A1,B1)得C1;
現得樣本(A1,B1,C1),重複step1到step4,經過若干次迭代過程後,則結果基本趨於變數實際的分佈,即可作為聯合釋出。
通過上面對吉布斯取樣的簡單理解,下面便開始基於gibbs取樣的topic over time演算法的構建。
【資料集】
1、allstopword:停用詞資料集
2、alltime:時間戳資料集
3、alltitle:topic資料集
【定義topicovertime類】
操作之前需要匯入的包如下:
import fileinput import random import scipy.special import numpy as np import scipy.stats import copy
1、定義獲取語料庫和詞典的方法
def GetPnasCorpusAndDictionary(self, documents_path, timestamps_path, stopwords_path):
'''
獲取PNAS語料庫和詞典(去除停用詞)
:param documents_path: 檔案路徑
:param timestamps_path:時間戳路徑
:param stopwords_path:停用詞路徑
:return: 以列表的形式返回時間戳、文字和詞典
'''
documents = []
timestamps = []
dictionary = set()
stopwords = set()
for line in fileinput.input(stopwords_path): # 更新停用詞
stopwords.update(set(line.lower().strip().split()))
for doc in fileinput.input(documents_path): # 將停用詞之外的單詞加到檔案中並且文字中不重複的載入到詞典中
words = [word for word in doc.lower().strip().split() if word not in stopwords]
documents.append(words)
dictionary.update(set(words))
for timestamp in fileinput.input(timestamps_path): # 獲取時間戳
num_titles = int(timestamp.strip().split()[0]) # 時間戳檔案第一列作為主題個數
timestamp = float(timestamp.strip().split()[1]) # 時間戳檔案第二列作為時間戳
timestamps.extend([timestamp for title in range(num_titles)]) # 將時間戳檔案中主題後面對應的時間戳重複對應的主題次數後儲存到時間戳列表中
for line in fileinput.input(stopwords_path): # 更新停用詞
stopwords.update(set(line.lower().strip().split()))
first_timestamp = timestamps[0]
last_timestamp = timestamps[len(timestamps) - 1]
timestamps = [1.0 * (t - first_timestamp) / (last_timestamp - first_timestamp) for t in timestamps] # 通過這個函式獲取新的時間戳列表
dictionary = list(dictionary)
assert len(documents) == len(timestamps) # 斷言,如果為false,則觸發異常
return documents, timestamps, dictionary
2、初始化引數
def InitializeParameters(self, documents, timestamps, dictionary):
'''
初始化引數
:param documents: 檔案內容列表
:param timestamps: 時間戳列表
:param dictionary: 詞典列表
:return:返回存放所有引數的字典
'''
par = {} # 定義存放所有引數的字典
par['dataset'] = 'pnas' # 資料集的名字
par['max_iterations'] = 10 # gibbs取樣的最大迭代次數
par['T'] = 10 # 主題個數
par['D'] = len(documents) # 文字長度
par['V'] = len(dictionary) # 詞典長度
par['N'] = [len(doc) for doc in documents] # 文字中每個字元的長度列表
par['alpha'] = [50.0 / par['T'] for _ in range(par['T'])] # 返回主題個數長度的一個列表,列表內容為公式計算的內容,作為alpha的值
par['beta'] = [0.1 for _ in range(par['V'])] # 返回詞典長度的一個列表,列表內容為0.1,作為beta的值
par['beta_sum'] = sum(par['beta']) # beta值的和
par['psi'] = [[1 for _ in range(2)] for _ in range(par['T'])] # 重複主題個數次的[1,1]
par['betafunc_psi'] = [scipy.special.beta(par['psi'][t][0], par['psi'][t][1]) for t in range(par['T'])] # 返回一個主題個數的列表,內容為beta函式值
par['word_id'] = {dictionary[i]: i for i in range(len(dictionary))} # 返回一個字典列表,形式為{字典內容: 字典下標}
par['word_token'] = dictionary # 字典列表
par['z'] = [[random.randrange(0, par['T']) for _ in range(par['N'][d])] for d in range(par['D'])] # 返回每個字元長度的0-10之間的隨機整數
par['t'] = [[timestamps[d] for _ in range(par['N'][d])] for d in range(par['D'])] # 返回每個字元長度的對應下標的時間戳的值列表
par['w'] = [[par['word_id'][documents[d][i]] for i in range(par['N'][d])] for d in range(par['D'])] # 返回每個字元長度的對應字典內容的字典下標
par['m'] = [[0 for t in range(par['T'])] for d in range(par['D'])] # 返回文字長度的列表,每個元素為主題個數長度的全零列表
par['n'] = [[0 for v in range(par['V'])] for t in range(par['T'])] # 返回主題個數長度的列表,每個元素為字典長度的全零列表
par['n_sum'] = [0 for t in range(par['T'])] # 返回主題個數長度的全零列表
np.set_printoptions(threshold=np.inf) # 設定列印是顯示方式,此時為全部輸出,中間部門不包含省略號
np.seterr(divide='ignore', invalid='ignore') # 設定如何處理浮點錯誤
self.CalculateCounts(par)
return par
對初始化引數的方法中使用的CalculateCounts()方法進行定義:
def CalculateCounts(self, par):
'''
計算總數
:param par: 存放所有引數的字典
:return:
'''
for d in range(par['D']): # 文字長度
for i in range(par['N'][d]): # 文字中每個字串的長度列表
topic_di = par['z'][d][i] # 在文件d中i位置的topic id
word_di = par['w'][d][i] # 在文件d中i位置的Word id
par['m'][d][topic_di] += 1
par['n'][topic_di][word_di] += 1
par['n_sum'][topic_di] += 1
3、獲得主題時間戳
def GetTopicTimestamps(self, par):
'''
獲得主題時間戳
:param par: 存放所有引數的字典
:return: 返回主題時間戳
'''
topic_timestamps = []
for topic in range(par['T']): # 主題個數
current_topic_timestamps = []
current_topic_doc_timestamps = [[(par['z'][d][i] == topic) * par['t'][d][i] for i in range(par['N'][d])] for
d in range(par['D'])]
for d in range(par['D']): # 文字長度
current_topic_doc_timestamps[d] = filter(lambda x: x != 0, current_topic_doc_timestamps[d])
for timestamps in current_topic_doc_timestamps:
current_topic_timestamps.extend(timestamps)
assert current_topic_timestamps != [] # 斷言,如果為false,則觸發異常
topic_timestamps.append(current_topic_timestamps)
return topic_timestamps
4、估計psi的值
def GetMethodOfMomentsEstimatesForPsi(self, par):
'''
估計psi的值
:param par: 存放所有引數的字典
:return: psi的值
'''
topic_timestamps = self.GetTopicTimestamps(par) # 獲得主題時間戳
psi = [[1 for _ in range(2)] for _ in range(len(topic_timestamps))] # 得到主題時間戳的一個列表,列表的每個元素為[1,1]
for i in range(len(topic_timestamps)): # 主題時間戳的長度
current_topic_timestamps = topic_timestamps[i] #獲得當前主題時間戳
timestamp_mean = np.mean(current_topic_timestamps) # 得到當前主題時間戳的均值
timestamp_var = np.var(current_topic_timestamps) # 得到當前主題時間戳的方差
if timestamp_var == 0: # 設定方差不為0
timestamp_var = 1e-6
common_factor = timestamp_mean * (1 - timestamp_mean) / timestamp_var - 1 # 得到公共因子
# 計算得到psi的值
psi[i][0] = 1 + timestamp_mean * common_factor
psi[i][1] = 1 + (1 - timestamp_mean) * common_factor
return psi
5、獲取θ和φ的值
def ComputePosteriorEstimatesOfThetaAndPhi(self, par):
'''
計算theta和phi的值
:param par: 存放所有引數的字典
:return: 返回theta和phi的值
'''
theta = copy.deepcopy(par['m']) # 對par['m']進行深層複製操作
phi = copy.deepcopy(par['n']) # 對par['n']進行深層複製操作
for d in range(par['D']): # 文字長度
if sum(theta[d]) == 0:
theta[d] = np.asarray([1.0 / len(theta[d]) for _ in range(len(theta[d]))])
else:
theta[d] = np.asarray(theta[d])
theta[d] = 1.0 * theta[d] / sum(theta[d])
theta = np.asarray(theta) # 得到θ的值
for t in range(par['T']): # 主題個數
if sum(phi[t]) == 0:
phi[t] = np.asarray([1.0 / len(phi[t]) for _ in range(len(phi[t]))])
else:
phi[t] = np.asarray(phi[t])
phi[t] = 1.0 * phi[t] / sum(phi[t])
phi = np.asarray(phi) # 得到φ的值
return theta, phi
6、進行gibbs取樣
def TopicsOverTimeGibbsSampling(self, par):
'''
topic over time模型進行Gibbs取樣【馬爾科夫鏈蒙特卡洛方法:吉布斯取樣】
:param par: 存放所有引數的字典
:return: 返回theta, phi, psi的值
'''
for iteration in range(par['max_iterations']): # gibbs取樣最大迭代次數
for d in range(par['D']): # 文字長度
for i in range(par['N'][d]): # 文中中每個字元的長度列表
word_di = par['w'][d][i]
t_di = par['t'][d][i]
old_topic = par['z'][d][i]
par['m'][d][old_topic] -= 1
par['n'][old_topic][word_di] -= 1
par['n_sum'][old_topic] -= 1
topic_probabilities = [] # 定義一個主題概率列表
for topic_di in range(par['T']):
psi_di = par['psi'][topic_di]
topic_probability = 1.0 * (par['m'][d][topic_di] + par['alpha'][topic_di])
topic_probability *= ((1 - t_di) ** (psi_di[0] - 1)) * ((t_di) ** (psi_di[1] - 1))
topic_probability /= par['betafunc_psi'][topic_di]
topic_probability *= (par['n'][topic_di][word_di] + par['beta'][word_di])
topic_probability /= (par['n_sum'][topic_di] + par['beta_sum'])
topic_probabilities.append(topic_probability) # 計算主題概率
sum_topic_probabilities = sum(topic_probabilities) # 對主題概率求和
if sum_topic_probabilities == 0:
topic_probabilities = [1.0 / par['T'] for _ in range(par['T'])]
else:
topic_probabilities = [p / sum_topic_probabilities for p in topic_probabilities]
new_topic = list(np.random.multinomial(1, topic_probabilities, size=1)[0]).index(1)
par['z'][d][i] = new_topic
par['m'][d][new_topic] += 1
par['n'][new_topic][word_di] += 1
par['n_sum'][new_topic] += 1
if d % 1000 == 0:
print('Done with iteration {iteration} and document {document}'.format(iteration=iteration,
document=d))
par['psi'] = self.GetMethodOfMomentsEstimatesForPsi(par)
par['betafunc_psi'] = [scipy.special.beta(par['psi'][t][0], par['psi'][t][1]) for t in range(par['T'])]
par['m'], par['n'] = self.ComputePosteriorEstimatesOfThetaAndPhi(par)
return par['m'], par['n'], par['psi']
【執行topic over time演算法】
'''
執行topic over time演算法
'''
from nlp_tot.tot import TopicsOverTime
import numpy as np
import pickle
if __name__ == "__main__":
datapath = 'C:/topics_over_time-master/data/'
resultspath = 'C:/topics_over_time-master/results/pnas_tot/'
documents_path = datapath + 'alltitles'
timestamps_path = datapath + 'alltimes'
stopwords_path = datapath + 'allstopwords'
tot_topic_vectors_path = resultspath + 'pnas_tot_topic_vectors.csv'
tot_topic_mixtures_path = resultspath + 'pnas_tot_topic_mixtures.csv'
tot_topic_shapes_path = resultspath + 'pnas_tot_topic_shapes.csv'
tot_pickle_path = resultspath + 'pnas_tot.pickle'
tot = TopicsOverTime()
documents, timestamps, dictionary = tot.GetPnasCorpusAndDictionary(documents_path, timestamps_path, stopwords_path)
par = tot.InitializeParameters(documents, timestamps, dictionary)
theta, phi, psi = tot.TopicsOverTimeGibbsSampling(par)
np.savetxt(tot_topic_vectors_path, phi, delimiter=',')
np.savetxt(tot_topic_mixtures_path, theta, delimiter=',')
np.savetxt(tot_topic_shapes_path, psi, delimiter=',')
tot_pickle = open(tot_pickle_path, 'wb')
pickle.dump(par, tot_pickle)
tot_pickle.close()
【主題-單詞分佈視覺化以及展示主題隨時間演變的β分佈】
操作之前需要匯入的包:
import scipy.special
import math
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats
import pickle
1、主題-單詞分佈視覺化
def VisualizeTopics(phi, words, num_topics, viz_threshold=9e-3):
'''
視覺化主題-單詞分佈
:param phi: phi列表
:param words: 詞典列表
:param num_topics: 主題個數(10)
:param viz_threshold: 閾值
:return:
'''
phi_viz = np.transpose(phi) # 轉置
words_to_display = ~np.all(phi_viz <= viz_threshold, axis=1) # 測試沿橫軸所給定的元素是否小於等於viz_threshold,對結果取非
words_viz = [words[i] for i in range(len(words_to_display)) if words_to_display[i]] # 如果words_to_display[i]為True,儲存words[i]的值到words_viz
phi_viz= phi_viz[words_to_display] # 儲存phi_viz為True的值到phi_viz
# 繪圖操作
fig, ax = plt.subplots()
heatmap = plt.pcolor(phi_viz, cmap=plt.cm.Blues, alpha=0.8)
plt.colorbar() # 給子圖新增漸變色條
# fig.set_size_inches(8, 11)
ax.grid(False)
ax.set_frame_on(False)
ax.set_xticks(np.arange(phi_viz.shape[1]) + 0.5, minor=False)
ax.set_yticks(np.arange(phi_viz.shape[0]) + 0.5, minor=False)
ax.invert_yaxis()
ax.xaxis.tick_top()
# plt.xticks(rotation=45)
for t in ax.xaxis.get_major_ticks():
t.tick1On = False
t.tick2On = False
for t in ax.yaxis.get_major_ticks():
t.tick1On = False
t.tick2On = False
column_labels = words_viz # ['Word ' + str(i) for i in range(1,1000)]
row_labels = ['Topic ' + str(i) for i in range(1, num_topics + 1)]
ax.set_xticklabels(row_labels, minor=False)
ax.set_yticklabels(column_labels, minor=False)
plt.show()
2、主題隨時間演變的beta分佈
def VisualizeEvolution(psi):
'''
主題隨時間演變的beta分佈
:param psi:
:return:
'''
xs = np.linspace(0, 1, num=1000) # 建立位於0-1之間的1000個元素的陣列
fig, ax = plt.subplots()
for i in range(len(psi)):
ys = [math.pow(1-x, psi[i][0]-1) * math.pow(x, psi[i][1]-1) / scipy.special.beta(psi[i][0], psi[i][1]) for x in xs]
ax.plot(xs, ys, label='Topic ' + str(i+1))
ax.legend(loc='best', frameon=False)
plt.show()
3、執行上面的兩個方法
if __name__ == "__main__":
resultspath = 'C:/topics_over_time-master/results/pnas_tot/'
tot_pickle_path = resultspath + 'pnas_tot.pickle'
tot_pickle = open(tot_pickle_path, 'rb')
par = pickle.load(tot_pickle)
VisualizeTopics(par['n'], par['word_token'], par['T'])
VisualizeEvolution(par['psi'])
4、結果如下:
(1)主題-單詞分佈
(2)主題隨時間演變的beta分佈