基於CRF的中文命名實體識別模型
阿新 • 發佈:2018-11-30
條件隨機場(Conditional Random Fields,簡稱 CRF)是給定一組輸入序列條件下另一組輸出序列的條件概率分佈模型,在自然語言處理中得到了廣泛應用。
新建corpus_process類
import re


class CorpusProcess(object):
    """Pre-processing pipeline for the 1998 People's Daily (PKU) tagged corpus.

    Reads the word/POS annotated corpus, merges multi-token annotations
    (time expressions, split person names, bracketed compound words), and
    builds character-level word/POS/tag sequences plus CRF feature dicts.

    NOTE(review): the original module also imported sklearn_crfsuite,
    metrics and the long-removed sklearn.externals.joblib — none of them are
    used here, so they were dropped to keep the module importable.
    """

    def __init__(self):
        """Initialize corpus paths and the POS -> entity-label mapping."""
        self.train_corpus_path = "D://input_py//day15//1980_01rmrb.txt"
        self.process_corpus_path = "D://input_py//day15//result-rmrb.txt"
        # PKU tagset: t = time, nr = person name, ns = place name,
        # nt = organization.  The original mapped ns->ORG and nt->LOC,
        # which swaps places and organizations; fixed here.
        self._maps = {u't': u'T', u'nr': u'PER', u'ns': u'LOC', u'nt': u'ORG'}

    def read_corpus_from_file(self, file_path):
        """Read the corpus file and return its lines."""
        # encoding was commented out in the original, making the read depend
        # on the platform's locale default; force UTF-8 explicitly.
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.readlines()

    def write_corpus_to_file(self, data, file_path):
        """Write already-encoded byte data to file_path."""
        with open(file_path, 'wb') as f:
            f.write(data)

    def q_to_b(self, q_str):
        """Convert full-width characters in q_str to half-width."""
        chars = []
        for uchar in q_str:
            code = ord(uchar)
            if code == 12288:
                # full-width (ideographic) space maps directly to ASCII space
                code = 32
            elif 65281 <= code <= 65374:
                # other full-width chars differ from ASCII by a fixed offset
                code -= 65248
            chars.append(chr(code))
        return "".join(chars)

    def b_to_q(self, b_str):
        """Convert half-width characters in b_str to full-width."""
        chars = []
        for uchar in b_str:
            code = ord(uchar)
            if code == 32:
                # ASCII space maps directly to the full-width space
                code = 12288
            elif 32 <= code <= 126:
                # printable ASCII maps to full-width by a fixed offset
                code += 65248
            chars.append(chr(code))
        return "".join(chars)

    def pre_process(self):
        """Normalize each raw corpus line, merge split annotations, save."""
        lines = self.read_corpus_from_file(self.train_corpus_path)
        new_lines = []
        for line in lines:
            words = self.q_to_b(line.strip()).split(u' ')
            pro_words = self.process_t(words)
            pro_words = self.process_nr(pro_words)
            pro_words = self.process_k(pro_words)
            # drop the leading token (presumably the article/paragraph id
            # such as 19980101-01-001-001/m — TODO confirm against corpus)
            new_lines.append(' '.join(pro_words[1:]))
        self.write_corpus_to_file(data='\n'.join(new_lines).encode('utf-8'),
                                  file_path=self.process_corpus_path)

    def process_k(self, words):
        """Merge coarse-grained bracketed segments, e.g. [国家/n 环保局/n]nt.

        Tokens between '[' and ']' are concatenated with their per-token POS
        stripped; the merged token receives the tag that follows ']'.
        """
        pro_words = []
        temp = u''
        for word in words:
            if not word:
                # The original stopped at the first empty token (consecutive
                # spaces) and could even loop forever on an unclosed bracket
                # at end-of-line; skip blanks and keep going instead.
                continue
            if u'[' in word:
                temp += re.sub(u'/[a-zA-Z]*', u'', word.replace(u'[', u''))
            elif u']' in word:
                w = word.split(u']')
                temp += re.sub(u'/[a-zA-Z]*', u'', w[0])
                pro_words.append(temp + u'/' + w[1])
                temp = u''
            elif temp:
                temp += re.sub(u'/[a-zA-Z]*', u'', word)
            else:
                pro_words.append(word)
        return pro_words

    def process_nr(self, words):
        """Merge split person names, e.g. 温/nr 家宝/nr -> 温家宝/nr."""
        pro_words = []
        index = 0
        total = len(words)
        while index < total:
            word = words[index]
            if u'/nr' in word:
                next_index = index + 1
                if next_index < total and u'/nr' in words[next_index]:
                    # surname + given name: keep only the second token's tag
                    pro_words.append(word.replace(u'/nr', u'')
                                     + words[next_index])
                    index = next_index
                else:
                    pro_words.append(word)
            elif word:
                # original broke out on an empty token; skip it instead
                pro_words.append(word)
            index += 1
        return pro_words

    def process_t(self, words):
        """Merge consecutive time tokens, e.g. 一九九七年/t 十二月/t -> one /t."""
        pro_words = []
        temp = u''
        for word in words:
            if u'/t' in word:
                # accumulate; strip the previous '/t' so only the last stays
                temp = temp.replace(u'/t', u'') + word
            elif temp:
                pro_words.append(temp)
                temp = u''
                if word:
                    pro_words.append(word)
            elif word:
                pro_words.append(word)
        if temp:
            # The original appended a spurious empty token when a line ended
            # with a time word; just flush the pending merged token.
            pro_words.append(temp)
        return pro_words

    def pos_to_tag(self, p):
        """Map a POS tag to an entity label; non-entity POS maps to 'O'."""
        t = self._maps.get(p)
        return t if t else u'O'

    def tag_perform(self, tag, index):
        """Apply the BIO scheme: first char of an entity is B_, the rest I_."""
        if tag == u'O':
            return tag
        return u'B_{}'.format(tag) if index == 0 else u'I_{}'.format(tag)

    def pos_perform(self, pos):
        """Collapse entity-bearing noun POS (except time) to plain 'n'.

        Prevents the POS feature from leaking the label prior.
        """
        return u'n' if pos in self._maps and pos != u't' else pos

    def initialize(self):
        """Load the processed corpus and build the training sequences."""
        lines = self.read_corpus_from_file(self.process_corpus_path)
        words_list = [line.strip().split(' ') for line in lines
                      if line.strip()]
        del lines  # free the raw text before expanding per-character data
        self.init_sequence(words_list)

    def init_sequence(self, words_list):
        """Build character-level word/POS/tag sequences from token lists."""
        words_seq = [[word.split(u'/')[0] for word in words]
                     for words in words_list]
        pos_seq = [[word.split(u'/')[1] for word in words]
                   for words in words_list]
        tag_seq = [[self.pos_to_tag(p) for p in pos] for pos in pos_seq]
        # Expand every token-level POS/tag to one entry per character.
        self.pos_seq = [[[pos_seq[index][i]
                          for _ in range(len(words_seq[index][i]))]
                         for i in range(len(pos_seq[index]))]
                        for index in range(len(pos_seq))]
        self.tag_seq = [[[self.tag_perform(tag_seq[index][i], w)
                          for w in range(len(words_seq[index][i]))]
                         for i in range(len(tag_seq[index]))]
                        for index in range(len(tag_seq))]
        # Flatten to one sequence per sentence; pad POS with 'un' sentinels
        # so window features at the sentence edges stay in range.
        self.pos_seq = [[u'un']
                        + [self.pos_perform(p) for pos in seq for p in pos]
                        + [u'un']
                        for seq in self.pos_seq]
        self.tag_seq = [[t for tag in seq for t in tag]
                        for seq in self.tag_seq]
        self.word_seq = [[u'<BOS>'] + [w for word in seq for w in word]
                         + [u'<EOS>']
                         for seq in words_seq]

    def extract_feature(self, word_grams):
        """Build CRF feature dicts from 3-character windows.

        word_grams: per-sentence lists of [w-1, w, w+1] windows.
        Returns one list of feature dicts per sentence.
        """
        features = []
        for index in range(len(word_grams)):
            feature_list = []
            for i in range(len(word_grams[index])):
                gram = word_grams[index][i]
                feature_list.append({
                    u'w-1': gram[0],
                    u'w': gram[1],
                    u'w+1': gram[2],
                    u'w-1:w': gram[0] + gram[1],
                    u'w:w+1': gram[1] + gram[2],
                    # POS features (p-1/p/p+1 from self.pos_seq) were
                    # commented out in the original; re-enable with care —
                    # predict() never rebuilds pos_seq for new sentences.
                    u'bias': 1.0,
                })
            features.append(feature_list)
        return features

    def segment_by_window(self, words_list=None, window=3):
        """Slide a window of `window` characters over words_list."""
        words = []
        begin, end = 0, window
        # Kept the original loop shape: for window == 1 it yields one fewer
        # window than len(words_list); callers only ever use window == 3.
        for _ in range(1, len(words_list)):
            if end > len(words_list):
                break
            words.append(words_list[begin:end])
            begin += 1
            end += 1
        return words

    def generator(self):
        """Return (features, labels) ready for CRF training."""
        word_grams = [self.segment_by_window(word_list)
                      for word_list in self.word_seq]
        features = self.extract_feature(word_grams)
        return features, self.tag_seq
