New Word Discovery, with Java and Spark Implementations
New word discovery is not a new research topic, but its most interesting aspect is that an unsupervised algorithm can find meaningful "words" entirely without hand-crafted knowledge, rather than arbitrary fragments of characters (this works because the measures are effective and the corpus itself is made of meaningful words). This post follows matrix67's article 網際網路時代的社會語言學:基於SNS的文字資料探勘 (Sociolinguistics in the Internet age: text mining on SNS data) and uses his unsupervised approach: enumerate candidate words with N-grams, then compute each candidate's frequency, internal cohesion, and external freedom, and finally keep the candidates that pass a set of thresholds. The original article explains the idea in depth, so please refer to it for details. OK, talk is cheap, here is my implementation.
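To make the two measures concrete before the full implementation below, here is a minimal, self-contained sketch. The word 電影院 is the example used in matrix67's article, but every count below is made up purely for illustration: cohesion compares the word's probability against its best two-part decomposition, and freedom is the entropy of the neighbor distribution.
// Toy illustration of cohesion and freedom (all counts are invented, not real corpus statistics).
public class CohesionAndFreedomDemo {
    public static void main(String[] args) {
        double total = 2400.0;                        // total number of character positions in the toy corpus
        double pWord = 30 / total;                    // count("電影院") / total
        double split1 = (120 / total) * (40 / total); // p("電") * p("影院")
        double split2 = (100 / total) * (60 / total); // p("電影") * p("院")
        // Cohesion: the word must beat its best decomposition, so divide by the larger product.
        double cohesion = pWord / Math.max(split1, split2);
        // Freedom: Shannon entropy of the left-neighbor distribution, e.g. 看/去/在/到.
        int[] leftNeighborCounts = {18, 6, 3, 3};
        double sum = 30.0, leftEntropy = 0.0;
        for (int c : leftNeighborCounts) {
            double p = c / sum;
            leftEntropy -= p * Math.log(p);
        }
        System.out.println("cohesion=" + cohesion + ", leftEntropy=" + leftEntropy);
    }
}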
Building on matrix67's blog post, I further tuned the new-word-discovery algorithm for our business scenario, including but not limited to the following (the core code will be released once the project goes live; a rough sketch of two of these filters follows the list):
1. Use the existing dictionary, common high-frequency words, and common stop words to remove irrelevant entries from the new-word results.
2. Remove junk text and dirty data (ads and the like) from the corpus before processing.
3. Redefine intra-word compactness as symmetric conditional probability (a PMI-style measure), which handles new words longer than two characters better.
4. If the first character of a candidate is a measure word (個, 件, 次, ...) and most of its left neighbors are digits (1, 2, 兩, ...), the candidate is not treated as a new word.
5. The new-word score combines frequency and compactness (leaning toward compactness; external freedom is not used for now because the feature is not robust), and the top-N new words are taken by this score.
6. Remove reduplicated interjections (呵呵, 嘻嘻嘻, 哈哈哈哈哈, ...) from the results.
7. Remove candidates that contain high-frequency function characters (了, 是, 不, 麼, ...) and have high frequency but low compactness.
8. Remove candidates that contain two or more such high-frequency function characters.
9. For a parent string and its substring that both appear in the results (e.g. 西門慶 vs. 門慶), keep the parent and drop the substring when their frequencies are close; when they differ clearly (e.g. 大野怪 vs. 野怪), keep the substring and drop the parent.
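The production filtering code is not public yet, so here is only a minimal sketch of two of the heuristics above: the reduplicated-interjection filter (point 6) and the parent/substring deduplication (point 9). The method names, the interjection character set, and the 0.8–1.25 "close frequency" band are assumptions made for illustration, not the values used in the real project.
import java.util.HashMap;
import java.util.Map;

public class NewWordFilters {
    // Point 6 (sketch): a candidate made of one repeated interjection character is dropped.
    static boolean isReduplicatedInterjection(String w) {
        String interjections = "呵哈嘻嘿哦嗯";   // assumed character set
        if (w.length() < 2) return false;
        char first = w.charAt(0);
        if (interjections.indexOf(first) < 0) return false;
        for (int i = 1; i < w.length(); i++) {
            if (w.charAt(i) != first) return false;
        }
        return true;
    }

    // Point 9 (sketch): if both a parent string and its substring survive with similar
    // frequency, keep only the parent; otherwise keep only the substring.
    static void dedupeParentChild(Map<String, Integer> freq, String parent, String child) {
        if (!freq.containsKey(parent) || !freq.containsKey(child)) return;
        double ratio = freq.get(child) / (double) freq.get(parent);
        if (ratio >= 0.8 && ratio <= 1.25) {   // assumed "close enough" band
            freq.remove(child);                // e.g. drop 門慶, keep 西門慶
        } else {
            freq.remove(parent);               // e.g. drop 大野怪, keep 野怪
        }
    }

    public static void main(String[] args) {
        System.out.println(isReduplicatedInterjection("哈哈哈哈"));  // true
        Map<String, Integer> freq = new HashMap<>();
        freq.put("西門慶", 102);
        freq.put("門慶", 105);
        dedupeParentChild(freq, "西門慶", "門慶");
        System.out.println(freq.keySet());     // [西門慶]
    }
}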
A single-machine Java implementation of the approach from matrix67's article:
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class findNewWord {
public boolean if_remove_oldWors = true;// whether to filter out already-known words during the n-gram stage
public String str_in="我現在正在玩lol,現在戰況激烈可能現在沒辦法抽空回覆你";
int space_num = 4;          // fixed width (in characters) of each neighbor-count field
public int len_max = 4;     // candidate words are substrings of up to len_max+1 characters
double compactness_t = 1000;// cohesion threshold
double flexible_t = 0.5;    // freedom (neighbor-entropy) threshold
double wcount_t = 5;        // minimum word frequency
public HashMap<String,String> w_Map = new HashMap<String,String>();  // candidate -> "count,left-neighbors,right-neighbors"
public HashMap<String,String> nw_Map = new HashMap<String,String>(); // accepted new word -> "count,cohesion,freedom"
public Set<String> wset = new HashSet<String>();                     // all candidate words
public int str_len = str_in.length();
public int game_id;
public String date_time;
//
public static Set<String> foundWords = new HashSet<String>();// already-known words; candidates in this set are excluded from the results
public String cm = "99999999999999999";
public int count_max = Integer.parseInt(cm.substring(0,space_num));// largest count that fits in a space_num-wide field (9999 when space_num is 4)
//
// Collect frequency and left/right neighbor statistics for every candidate word
public void wordStatics(){
str_in = str_in.replaceAll("[^\u4E00-\u9FA5^a-z^A-Z^0-9^\\>]", "");// keep only CJK characters, letters, digits and the '>' boundary marker
str_len = str_in.length();
if(str_len<2) return;
// Initialize every candidate with a placeholder info string ("count,left-neighbors,right-neighbors")
// so that later split(",") always yields three fields
String spacel = "    ";// padding, at least space_num-1 spaces long
String info_ini = 0+",的:0"+spacel.substring(0,space_num-1)+",的:0"+spacel.substring(0,space_num-1);
// First pass: enumerate candidate words, i.e. all substrings of up to len_max+1 characters
for(int i=0;i<str_len;i++){
if(str_in.charAt(i)=='>') continue;
int flag = 0;// set to 1 once a sentence-end marker '>' is reached
for(int j=0;j<=len_max;j++){
if(i+j+1>str_len) continue;
String cur_w = str_in.substring(i, i+j+1);// current candidate
if(cur_w.charAt(cur_w.length()-1)=='>'){// the candidate ends at a boundary marker, so it is not a valid word
flag = 1;
}
if(flag == 1) continue;
// register the candidate with the placeholder info string
wset.add(cur_w);
w_Map.put(cur_w, info_ini);
}
}
// Second pass: accumulate counts and neighbor statistics
for(int i=0;i<str_in.length();i++){
if(str_in.charAt(i)=='>') continue;
int flag = 0;// set to 1 once a sentence-end marker (or an already-known word) is reached
for(int j=0;j<=len_max;j++){
if(i+j+1>str_len) continue;
String cur_w = str_in.substring(i, i+j+1);// current candidate
if(cur_w.charAt(cur_w.length()-1)=='>'){// the candidate ends at a boundary marker
flag = 1;
}
if(flag == 1) continue;
//
String info_all = w_Map.get(cur_w);
String info[] = info_all.split(",");
int num = Integer.parseInt(info[0])+1;
//
String left_w = "";
if(i!=0 && str_in.charAt(i-1)!='>') left_w += str_in.charAt(i-1);
String right_w = "";
if((i+j+1)<str_len && str_in.charAt(i+j+1)!='>') right_w += str_in.charAt(i+j+1);
//
info_all = updateWordStatic(info_all, left_w, "left");
info_all = updateWordStatic(info_all, right_w, "right");
info_all = info_all.substring((info_all.indexOf(',', 0)));// drop the stale count field, keep ",left-neighbors,right-neighbors"
w_Map.put(cur_w, num+info_all);
if(cur_w.equals("現在")){
//System.out.println(cur_w+"==>:"+num+","+left_w+","+right_w);
}
}
}
}
// Update the left/right neighbor counts stored in a candidate's info string
public String updateWordStatic(String str,String w_cur,String place){
if(w_cur.length()<1) return str;
String[] str_l = str.split(",");
String spacel = "    ";// padding, at least space_num-1 spaces long
if(place.equals("left")){
int i_w = str_l[1].indexOf(w_cur+":");
// the neighbor already has an entry (indexOf returns 0 when the entry sits at the start)
if(i_w>=0){
int i_n = i_w + w_cur.length()+1;
int i_b = i_n+space_num;
int num_cur = Integer.parseInt(str_l[1].substring(i_n, i_n+space_num).replaceAll(" ", ""))+1;
String nums = "    ";// space_num-wide padded count field
nums = nums.substring(0,space_num);
if(num_cur<count_max){
String num_t = num_cur + "";
nums = num_t + nums.substring(num_t.length());
}
else num_cur=count_max;
str = str_l[0] + ","+str_l[1].substring(0, i_w)
+ w_cur+":"+nums
+str_l[1].substring(i_b)+","+str_l[2];
}
// the neighbor has no entry yet: append "char:1" padded to space_num characters
else{
str = str_l[0] + ","+str_l[1]+w_cur+":"+"1"+spacel.substring(0,space_num-1)
+","+str_l[2];
}
}
if(place.equals("right")){
int i_w = str_l[2].indexOf(w_cur+":");
// the neighbor already has an entry (indexOf returns 0 when the entry sits at the start)
if(i_w>=0){
int i_n = i_w + w_cur.length()+1;
int i_b = i_n+space_num;
int num_cur = Integer.parseInt(str_l[2].substring(i_n, i_n+space_num).replaceAll(" ", ""))+1;
String nums = "    ";// space_num-wide padded count field
nums = nums.substring(0,space_num);
if(num_cur<count_max){
String num_t = num_cur + "";
nums = num_t + nums.substring(num_t.length());
}
else num_cur=count_max;
str = str_l[0] + ","+str_l[1]+","+str_l[2].substring(0, i_w)
+ w_cur+":"+nums
+str_l[2].substring(i_b);
}
// the neighbor has no entry yet: append "char:1" padded to space_num characters
else{
str = str_l[0] + ","+str_l[1]
+","+str_l[2]+w_cur+":"+"1"+spacel.substring(0,space_num-1);
}
}
return str;
}
// Compute intra-word cohesion and boundary freedom, and collect the candidates that pass all thresholds
public HashMap<String,String> findWords() throws Exception{
System.out.println("existing word num: "+foundWords.size());
//
for(String s:wset){
if(s.length()>1 && s.replaceAll("[^\u4E00-\u9FA5]", "").length()>0){// only multi-character candidates containing at least one CJK character
// intra-word cohesion
String info[] = w_Map.get(s).split(",");
double compact_score = Double.MAX_VALUE;
Integer s0 = Integer.parseInt(info[0]);// word frequency
for(int i=0;i<s.length()-1;i++){
// try every binary split of the candidate and keep the weakest one
if(wset.contains(s.substring(0, i+1)) && wset.contains(s.substring(i+1))){
double s1 = 1.0*Integer.parseInt(w_Map.get(s.substring(0, i+1)).split(",")[0]);// frequency of the left part
double s2 = 1.0*Integer.parseInt(w_Map.get(s.substring(i+1)).split(",")[0]);// frequency of the right part
double score_comp = s0*str_len*1.0/(s1*s2);// observed frequency vs. chance co-occurrence of the two parts; the larger, the more likely a real word
//
compact_score = Math.min(compact_score, score_comp );
}
}
// boundary freedom: entropy of the left and right neighbor distributions
double left_f = culWordsEtropy(info[1]);
double right_f = culWordsEtropy(info[2]);
double flexible = Math.min(left_f, right_f );
// keep the candidate if it passes all thresholds and is not an already-known word
if(compact_score>compactness_t
&& flexible>flexible_t
&& s0>wcount_t
&& (!foundWords.contains(s)))
nw_Map.put(s, s0+","+compact_score+","+flexible);
}
}
return nw_Map;
}
// Shannon entropy of a neighbor distribution encoded as repeated "char:count" fields
public double culWordsEtropy(String str){
int sumc = 0;
List<Integer> num_l = new ArrayList<Integer>();
for(int i=0;i<str.length();i++){
if(str.charAt(i)==':'){
Integer temp = Integer.parseInt(str.substring(i+1, i+5).replaceAll(" ", ""));// read the fixed-width (space_num=4) count field after ':'
if(temp>0){
num_l.add(temp);
sumc += temp;
}
}
}
//
double strEtropy = 0.0;
for(Integer num:num_l){
strEtropy += (-1.0) * (num*1.0/(sumc*1.0)) * (Math.log(num*1.0/(sumc*1.0)));
}
return strEtropy;
}
public static void saveStrCSV(String file, String content) {
BufferedWriter out = null;
try {
out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true)));
out.write(content+"\r\n");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (out != null) out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]) throws Exception{
String dataPath = "E:/learningData/NLP/some_text_doc/金瓶梅.txt";// or 西遊記 (Journey to the West)
//
InputStreamReader read = new InputStreamReader(new FileInputStream(dataPath),"gbk"); //,"gbk","UTF-8"
@SuppressWarnings("resource")
BufferedReader currBR = new BufferedReader(read);
String tempCurrLine = "";
StringBuilder con = new StringBuilder();// avoid quadratic String concatenation on an ~800k-character file
while ((tempCurrLine = currBR.readLine()) != null) {
con.append(tempCurrLine);
}
//
findNewWord fnw = new findNewWord();
fnw.str_in = con.toString();
fnw.wordStatics();
HashMap<String,String> fw = fnw.findWords();
System.out.println("\n\n\nnew words num: " + fw.size());
//
List<Map.Entry<String, String>> entryList = new ArrayList<Map.Entry<String, String>>(fw.entrySet());
Collections.sort(entryList, new Comparator<Map.Entry<String, String>>()
{
@Override
public int compare(Map.Entry<String, String> o1, Map.Entry<String, String> o2) {
Integer num1 = Integer.parseInt(o1.getValue().split(",")[0]);
Integer num2 = Integer.parseInt(o2.getValue().split(",")[0]);
Double com1 = Double.valueOf(o1.getValue().split(",")[1]);
Double com2 = Double.parseDouble(o2.getValue().split(",")[1]);
//return com2.compareTo(com1);
return num2.compareTo(num1);
}
});
for (int i = 0; i < Math.min(300, entryList.size()); ++i) {// print at most the top 300 entries
System.out.println(entryList.get(i).getKey()+"\t-->"+ entryList.get(i).getValue());
}
}
}
Now let the algorithm read 金瓶梅 (Jin Ping Mei) to see how it does. Part of the new-word output is shown below (each line is word --> frequency, cohesion, freedom):
吩咐 -->405,1529.4558139534884,4.0082581300545606
夥計 -->241,1145.2131936416185,2.423341693467576
雪娥 -->239,1332.8995039219842,3.082098823200978
韓道國 -->211,1172.4191111862115,4.061459127893269
左右 -->206,1895.2380392815176,3.5324181923556903
晚夕 -->201,1046.5672754910577,3.708572332910948
提刑 -->195,1174.3713084805363,2.0821136913180194
師父 -->152,1142.9169610701424,2.8967660115396217
兄弟 -->149,1424.9685027919963,3.7470579082813456
秋菊 -->147,2261.07776504432,3.7488604958114653
良久 -->146,1553.7093203883496,3.873346472349265
御史 -->145,3226.143306607125,1.3085725892219808
須臾 -->144,2244.59385665529,4.067556537602757
伺候 -->141,2683.1859375,3.619320955768149
悄悄 -->122,1196.0950492687944,3.6785419885382886
喬大戶 -->120,1407.0731707317073,2.7895116233201307
丫鬟 -->119,1340.5661870503598,3.6447949795948507
夏提刑 -->118,1490.53275713051,3.89193214636793
姥姥 -->116,1417.3836206896551,1.6372525278308285
告訴 -->114,1353.5642534753565,3.015068941026971
達達 -->108,1034.7288619544315,2.603724058730813
捲棚 -->106,2458.2177086639163,1.6620497616520689
慢慢 -->99,1084.6969429404414,3.4736352593048228
孫雪娥 -->93,1168.5473720410387,3.509899048916368
常峙節 -->84,1226.9888059701493,3.5717010220730745
崇禎 -->81,6985.437450826121,1.0546115534488711
傅夥計 -->81,1624.1142073170731,3.73473063195388
馮媽媽 -->81,1263.9021068615355,3.4071486331531933
琵琶 -->79,8324.886075949367,3.26117688842775
Who says 金瓶梅 is just an erotic novel? It is clearly a serious classic :). A txt file of 西遊記 or 金瓶梅 is about 1.5 MB, roughly 800,000 characters, and the program finishes in under a minute on a machine with 8 GB of RAM, which is acceptable.
But once the corpus grows further, the computation has to be distributed, because the naive n-gram approach is quite memory-hungry. Since the related business is still in development and testing, only the core code fragments are given below. The idea is to read the data from HDFS or Hive, split the large text into smaller chunks, run the algorithm above inside each partition to extract that partition's new words, and finally merge the per-partition results and collect them to the driver. One gripe about Spark here: the commonly used Dataset/Row supports many data structures, but for maps it only stores Scala mutable.Map and does not accept a mutable Java map. After struggling with this for a while, I went back to doing this part with JavaRDD.
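The fragment below assumes a Dataset<Row> called content_no whose second column holds the raw text, plus a broadcast variable colIndexNew carrying the set of already-known words. As a hypothetical example of how these might be prepared (database, table, column names, and the sample dictionary words are placeholders, not the real ones):
// Hypothetical setup for the snippet below; names are placeholders.
SparkSession spark = SparkSession.builder()
        .appName("NewWordsDiscovery")
        .enableHiveSupport()
        .getOrCreate();
// Column 1 (read via getString(1) below) is assumed to hold the text content.
Dataset<Row> content_no = spark.sql("SELECT id, content FROM some_db.chat_log");
// Broadcast the known-word dictionary; the snippet reads it back with colIndexNew.getValue().
Broadcast<Set<String>> colIndexNew = new JavaSparkContext(spark.sparkContext())
        .broadcast(new HashSet<String>(Arrays.asList("現在", "沒有")));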
JavaRDD<Map<String, String>> statistic_info_pa =
content_no.javaRDD().mapPartitions(
//new Function2<Integer, Iterator<Integer>, Iterator<String>>()
new FlatMapFunction<Iterator<Row>, Map<String,String>>() {
private static final long serialVersionUID = 1L;
@Override
public Iterator<Map<String, String>> call(Iterator<Row> v_in) throws Exception {
//
LinkedList<Map<String, String>> linkedList = new LinkedList<Map<String, String>>();
String str_in="";
while(v_in.hasNext()){
Row v1 = v_in.next();
String content = v1.getString(1).replaceAll("[^\u4E00-\u9FA5^a-z^A-Z^0-9]", "");
str_in = content;
//
NewWordsDiscovery fnw = new NewWordsDiscovery();
NewWordsDiscovery.foundWords = colIndexNew.getValue();// broadcast set of already-known words
fnw.str_in = str_in;
fnw.wordStatics();
Map<String,String> fw = fnw.findWords();
// keep only the top 1000 words by cohesion within each partition
fw = NewWordsDiscovery.getTopNewWordsMapByCompactness(fw,1000);
linkedList.add(fw);
}
return linkedList.iterator();
}
}
);
// Collect the per-partition new words to the driver and merge them
List<Map<String,String>> fw_list = statistic_info_pa.collect();
Map<String, String> fw_res = new HashMap<>();
for(Map<String,String> hm:fw_list){
Iterator<Entry<String,String>> entryIt = hm.entrySet().iterator();
while(entryIt.hasNext()){
Entry<String, String> entry = entryIt.next();
String w = entry.getKey();
if(notKouQiCi(w)){// skip interjection-like (tone/filler) words
// first time this word is seen: insert it directly
if(!fw_res.containsKey(w)){
String info = entry.getValue();
fw_res.put(w, info);
}
// otherwise merge: sum the frequencies and accumulate the two scores
else{
String info[] = entry.getValue().split(",");
String info_0[] = fw_res.get(w).split(",");
Integer num1 = Integer.parseInt(info_0[0]);
Integer num2 = Integer.parseInt(info[0]);
Double com1 = Double.valueOf(info_0[1]);
Double com2 = Double.parseDouble(info[1]);
Double flex1 = Double.valueOf(info_0[2]);
Double flex2 = Double.parseDouble(info[2]);
fw_res.put(w, (num1+num2)+","+(com1+com2)+","+(flex1+flex2));
}
}
}
}
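The snippet stops at the merged map fw_res. As a final step one would rank it by the score described in point 5 above (frequency plus compactness, leaning toward compactness) and keep the top N. The sketch below is only an assumed weighting for illustration, not the production formula:
// Rank the merged results by a combined score and keep the top N.
// The 0.8/0.2 weights and the log-scaling are assumptions, not the real project's values.
int topN = 1000;
List<Map.Entry<String, String>> merged = new ArrayList<>(fw_res.entrySet());
Collections.sort(merged, new Comparator<Map.Entry<String, String>>() {
    @Override
    public int compare(Map.Entry<String, String> a, Map.Entry<String, String> b) {
        return Double.compare(score(b.getValue()), score(a.getValue()));
    }
    private double score(String info) {
        String[] f = info.split(",");
        double freq = Double.parseDouble(f[0]);
        double compactness = Double.parseDouble(f[1]);
        return 0.8 * Math.log(1 + compactness) + 0.2 * Math.log(1 + freq);
    }
});
List<String> topWords = new ArrayList<>();
for (int i = 0; i < Math.min(topN, merged.size()); i++) {
    topWords.add(merged.get(i).getKey());
}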