樸素貝葉斯新聞分類器詳解
機器學習的三要素是模型、策略(使用Cost Function計算這個模型是不是好的)和優化演算法(不斷的尋找最優引數,找到一個引數後用策略判斷一下是不是可以,不行再找)。
一個具體的機器學習流程是怎麼樣的呢,下面使用樸素貝葉斯進行新聞分類進行一個完整的介紹。
1、特徵表示
一篇新聞中,可以把新聞中出現的詞作為特徵向量表示出來,如 X = {昨日,是,國內,投資,市場…}
2、特徵選擇
特徵中由於一些詞對分類沒有比較顯著的幫助,甚至會有導致一些噪音,我們需要去除,如“是”、“昨日”等,經過選擇的特徵可能是 X = {國內,投資,市場…}
3、模型選擇
這裡選擇樸素貝葉斯分類器,關於樸素貝葉斯可以參看劉未鵬的這篇
樸素貝葉斯本身非常簡單,但是很多情況下這種簡單的分類模型卻很有效,在我對新聞進行分類測試的過程中,很容易就能達到93%以上的準確率,個別分類的精度能達到99%。
樸素貝葉斯模型的基本公式為:
P(yi|X)=P(yi)*P(X|yi)P(X)
其中,各個引數的含義為:
$P(y_{i} |
X)$ 當前文件X屬於分類Yi的概率 |
---|
- $P(y{i})分類y{i}$的先驗概率,即在整個訓練集中這種分類的比率
- P(X) 文件X的先驗概率,每個文件在資料集中都是1/N的概率,可忽略
$P(X |
y{i})在分類y{i}$中,文件X存在的概率 |
---|
對於第4條,文件X存在於yi中的概率,可以按照文件X中每個詞在Yi中的概率相乘獲得,即:
P(X|yi)=∏jP(xj|yi)
所以貝葉斯公式可以變形為:
P(yi|X)=P(yi)*∏jP(xj|yi)P(X)
其中,上面的那個引數$P(y{i})和P(x{j} |
y_{i})$可以根據最大似然估計法來估算,即: |
---|
P(yi)=Count(yi)Count(X)
表示當前分類的先驗概率為當前分類下的文件數除以所有文件.
P(xj|yi)=Count(xj)Count(yi)
表示當前分類下出現的所有Xj詞數除以當前分類下所有的詞數.
這裡需要注意的是,$P(x_{j} |
y_{i})$的計算方式主要有兩種,上面所說的是新聞分類實踐中效果較好的一種,叫做多項分佈(Multinomial Distribution),就是單個文件中重複出現某個詞的時候,出現幾次計數幾次,分母為所有詞彙的總數。 |
---|
另一種計算方式為,01分佈(Binary Distribution),即單個文件中出現多次的詞只計數一次,分母為所有文章的個數,而不是詞的個數。
可能出現的問題一:
在進行預測的時候,如某篇文章包含“中國澳門”這個詞,使用上面變形後的貝葉斯公式計算該文章是“體育”分類的時候,假如“體育”分類下從來沒有出現過“中國澳門”這個詞,就會導致
P(xj|yi)=fracCount(xj)Count(xi)=0
進一步導致整個貝葉斯概率為0,這是不合理的,所以我們要避免沒有出現過的詞概率為0的情況。
這裡我們只需要使用一個平滑引數,讓上式的分子分母同時加上一個小值即可,假如分子加上 lambda ,則分母需要加上 N * lambda,其中N為所有單詞的去重後數量(這是因為分子為每一個詞彙都要計算一次)。
這樣就變成了:
P(xj|yi)=fracCount(xj)+NCount(xi)+N*λ
可能出現的問題二:
由於貝葉斯公式中是所有的$P(x{j}|y{i})$求積,概率求積很可能遇到浮點數溢位的情況,這時候我們需要變通一下,把求積轉換成求和,只需要對貝葉斯公式中分子求log即可(log(a * b) = log(a) + log(b)):
4、訓練資料準備
我所使用的訓練資料集為一批已經分好詞的文字檔案,檔名中包含它們所屬的分類(auto、sports、business),為了讓模型訓練的時候更方便的讀取和使用,我們把資料集按照一定比例(如80%)分為訓練集和測試集:
#!/usr/bin/env python# encoding: utf-8""" author: [email protected]"""import osimport randomimport reclass DataPrepare(object):
"""處理原始資料,為機器學習模型的訓練作準備"""
def __init__(self, input_dir, train_data_file, test_data_file, train_file_percentage):
self.input_dir = input_dir
self.train_data_file = open(train_data_file,'w')
self.test_data_file = open(test_data_file,'w')
self.train_file_percentage = train_file_percentage
self.unique_words = []
# 每一個單詞都使用一個數字型別的id表示,python索引的時候才會快一些
self.word_ids = {}
def __del__(self):
self.train_data_file.close()
self.test_data_file.close()
def prepare(self):
file_num = 0
output_file = self.test_data_file
for file_name in os.listdir(self.input_dir):
# arr = (1234,'business')
arr = re.findall(r'(d+)(w+)',file_name)[0]
category = arr[1]
# 隨即函式按照train_file_percentage指定的百分比來選擇訓練和測試資料及
if random.random() < self.train_file_percentage:
output_file = self.train_data_file
else:
output_file = self.test_data_file
# 讀取檔案獲得片語
words = []
with open(self.input_dir + '/' + file_name,'r') as f:
words = f.read().decode('utf-8').split()
output_file.write(category + ' ')
for word in words:
if word not in self.word_ids:
self.unique_words.append(word)
# 可以取Hash,這裡為了簡便期間,直接使用當前陣列的長度(也是唯一的)
self.word_ids[word] = len(self.unique_words)
output_file.write(str(self.word_ids[word]) + " ")
output_file.write("#"+file_name+"n")
# 原始檔案較多,需要互動顯示進度
file_num += 1
if file_num % 100 == 0:
print file_num,' files processed'
print file_num, " files loaded!"
print len(self.unique_words), " unique words found!"if __name__ == '__main__':
dp = DataPrepare('newsdata','news.train','news.test',0.8)
dp.prepare()
5、模型訓練
在模型訓練的部分,我們需要的是求出模型公式中所有需要的引數,這樣預測的時候可以直接呼叫用來預測一個新聞的分類。
模型訓練的目標是獲得一個概率矩陣:
分類 單詞1 單詞2 ... 單詞n
體育 0.0123 0.0003 ... 0.00014
商業 0.0034 0.0351 ... 0.1342
需要注意的是,某個單詞可能不在其中一個分類中,這時候該單詞在該分類下的概率就是上面提到的拉普拉斯平滑取得的預設概率,由於這種單詞可能非常多,所以我們可以單獨使用一個map來儲存預設概率,遇到某分類下沒有的單詞的時候不再增加新的儲存空間。
#!/usr/bin/env python# coding: utf-8""" author: [email protected]"""class NavieBayes(object):
"""樸素貝葉斯模型"""
def __init__(self,train_data_file,model_file):
self.train_data_file = open(train_data_file,'r')
self.model_file = open(model_file,'w')
# 儲存每一種型別出現的次數
self.class_count = {}
# 儲存每一種型別下各個單詞出現的次數
self.class_word_count = {}
# 唯一單詞總數
self.unique_words = {}
# ~~~~~~~~~~ NavieBayes引數 ~~~~~~~~~~~~#
# 每個類別的先驗概率
self.class_probabilities = {}
# 拉普拉斯平滑,防止概率為0的情況出現
self.laplace_smooth = 0.1
# 模型訓練結果集
self.class_word_prob_matrix = {}
# 當某個單詞在某類別下不存在時,預設的概率(拉普拉斯平滑後)
self.class_default_prob = {}
def __del__(self):
self.train_data_file.close()
self.model_file.close()
def loadData(self):
line_num = 0
line = self.train_data_file.readline().strip()
while len(line) > 0:
words = line.split('#')[0].split()
category = words[0]
if category not in self.class_count:
self.class_count[category] = 0
self.class_word_count[category] = {}
self.class_word_prob_matrix[category] = {}
self.class_count[category] += 1
for word in words[1:]:
word_id = int(word)
if word_id not in self.unique_words:
self.unique_words[word_id] = 1
if word_id not in self.class_word_count[category]:
self.class_word_count[category][word_id] = 1
else:
self.class_word_count[category][word_id] += 1
line = self.train_data_file.readline().strip()
line_num += 1
if line_num % 100 == 0:
print line_num,' lines processed'
print line_num,' training instances loaded'
print len(self.class_count), " categories!", len(self.unique_words), "words!"
def computeModel(self):
# 計算P(Yi)
news_count = 0
for count in self.class_count.values():
news_count += count
for class_id in self.class_count.keys():
self.class_probabilities[class_id] = float(self.class_count[class_id]) / news_count
# 計算P(X|Yi) <===> 計算所有 P(Xi|Yi)的積 <===> 計算所有 Log(P(Xi|Yi)) 的和
for class_id in self.class_word_count.keys():
# 當前類別下所有單詞的總數
sum = 0.0
for word_id in self.class_word_count[class_id].keys():
sum += self.class_word_count[class_id][word_id]
count_Yi = (float)(sum + len(self.unique_words)*self.laplace_smooth)
# 計算單個單詞在某類別下的概率,儲存在結果矩陣中,所有當前類別沒有的單詞賦予預設概率(即使用拉普拉斯平滑)
for word_id in self.class_word_count[class_id].keys():
self.class_word_prob_matrix[class_id][word_id] = (float)(self.class_word_count[class_id][word_id]+self.laplace_smooth) / count_Yi
self.class_default_prob[class_id] = (float)(self.laplace_smooth) / count_Yi
print class_id,' matrix finished, length = ',len(self.class_word_prob_matrix[class_id])
return
def saveModel(self):
# 把每個分類的先驗概率寫入檔案
for class_id in self.class_probabilities.keys():
self.model_file.write(class_id)
self.model_file.write(' ')
self.model_file.write(str(self.class_probabilities[class_id]))
self.model_file.write(' ')
self.model_file.write(str(self.class_default_prob[class_id]))
self.model_file.write('#')
self.model_file.write('n')
# 把每個單詞在當前類別的概率寫入檔案
for class_id in self.class_word_prob_matrix.keys():
self.model_file.write(class_id + ' ')
for word_id in self.class_word_prob_matrix[class_id].keys():
self.model_file.write(str(word_id) + ' ' + str(self.class_word_prob_matrix[class_id][word_id]))
self.model_file.write(' ')
self.model_file.write('n')
return
def train(self):
self.loadData()
self.computeModel()
self.saveModel()if __name__ == '__main__':
nb = NavieBayes('news.train','news.model')
nb.train()
6、預測(分類)和評價
預測部分直接使用樸素貝葉斯公式,計算當前新聞分別屬於各個分類的概率,選擇概率最大的那個分類輸出。
由於第5步已經計算出來概率矩陣和P(yi)的值,所以預測的時候直接呼叫樸素貝葉斯函式即可,對測試資料集預測後計算其準確性、精確度等即可。
#!/usr/bin/env python#coding: utf-8""" author: [email protected]"""import mathclass NavieBayesPredict(object):
"""使用訓練好的模型進行預測"""
def __init__(self, test_data_file, model_data_file, result_file):
self.test_data_file = open(test_data_file,'r')
self.model_data_file = open(model_data_file,'r')
# 對測試資料集預測的結果檔案
self.result_file = open(result_file,'w')
# 每個類別的先驗概率
self.class_probabilities = {}
# 拉普拉斯平滑,防止概率為0的情況出現
self.laplace_smooth = 0.1
# 模型訓練結果集
self.class_word_prob_matrix = {}
# 當某個單詞在某類別下不存在時,預設的概率(拉普拉斯平滑後)
self.class_default_prob = {}
# 所有單詞
self.unique_words = {}
# 實際的新聞分類
self.real_classes = []
# 預測的新聞分類
self.predict_classes = []
def __del__(self):
self.test_data_file.close()
self.model_data_file.close()
self.result_file.close()
def loadModel(self):
# 從模型檔案的第一行讀取類別的先驗概率
class_probs = self.model_data_file.readline().split('#')
for cls in class_probs:
arr = cls.split()
if len(arr) == 3:
self.class_probabilities[arr[0]] = float(arr[1])
self.class_default_prob[arr[0]] = float(arr[2])
# 從模型檔案讀取單詞在每個類別下的概率
line = self.model_data_file.readline().strip()
while len(line) > 0:
arr = line.split()
assert(len(arr) % 2 == 1)
assert(arr[0] in self.class_probabilities)
self.class_word_prob_matrix[arr[0]] = {}
i = 1
while i < len(arr):
word_id = int(arr[i])
probability = float(arr[i+1])
if word_id not in self.unique_words:
self.unique_words[word_id] = 1
self.class_word_prob_matrix[arr[0]][word_id] = probability
i += 2
line = self.model_data_file.readline().strip()
print len(self.class_probabilities), " classes loaded!", len(self.unique_words), "words!"
def caculate(self):
# 讀取測試資料集
line = self.test_data_file.readline().strip()
while len(line) > 0:
arr = line.split()
class_id = arr[0]
words = arr[1:len(arr)-1]
# 把真實的分類儲存起來
self.real_classes.append(class_id)
# 預測當前行(一個新聞)屬於各個分類的概率
class_score = {}
for key in self.class_probabilities.keys():
class_score[key] = math.log(self.class_probabilities[key])
for word_id in words:
word_id = int(word_id)
if word_id not in self.unique_words:
continue
for class_id in self.class_probabilities.keys():
if word_id not in self.class_word_prob_matrix[class_id]:
class_score[class_id] += math.log(self.class_default_prob[class_id])
else:
class_score[class_id] += math.log(self.class_word_prob_matrix[class_id][word_id])
# 對於當前新聞,所屬的概率最高的分類
max_class_score = max(class_score.values())
for key in class_score.keys():
if class_score[key] == max_class_score:
self.predict_classes.append(key)
line = self.test_data_file.readline().strip()
print len(self.real_classes),len(self.predict_classes)
def evaluation(self):
# 評價當前分類器的準確性
accuracy = 0
i = 0
while i < len(self.real_classes):
if self.real_classes[i] == self.predict_classes[i]:
accuracy += 1
i += 1
accuracy = (float)(accuracy)/(float)(len(self.real_classes))
print "Accuracy:",accuracy
# 評測精度和召回率
# 精度是指所有預測中,正確的預測
# 召回率是指所有物件中被正確預測的比率
for class_id in self.class_probabilities:
correctNum = 0
allNum = 0
predNum = 0
i = 0
while i < len(self.real_classes):
if self.real_classes[i] == class_id:
allNum += 1
if self.predict_classes[i] == self.real_classes[i]:
correctNum += 1
if self.predict_classes[i] == class_id:
predNum += 1
i += 1
precision = (float)(correctNum)/(float)(predNum)
recall = (float)(correctNum)/(float)(allNum)
print class_id,' -> precision = ',precision,' recall = ',recall
def predict(self):
self.loadModel()
self.caculate()
self.evaluation()if __name__ == '__main__':
nbp = NavieBayesPredict('news.test','news.model','news.result')
nbp.predict()