再建test類
import sklearn_crfsuite
import joblib
from sklearn_crfsuite import metrics

import base_corpus_process

# NOTE(review): the original began with `reload(sys);
# sys.setdefaultencoding('utf8')`, a Python-2-only hack that raises
# NameError/AttributeError on Python 3; removed.


class CRF_NER(object):
    """CRF-based Chinese named-entity recognizer (person/place/org/time)."""

    def __init__(self):
        """Set hyper-parameters and prepare the corpus."""
        self.algorithm = "lbfgs"   # optimizer used by CRFsuite
        self.c1 = 0.1              # L1 regularization coefficient
        self.c2 = 0.1              # L2 regularization coefficient
        self.max_iterations = 100
        self.model_path = "D://input_py//day15//model.pkl"
        self.corpus = base_corpus_process.CorpusProcess()  # corpus helper
        self.corpus.pre_process()   # normalize the raw corpus
        self.corpus.initialize()    # build training sequences
        self.model = None

    def initialize_model(self):
        """Create the (untrained) CRF model from the stored parameters."""
        # float()/int() kept so string-valued overrides still work.
        self.model = sklearn_crfsuite.CRF(
            algorithm=self.algorithm,
            c1=float(self.c1),
            c2=float(self.c2),
            max_iterations=int(self.max_iterations),
            all_possible_transitions=True)

    def train(self):
        """Train on the corpus, report held-out metrics, save the model."""
        self.initialize_model()
        x, y = self.corpus.generator()
        # first 500 sentences held out for evaluation, the rest for training
        x_train, y_train = x[500:], y[500:]
        x_test, y_test = x[:500], y[:500]
        self.model.fit(x_train, y_train)
        labels = list(self.model.classes_)
        labels.remove('O')  # 'O' dominates; exclude it from the scores
        y_predict = self.model.predict(x_test)
        # The original computed the weighted F1 and discarded it; report it.
        f1 = metrics.flat_f1_score(y_test, y_predict,
                                   average='weighted', labels=labels)
        print('weighted F1: {:.3f}'.format(f1))
        # group B_X/I_X pairs together in the report
        sorted_labels = sorted(labels, key=lambda name: (name[1:], name[0]))
        print(metrics.flat_classification_report(
            y_test, y_predict, labels=sorted_labels, digits=3))
        self.save_model()

    def predict(self, sentence):
        """Extract entities from sentence; returns them space-separated."""
        self.load_model()
        u_sent = self.corpus.q_to_b(sentence)
        word_lists = [[u'<BOS>'] + [c for c in u_sent] + [u'<EOS>']]
        word_grams = [self.corpus.segment_by_window(word_list)
                      for word_list in word_lists]
        features = self.corpus.extract_feature(word_grams)
        y_predict = self.model.predict(features)
        entity = u''
        for index, tag in enumerate(y_predict[0]):
            if tag != u'O':
                # a new entity starts when the type suffix changes
                if index > 0 and tag[-1] != y_predict[0][index - 1][-1]:
                    entity += u' '
                entity += u_sent[index]
            elif entity and entity[-1] != u' ':
                # Guard on `entity` first: the original raised IndexError on
                # entity[-1] when the sentence began with an 'O' tag.
                entity += u' '
        return entity

    def load_model(self):
        """Load a previously trained model from disk."""
        self.model = joblib.load(self.model_path)

    def save_model(self):
        """Persist the trained model to disk."""
        joblib.dump(self.model, self.model_path)


if __name__ == '__main__':
    # Guarded so importing this module no longer triggers a full training
    # run; train() returns None, so the original `model = ner.train()`
    # assignment was dropped.
    ner = CRF_NER()
    ner.train()
執行得到的準確率和召回率:
              precision    recall  f1-score   support

    B_LOC         0.944     0.827     0.882       266
    I_LOC         0.878     0.796     0.835      1203
    B_ORG         0.942     0.911     0.926       682
    I_ORG         0.939     0.869     0.903       997
    B_PER         0.981     0.927     0.953       440
    I_PER         0.975     0.945     0.960       824
    B_T           0.989     0.989     0.989       444
    I_T           0.993     0.994     0.993      1099

avg / total       0.949     0.904     0.925      5955