1. 程式人生 > 實用技巧 >【敏感詞檢測】用DFA構建字典樹完成敏感詞檢測任務

【敏感詞檢測】用DFA構建字典樹完成敏感詞檢測任務

任務概述

敏感詞檢測是各類平臺對使用者釋出內容(UGC)進行稽核的必做任務。
對於文字內容做敏感詞檢測,最簡單直接的方法就是規則匹配。構建一個敏感詞詞表,然後與文字內容進行匹配,如發現有敏感詞,則提交報告給人工稽核或者直接加以遮蔽。
當然也可以用機器學習的方法來做,不過需要收集及標註大量資料,有條件的話也可以加以實現。

任務難點及解決策略

1)對抗檢測的場景:比如同音替換、字形替換、隱喻暗指、詞中間插入特殊字元等。

解決策略:特殊字元可以使用特殊字元詞表過濾,其他幾種不好解決,主要通過擴大敏感詞表規模來。

2)斷章取義的問題:從語義上理解沒有問題,但按視窗大小切出幾個字來看,卻屬於敏感詞,造成誤報。

解決策略:這個問題主要是分詞錯誤導致的,應當考慮分詞規則,而不是無腦遍歷,或者用正則匹配。

3)檢測效率問題:隨著詞表的增大,迴圈查詢詞表的速度會變得很慢。

解決策略:使用DFA演算法構建字典樹。

4)詞的歧義問題:一個詞某個義項是違規的,但其他義項或許是正常的。

解決策略:這個要結合上下文考慮,機器學習類的方法比較容易解決這一問題。

5)詞表質量問題:從網路上獲取得到的敏感詞詞表,要麼包含詞彙較少,不能滿足檢測需求,要麼包含詞彙過多,檢測出了很多當前業務場景下不需要遮蔽的詞。

解決策略:需要定期人工整理。

敏感詞字典樹的構建

構建字典樹使用的是確定有限自動機(DFA)。DFA演算法的核心是建立了以敏感詞為基礎的許多敏感詞樹。
它的基本思想是基於狀態轉移來檢索敏感詞,只需要掃描一次待檢測文字(長度n),就能對所有敏感詞進行檢測。
且DFA演算法的時間複雜度O(n)基本上是與敏感詞的個數無關的,只與文字長度有關。

如下圖所示,比如abcd,abd,bcd,efg,hij這幾個詞在樹中表示如下。
中文的常用字只有四五千個,但由這些字構成的詞卻難以計數,如果迴圈遍歷,時間消耗極大,而通過字典樹,沿著根節點向下,每走一步就可以極大地縮小搜尋空間。

程式碼實現

import jieba

MinMatch=1 #最小匹配原則
MaxMatch=2 #最大匹配原則

class SensitiveWordDetect:
    def __init__(self, Dir_sensitive, Dir_stopWord):
        # 載入敏感詞庫
        sensitive_lst = []
        with open(Dir_sensitive, 'r', encoding = 'utf-8') as file:
            sensitive_lst = file.readlines()
            
        self.sensitiveWordList = sorted([i.split('\n')[0] for i in sensitive_lst])
        
        # 載入特殊符號停用詞
        stop_w_lst = []
        with open(Dir_stopWord, 'r', encoding = 'utf-8') as file:
            stop_w_lst = file.readlines()
        self.stopWordList = [i.split('\n')[0] for i in stop_w_lst]
        
        # 得到sensitive字典
        self.sensitiveWordMap = self.initSensitiveWordMap(self.sensitiveWordList)
        
    # 構建敏感詞庫
    def initSensitiveWordMap(self, sensitiveWordList):
        sensitiveWordMap = {}
        # 讀取每一行,每一個word都是一個敏感詞
        for word in sensitiveWordList:
            nowMap=sensitiveWordMap
            # 遍歷該敏感詞的每一個特定字元
            for i in range(len(word)):
                keychar=word[i]
                wordMap=nowMap.get(keychar)
                if wordMap !=None:
                    # nowMap更新為下一層
                    nowMap=wordMap
                else:
                    # 不存在則構建一個map,isEnd設定為0,因為不是最後一個
                    newNextMap={}
                    newNextMap["isEnd"]=0
                    nowMap[keychar]=newNextMap
                    nowMap=newNextMap
                # 到這個詞末尾字元
                if i==len(word)-1:
                    nowMap["isEnd"]=1
        
        return sensitiveWordMap
        
    def checkSensitiveWord(self,txt,beginIndex=0,matchModel=MinMatch):
        '''
        :param txt: 輸入待檢測的文字
        :param beginIndex: 輸入文字開始的下標
        :return: 返回敏感詞字元的長度
        '''
        nowMap=self.sensitiveWordMap
        sensitiveWordLen=0 # 敏感詞的長度
        containChar_sensitiveWordLen=0 # 包括特殊字元敏感詞的長度
        endFlag=False # 結束標記位

        for i in range(beginIndex,len(txt)):
            char=txt[i]
            if char in self.stopWordList:
                containChar_sensitiveWordLen+=1
                continue

            nowMap=nowMap.get(char)
            if nowMap != None:
                sensitiveWordLen+=1
                containChar_sensitiveWordLen+=1
                # 結束位置為True
                if nowMap.get("isEnd")==1:
                    endFlag=True
                    # 最短匹配原則
                    if matchModel==MinMatch:
                        break
            else:
                break
        if  endFlag==False:
            containChar_sensitiveWordLen=0
        
        return containChar_sensitiveWordLen
        
    def getSensitiveWord(self,txt):
        # 去除停止詞
        new_txt = ''
        for char in txt:
            if char not in self.stopWordList:
                new_txt += char
        # 然後分詞
        seg_list = list(jieba.cut(new_txt, cut_all=False))
        
        cur_txt_sensitiveList=[]
        # 注意,並不是一個個char查詢的,找到敏感詞會增強敏感詞的長度
        for i in range(len(txt)):
            length=self.checkSensitiveWord(txt,i,matchModel = MaxMatch)
            if length>0:
                word=txt[i:i+length]
                cur_txt_sensitiveList.append(word)
                i=i+length-1 # 出了迴圈還要+1 i+length是沒有檢測到的,下次直接從i+length開始
        
        # 對得到的結果和分詞結果進行匹配,不匹配的不要
        rst_list = []
        for line in cur_txt_sensitiveList:
            new_line = ''
            for char in line:
                if char not in self.stopWordList:
                    new_line += char
            if new_line in seg_list:
                rst_list.append(line)
        return rst_list
        
    def replaceSensitiveWord(self,txt,replaceChar='*'):
        Lst=self.getSensitiveWord(txt)
        
        # 如果需要加入的關鍵詞,已經在關鍵詞列表存在了,就不需要繼續新增
        def judge(Lst,word):
            if len(Lst)==0:
                return True
            for Str in Lst:
                if Str.count(word)!=0:
                    return False
            return True

        for word in Lst:
            replaceStr=len(word)*replaceChar
            txt=txt.replace(word,replaceStr)

        newLst=[]
        for word in Lst:
            newWord=""
            # newWord是除去停用詞、最精煉版本的敏感詞
            for char in word:
                if char in self.stopWordList:
                    continue
                newWord+=char
            length = self.checkSensitiveWord(newWord, 0, matchModel = MinMatch)
            if judge(newLst, newWord[:length]):
                newLst.append(newWord[:length])
            else:
                continue

        return txt, newLst # 最終返回的結果是遮蔽敏感詞後的文字,以及檢測出的敏感詞