深度學習專案實戰--對於評論的情感分析
阿新 • • 發佈:2018-11-10
標籤: 機器學習
該專案通過分析影評的文字內容,來判斷該評價的情感傾向(正面或負面)
專案準備:
1. 關於影評的資料集
2. 關於影評的情感標籤
3. python的各種運算庫
關於影評的資料集與情感標籤點選此處下載
運算庫請自行下載
(好吧,圖文無關)
實現思想
對影評中的每一個單詞進行提取,通過神經網路找到單詞與情緒之間的聯絡,進而對影評的情感進行預測
實現效果
準確率達85%以上,運算速率可達7000條影評/秒
現在開始我們的專案程式碼
1.首先我們要讀入影評與情感標籤
# Load the reviews (network inputs) and their sentiment labels (targets).
# `with` guarantees each file is closed even if reading fails, and
# rstrip('\n') removes only the line terminator, so a final line without a
# trailing newline does not lose its last character.
with open('reviews.txt', 'r') as review_file:  # What we know!
    reviews = [line.rstrip('\n') for line in review_file]
with open('labels.txt', 'r') as label_file:  # What we WANT to know!
    labels = [line.rstrip('\n').upper() for line in label_file]
2.引入需要使用的庫
from collections import Counter
import numpy as np
import time
import sys
import numpy as np
3.實現神經網路
class SentimentNetwork:
    """Small feed-forward network that labels a review POSITIVE or NEGATIVE.

    The input layer has one node per vocabulary word, the hidden layer is a
    plain (un-activated) sum of the weight rows of the words present in the
    review, and the single output node uses a sigmoid.  An output of 0.5 or
    more is reported as "POSITIVE", anything below as "NEGATIVE".
    """

    # A word must occur at least this many times over all reviews before a
    # positive/negative ratio is computed for it.
    RATIO_MIN_COUNT = 50

    def __init__(self, reviews, labels, min_count=10, polarity_cutoff=0.1,
                 hidden_nodes=10, learning_rate=0.1):
        """Build the vocabulary from the given data and initialise weights.

        Args:
            reviews: sequence of review strings (words separated by " ").
            labels: sequence of label strings, parallel to ``reviews``;
                'POSITIVE' marks a positive review, anything else is
                counted as negative.
            min_count: a word must occur more than this many times to
                enter the vocabulary.
            polarity_cutoff: words whose log ratio lies strictly between
                -polarity_cutoff and +polarity_cutoff are dropped.
            hidden_nodes: number of hidden-layer nodes.
            learning_rate: step size used by ``train``.
        """
        # Fixed seed so that the random weight initialisation is repeatable.
        np.random.seed(1)
        # Data pre-processing: build vocabulary and lookup tables.
        self.pre_process_data(reviews, labels, polarity_cutoff, min_count)
        # Network initialisation: one input node per vocabulary word,
        # a single output node.
        self.init_network(len(self.review_vocab), hidden_nodes, 1,
                          learning_rate)

    def pre_process_data(self, reviews, labels, polarity_cutoff, min_count):
        """Build the word and label vocabularies and their index lookups.

        Sets ``review_vocab``, ``label_vocab``, ``review_vocab_size``,
        ``label_vocab_size``, ``word2index`` and ``label2index``.
        """
        # Word frequencies in positive reviews, negative reviews and overall.
        positive_counts = Counter()
        negative_counts = Counter()
        total_counts = Counter()
        for i, review in enumerate(reviews):
            words = review.split(" ")
            if labels[i] == 'POSITIVE':
                positive_counts.update(words)
            else:
                negative_counts.update(words)
            total_counts.update(words)

        # Log-scaled positive/negative ratio for sufficiently frequent words:
        # positive when the word leans positive, negative when it leans
        # negative.  The +1 and +0.01 terms avoid division by zero.
        pos_neg_ratios = {}
        for term, cnt in total_counts.items():
            if cnt < self.RATIO_MIN_COUNT:
                continue
            ratio = positive_counts[term] / float(negative_counts[term] + 1)
            if ratio > 1:
                pos_neg_ratios[term] = np.log(ratio)
            else:
                pos_neg_ratios[term] = -np.log(1 / (ratio + 0.01))

        # Keep words that occur more than min_count times, except those whose
        # ratio is known and lies strictly inside ±polarity_cutoff (too
        # neutral to be informative).  Words without a ratio are kept.
        review_vocab = set()
        for word, cnt in total_counts.items():
            if cnt <= min_count:
                continue
            if word in pos_neg_ratios:
                ratio = pos_neg_ratios[word]
                if -polarity_cutoff < ratio < polarity_cutoff:
                    continue
            review_vocab.add(word)

        # Lists so that words and labels can be addressed by index.
        self.review_vocab = list(review_vocab)
        self.label_vocab = list(set(labels))

        # Sizes of the two vocabularies.
        self.review_vocab_size = len(self.review_vocab)
        self.label_vocab_size = len(self.label_vocab)

        # Reverse lookups: word -> index and label -> index.
        self.word2index = {
            word: i for i, word in enumerate(self.review_vocab)
        }
        self.label2index = {
            label: i for i, label in enumerate(self.label_vocab)
        }

    def init_network(self, input_nodes, hidden_nodes, output_nodes,
                     learning_rate):
        """Store the layer sizes and learning rate and create the weights."""
        # Number of nodes in the input, hidden and output layers.
        self.input_nodes = input_nodes
        self.hidden_nodes = hidden_nodes
        self.output_nodes = output_nodes

        # Learning rate.
        self.learning_rate = learning_rate

        # Input->hidden weights start at zero; hidden->output weights are
        # drawn from a normal distribution with std output_nodes ** -0.5.
        self.weights_0_1 = np.zeros((self.input_nodes, self.hidden_nodes))
        self.weights_1_2 = np.random.normal(
            0.0, self.output_nodes ** -0.5,
            (self.hidden_nodes, self.output_nodes))

        # Hidden-layer buffer, reused (zeroed in place) for every review.
        self.layer_1 = np.zeros((1, hidden_nodes))

    def get_target_for_label(self, label):
        """Return 1 for the label 'POSITIVE' and 0 for anything else."""
        if label == 'POSITIVE':
            return 1
        return 0

    def sigmoid(self, x):
        """Sigmoid activation function."""
        return 1 / (1 + np.exp(-x))

    def sigmoid_output_2_derivative(self, output):
        """Derivative of the sigmoid, expressed via the sigmoid's output."""
        return output * (1 - output)

    def _word_indices(self, text):
        """Return the unique vocabulary indices of the words in ``text``.

        Words that are not in the vocabulary are ignored.
        """
        word2index = self.word2index
        return list({
            word2index[word]
            for word in text.split(" ")
            if word in word2index
        })

    def _write_progress(self, i, total, start, correct, count_label,
                        accuracy_label):
        """Overwrite the current stdout line with a progress summary.

        ``i`` is the zero-based index of the review just processed,
        ``total`` the number of reviews, ``start`` the ``time.time()`` value
        taken before the loop and ``correct`` the number of correct
        predictions so far.
        """
        elapsed_time = float(time.time() - start)
        reviews_per_second = i / elapsed_time if elapsed_time > 0 else 0
        sys.stdout.write(
            "\rProgress:" + str(100 * i / float(total))[:4]
            + "% Speed(reviews/sec):" + str(reviews_per_second)[0:5]
            + " #Correct:" + str(correct)
            + " " + count_label + ":" + str(i + 1)
            + " " + accuracy_label + ":"
            + str(correct * 100 / float(i + 1))[:4] + "%")

    def train(self, training_reviews_raw, training_labels):
        """Run one pass of per-review gradient descent over the data.

        Args:
            training_reviews_raw: sequence of review strings.
            training_labels: sequence of label strings, same length.

        Progress is written to stdout after every review.
        """
        # Each review becomes the list of indices of the vocabulary words it
        # contains; these are the active input nodes.
        training_reviews = [
            self._word_indices(review) for review in training_reviews_raw
        ]

        # Every review must have exactly one matching label.
        assert(len(training_reviews) == len(training_labels))

        # Number of correct predictions so far.
        correct_so_far = 0
        # Start time, for the speed read-out.
        start = time.time()
        total = len(training_reviews)

        for i, review in enumerate(training_reviews):
            label = training_labels[i]

            # ---- Forward pass ----
            # Hidden layer: sum of the weight rows of the words present.
            self.layer_1 *= 0
            for index in review:
                self.layer_1 += self.weights_0_1[index]
            # Output layer.
            layer_2 = self.sigmoid(self.layer_1.dot(self.weights_1_2))

            # ---- Backward pass ----
            # Output error and delta.
            layer_2_error = layer_2 - self.get_target_for_label(label)
            layer_2_delta = (
                layer_2_error * self.sigmoid_output_2_derivative(layer_2))
            # Error propagated back to the hidden layer; the hidden layer
            # has no activation function, so its delta equals its error.
            layer_1_error = layer_2_delta.dot(self.weights_1_2.T)
            layer_1_delta = layer_1_error

            # Gradient-descent weight updates.  Only the rows of the words
            # present in this review receive an input->hidden update.
            self.weights_1_2 -= (
                self.layer_1.T.dot(layer_2_delta) * self.learning_rate)
            for index in review:
                self.weights_0_1[index] -= (
                    layer_1_delta[0] * self.learning_rate)

            # Count the prediction made before this update as right or wrong.
            prediction = layer_2[0, 0]
            if prediction >= 0.5 and label == 'POSITIVE':
                correct_so_far += 1
            elif prediction < 0.5 and label == 'NEGATIVE':
                correct_so_far += 1

            # Live progress read-out.
            self._write_progress(i, total, start, correct_so_far,
                                 "#Trained", "Training Accuracy")
            if i % 2500 == 0:
                print("")

    def test(self, testing_reviews, testing_labels):
        """Predict every review with ``run`` and report accuracy on stdout.

        No weights are updated.
        """
        correct = 0
        start = time.time()
        total = len(testing_reviews)
        for i, review in enumerate(testing_reviews):
            pred = self.run(review)
            if pred == testing_labels[i]:
                correct += 1
            self._write_progress(i, total, start, correct,
                                 "#Tested", "Testing Accuracy")

    def run(self, review):
        """Return "POSITIVE" or "NEGATIVE" for one review string.

        Performs only the forward pass.
        NOTE(review): the input is lower-cased here, while the vocabulary
        and ``train`` use the text as given, so this presumably expects the
        training reviews to be lower-case already - confirm against the data.
        """
        self.layer_1 *= 0
        for index in self._word_indices(review.lower()):
            self.layer_1 += self.weights_0_1[index]
        layer_2 = self.sigmoid(self.layer_1.dot(self.weights_1_2))
        if layer_2[0, 0] >= 0.5:
            return "POSITIVE"
        return "NEGATIVE"
最後開始對資料進行學習
# Train on everything except the last 1000 reviews, which are held out.
train_reviews = reviews[:-1000]
train_labels = labels[:-1000]
mlp = SentimentNetwork(
    train_reviews,
    train_labels,
    min_count=20,
    polarity_cutoff=0.8,
    learning_rate=0.01,
)
mlp.train(train_reviews, train_labels)
通過測試對學習效果進行評定
# Evaluate on the last 1000 reviews, which were not used for training.
held_out_reviews = reviews[-1000:]
held_out_labels = labels[-1000:]
mlp.test(held_out_reviews, held_out_labels)