python_NLP實戰之中文分詞技術
一、規則分詞
1.1 正向最大匹配演算法
# 正向最大匹配演算法 MM法 規則分詞 class MM(object): def __init__(self): self.window_size=3 def cut(self,text): result=[] index=0 text_length=len(text) dic=['研究','研究生','生命','命','的','起源'] while text_length>index: for size in range(self.window_size+index,index,-1): piece=text[index:size] if piece in dic: index=size-1 break index=index+1 result.append(piece+'-------') print(result) if __name__=='__main__': text='研究生命的起源' tokenizer=MM() print(tokenizer.cut(text))
1.2 逆向最大匹配演算法
# RMM逆向最大匹配演算法 規則分詞 class RMM(object): def __init__(self): self.window_size=3 def cut(self,text): result=[] index=len(text) dic=['研究','研究生','生命','命','的','起源'] while index>0: for size in range(index-self.window_size,index): piece=text[size:index] if piece in dic: index=size+1 break index=index-1 result.append(piece+'------') result.reverse() print(result) if __name__=='__main__': text = '研究生命的起源' tokenizer = RMM() print(tokenizer.cut(text))
二、統計分詞
2.1 HMM模型
初始概率分佈
z1可能是狀態1,狀態2 ... 狀態n,於是z1就有個N點分佈:
Z1 |
狀態1 |
狀態2 |
... |
狀態n |
概率 |
P1 |
P2 |
... |
Pn |
即:Z1對應個n維的向量。
上面這個n維的向量就是初始概率分佈,記做π。
狀態轉移矩陣
但Z2就不能簡單的“同上”完事了,因為Z2和Z1不獨立,所以Z2是狀態1的概率有:Z1是狀態1時Z2是狀態1,Z1是狀態2時Z2是狀態1,..., Z1是狀態n時Z2是狀態1,於是就是下面的表
Z2 Z1 |
狀態1 |
狀態2 |
... |
狀態n |
狀態1 |
P11 |
P12 |
... |
P1n |
狀態2 |
P21 |
P22 |
... |
P2n |
... |
... |
... |
... |
... |
狀態n |
Pn1 |
Pn2 |
... |
Pnn |
即:Z1->Z2對應個n*n的矩陣。
同理:Zi -> Zi+1對應個n*n的矩陣。
上面這些n*n的矩陣被稱為狀態轉移矩陣,用An*n表示。
當然了,真要說的話,Zi -> Zi+1的狀態轉移矩陣一定都不一樣,但在實際應用中一般將這些狀態轉移矩陣定為同一個,即:只有一個狀態轉移矩陣。
圖1的第一行就搞定了,下面是第二行。
觀測矩陣
如果對於zi有:狀態1, 狀態2, ..., 狀態n,那zi的每一個狀態都會從下面的m個觀測中產生一個:觀測1, 觀測2, ..., 觀測m,所以有如下矩陣:
X Z |
觀測1 |
觀測2 |
... |
觀測m |
狀態1 |
P11 |
P12 |
... |
P1m |
狀態2 |
P21 |
P22 |
... |
P2m |
... |
... |
... |
... |
... |
狀態n |
Pn1 |
Pn2 |
... |
Pnm |
這可以用一個n*m的矩陣表示,也就是觀測矩陣,記做Bn*m。
由於HMM用上面的π,A,B就可以描述了,於是我們就可以說:HMM由初始概率分佈π、狀態轉移概率分佈A以及觀測概率分佈B確定,為了方便表達,把A, B, π 用 λ 表示,即:
λ = (A, B, π)
例子
假設我們相對如下這行話進行分詞:
歡迎來到我的部落格
再假設我們是這樣分的:找到“終止字”,然後根據終止字來分詞。即:對於這行字,“迎、到、我、的、客”是終止字,於是最終這麼分詞:歡迎/來到/我/的/部落格
下面用上面的知識對這個例子建立HMM的A, B, π:
初始概率分佈的確定:
1,對於每個樣本,我們的目標是確定其是不是“終止字”,因此對於每個樣本,其狀態只有n=2個:狀態1 -- 是、狀態2 -- 不是。
2,因此初始概率分佈π為:
π = {p1,p2}
P1:整個句子中第一個字是非終止字的概率
P2:整個句子中第一個字是終止字的概率
狀態轉移矩陣的確定:
剛才已經知道狀態有n=2個,於是狀態轉移矩陣就立馬得出了,即狀態轉移矩陣是個n*n的矩陣,如下:
A=
p11:非終止字 -> 非終止字的概率。
p12:非終止字 -> 終止字的概率。
p21:終止字 -> 非終止字的概率。
p22:終止字 -> 終止字的概率。
觀測矩陣的確定:
如果我們的目標文字使用Unicode編碼,那麼上面的任何一個字都是0~65535中的一個數,於是我們的觀測就會有m=65536個,於是觀測矩陣就是個n*m的矩陣,如下:
B=
p1,0:Unicode編碼中0對應的漢字是非終止字的概率
p1,65535:Unicode編碼中65535對應的漢字是非終止字的概率
p2,0:Unicode編碼中0對應的漢字是終止字的概率
p2,65535:Unicode編碼中65535對應的漢字是終止字的概率
PS:為什麼x會有65535個觀測啊?“歡迎來到我的部落格”這個明明只有8個字。原因是因為真正的HMM面臨的情況,即:現有了 Z1=“非終止字”這個狀態,然後根據這個狀態從65535個字中選出x1=“歡”這個字,然後根據狀態轉移矩陣,下一次轉移到了Z2 =“終止字”,然後根據Z2從65535個字中選出了x2=“迎”這個字,這樣,最終生成了這句話。
# 統計分詞
# 1、先建立語言模型
# 2、對句子進行單詞劃分,對劃分結果進行概率計算,獲得概率最大的分詞方式
# HMM
class HMM(object):
def __init__(self):
import os
# 儲存訓練的模型
self.model_file='./data/hmm_model.pkl'
# 狀態特徵值集合
self.state_list=['B','M','E','S']
# 判斷是否需要重新載入模型
self.load_para=False
def try_load_model(self,trained):
if trained:
import pickle
with open(self.model_file,'rb' ) as f:
self.A_dic=pickle.load(f)
self.B_dic=pickle.load(f)
self.Pi_dic=pickle.load(f)
self.load_para=True
else:
# 狀態轉移概率 (狀態-》狀態的條件概率)
self.A_dic={}
# 發射概率 (狀態-》詞語的條件概率
self.B_dic={}
# 狀態的初始概率
self.Pi_dic={}
self.load_para=False
# 計算轉移概率,初始概率,發射概率
def train(self,path):
# 重置幾個概率矩陣
self.try_load_model(False)
# 統計狀態出現次數
Count_dic={}
def init_parameters():
for state in self.state_list:
self.A_dic[state]={s:0.0 for s in self.state_list}
self.Pi_dic[state]=0.0
self.B_dic[state]={}
Count_dic[state]=0
def makeLabel(text):
out_text=[]
if len(text)==1:
out_text.append(['S'])
else:
out_text+=['B']+['M']*(len(text)-2)+['E']
return out_text
init_parameters()
line_num=-1
words=set()
with open(path,encoding='utf-8') as f:
for line in f:
line_num+=1
line=line.strip()
if not line:
continue
word_list=[i for i in line if i!='']
words |=set(word_list)
linelist=line.split()
line_state=[]
for w in linelist:
line_state.extend(makeLabel(w))
assert len(word_list)==len(line_state)
for k, v in enumerate(line_state):
Count_dic[v]+=1
if k==0:
self.Pi_dic[v]+=1
else:
self.A_dic[line_state[k-1]][v]+=1
self.B_dic[line_state[k]][word_list[k]]=self.B_dic[line_state[k]].get(word_list[k],0)+1.0
self.Pi_dic={k: v*1.0/line_num for k,v in self.Pi_dic.items()}
self.A_dic={k:{k1: v1/Count_dic[k] for k1,v1 in v.items()}for k,v in self.A_dic.items()}
self.B_dic={k: {k1:(v1+1)/Count_dic[k] for k1,v1 in v.items()} for k,v in self.B_dic.items()}
import pickle
with open(self.model_file,'wb') as f:
pickle.dump(self.A_dic,f)
pickle.dump(self.B_dic,f)
pickle.dump(self.Pi_dic,f)
return self
def viterbi(self,text,states,start_p,train_p,emit_p):
V=[{}]
path={}
for y in states:
V[0][y]=start_p[y]*emit_p[y].get(text[0],0)
path[y]=[y]
for t in range(1,len(text)):
V.append({})
newpath={}
neverSeen=text[t] not in emit_p['S'].keys() and \
text[t] not in emit_p['M'].keys() and \
text[t] not in emit_p['E'].keys() and \
text[t] not in emit_p['B'].keys()
for y in states:
emitP=emit_p[y].get(text[t],0) if not neverSeen else 1.0
(prob,state)=max([(V[len(text)-1][y],y) for y in ('E','M')])
else:
(prob, state) = max([(V[len(text) - 1][y], y) for y in states])
return (prob,path[state])
def cut(self,text):
import os
if not self.load_para:
self.try_load_model(os.path.exists(self.model_file))
prob,pos_list=self.viterbi(text,self.state_list,self.Pi_dic,self.A_dic,self.B_dic)
begin,next=0,0
for i ,char in enumerate(text):
pos=pos_list[i]
if pos=='B':
begin=i
elif pos=='E':
yield text[begin:i+1]
next=i+1
elif pos=='S':
yield char
next=i+1
if next<len(text):
yield text[next:]
hmm=HMM()
hmm.train('./data/trainCorpus.txt_utf8')
2.2 CRF
三、中文分詞工具_JieBa
jieba分詞結合了基於規則和基於統計的兩種方法
基於漢字成詞的HMM模型,採用了Verterbi演算法進行推導
3.1高頻詞提取
高頻詞就是NLP中的TF策略
進行資料的讀取
def get_content(path):
with open(path,'r',encoding='utf-8',errors='ignore') as f:
content=''
for l in f:
l=l.strip()
content+=l
return content
def stop_words(path):
with open(path,encoding='utf-8') as f:
return [l.strip() for l in f]
定義高頻詞統計的函式,輸入是一個詞的陣列
def get_TF(words,topK=10):
tf_dic={ }
for w in words:
tf_dic[w]=tf_dic.get(w,0)+1
return sorted(tf_dic.items(),key =lambda x:x[1],reverse=True)[:topK]
def main():
import glob
import random
import jieba
files=glob.glob('./data/news/C000013/*.txt')
corpus=[get_content(x) for x in files]
sample_inx=random.randint(0,len(corpus))
split_words=[x for x in jieba.cut(corpus[sample_inx]) if x not in stop_words('./data/stop_words.utf8')]
print('樣本之一:' + corpus[sample_inx])
print('樣本分詞效果:' + '/ '.join(split_words))
print('樣本的topK(10)詞:' + str(get_TF(split_words)))
main()