1. 程式人生 > 其它 >數倉工具—Hive實戰之UDF分詞(1)

數倉工具—Hive實戰之UDF分詞(1)

技術標籤:資料倉庫Hivehive大資料資料倉庫面試

UDF 分詞

分詞

這個是一個比較常見的場景,例如公司的產品有每天都會產生大量的彈幕或者評論,這個時候我們可能會想去分析一下大家最關心的熱點話題是什麼,或者是我們會分析最近一段時間的網路趨勢是什麼,但是這裡有一個問題就是你的詞庫建設的問題,因為你使用通用的詞庫可能不能達到很好的分詞效果,尤其有很多網路流行用語它是不在詞庫裡的,還有一個就是停用詞的問題了,因為很多時候停用詞是沒有意義的,所以這裡我們需要將其過濾,而過濾的方式就是通過停用詞詞表進行過濾

這個時候我們的解決方案主要有兩種,一種是使用第三方提供的一些詞庫,還有一種是自建詞庫,然後有專人去維護,這個也是比較常見的一種情況

最後一個就是我們使用的分詞工具,因為目前主流的分詞器很多,選擇不同的分詞工具可能對我們的分詞結果有很多影響

分詞工具

1:Elasticsearch的開源中文分詞器 IK Analysis(Star:2471)

IK中文分詞器在Elasticsearch上的使用。原生IK中文分詞是從檔案系統中讀取詞典,es-ik本身可擴充套件成從不同的源讀取詞典。目前提供從sqlite3資料庫中讀取。es-ik-plugin-sqlite3使用方法: 1. 在elasticsearch.yml中設定你的sqlite3詞典的位置: ik_analysis_db_path: /opt/ik/dictionary.db

2:開源的java中文分詞庫 IKAnalyzer(Star:343)

IK Analyzer 是一個開源的,基於java語言開發的輕量級的中文分詞工具包。從2006年12月推出1.0版開始, IKAnalyzer已經推出了4個大版本。最初,它是以開源專案Luence為應用主體的,結合詞典分詞和文法分析演算法的中文分片語件。從3.0版本開始,IK發展為面向Java的公用分片語件,獨立於Lucene專案

3:java開源中文分詞 Ansj(Star:3019)

Ansj中文分詞 這是一個ictclas的java實現.基本上重寫了所有的資料結構和演算法.詞典是用的開源版的ictclas所提供的.並且進行了部分的人工優化 分詞速度達到每秒鐘大約200萬字左右,準確率能達到96%以上

目前實現了.中文分詞. 中文姓名識別 . 詞性標註、使用者自定義詞典,關鍵字提取,自動摘要,關鍵字標記等功能

可以應用到自然語言處理等方面,適用於對分詞效果要求高的各種專案.

4:結巴分詞 ElasticSearch 外掛(Star:188)

elasticsearch官方只提供smartcn這個中文分詞外掛,效果不是很好,好在國內有medcl大神(國內最早研究es的人之一)寫的兩個中文分詞外掛,一個是ik的,一個是mmseg的

5:Java分散式中文分片語件 - word分詞(Star:672)

word分詞是一個Java實現的分散式的中文分片語件,提供了多種基於詞典的分詞演算法,並利用ngram模型來消除歧義。能準確識別英文、數字,以及日期、時間等數量詞,能識別人名、地名、組織機構名等未登入詞

6:Java開源中文分詞器jcseg(Star:400)

Jcseg是什麼? Jcseg是基於mmseg演算法的一個輕量級開源中文分詞器,同時集成了關鍵字提取,關鍵短語提取,關鍵句子提取和文章自動摘要等功能,並且提供了最新版本的lucene, solr, elasticsearch的分詞介面, Jcseg自帶了一個 jcseg.properties檔案…

7:中文分詞庫Paoding

庖丁中文分詞庫是一個使用Java開發的,可結合到Lucene應用中的,為網際網路、企業內部網使用的中文搜尋引擎分片語件。Paoding填補了國內中文分詞方面開源元件的空白,致力於此並希翼成為網際網路網站首選的中文分詞開源元件。 Paoding中文分詞追求分詞的高效率和使用者良好體驗。

8:中文分詞器mmseg4j

mmseg4j 用 Chih-Hao Tsai 的 MMSeg 演算法(http://technology.chtsai.org/mmseg/ )實現的中文分詞器,並實現 lucene 的 analyzer 和 solr 的TokenizerFactory 以方便在Lucene和Solr中使…

9:中文分詞Ansj(Star:3015)

Ansj中文分詞 這是一個ictclas的java實現.基本上重寫了所有的資料結構和演算法.詞典是用的開源版的ictclas所提供的.並且進行了部分的人工優化 記憶體中中文分詞每秒鐘大約100萬字(速度上已經超越ictclas) 檔案讀取分詞每秒鐘大約30萬字 準確率能達到96%以上 目前實現了…

10:Lucene中文分詞庫ICTCLAS4J

ictclas4j中文分詞系統是sinboy在中科院張華平和劉群老師的研製的FreeICTCLAS的基礎上完成的一個java開源分詞專案,簡化了原分詞程式的複雜度,旨在為廣大的中文分詞愛好者一個更好的學習機會。

程式碼實現

**第一步:**引入依賴

這裡我們引入了兩個依賴,其實是兩個不同分詞工具

<dependency>
  <groupId>org.ansj</groupId>
  <artifactId>ansj_seg</artifactId>
  <version>5.1.6</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>com.janeluo</groupId>
  <artifactId>ikanalyzer</artifactId>
  <version>2012_u6</version>
</dependency>

在開始之前我們先寫一個demo 玩玩,讓大家有個基本的認識

@Test
public  void testAnsjSeg() {
    String str = "我叫李太白,我是一個詩人,我生活在唐朝" ;
  	// 選擇使用哪種分詞器 BaseAnalysis ToAnalysis NlpAnalysis  IndexAnalysis
    Result result = ToAnalysis.parse(str);
    System.out.println(result);
    KeyWordComputer kwc = new KeyWordComputer(5);
    Collection<Keyword> keywords = kwc.computeArticleTfidf(str);
    System.out.println(keywords);
}

輸出結果

我/r,叫/v,李太白/nr,,/w,我/r,是/v,一個/m,詩人/n,,/w,我/r,生活/vn,在/p,唐朝/t
[李太白/24.72276098504223, 詩人/3.0502185968368885, 唐朝/0.8965677022546215, 生活/0.6892230219652541]

**第二步:**引入停用詞詞庫

因為是停用詞詞庫,本身也不是很大,所以我直接放在專案裡了,當然你也可以放在其他地方,例如HDFS 上

image-20201228185848916

**第三步:**編寫UDF

程式碼很簡單我就不不做詳細解釋了,需要注意的是GenericUDF 裡面的一些方法的使用規則,至於程式碼設計的好壞以及還有什麼改進的方案我們後面再說,下面兩套實現的思路幾乎是一致的,不一樣的是在使用的分詞工具上的不一樣

ansj的實現

/**
 * Chinese words segmentation with user-dict in com.kingcall.dic
 * use Ansj(a java open source analyzer)
 */

@Description(name = "ansj_seg", value = "_FUNC_(str) - chinese words segment using ansj. Return list of words.",
        extended = "Example: select _FUNC_('我是測試字串') from src limit 1;\n"
                + "[\"我\", \"是\", \"測試\", \"字串\"]")

public class AnsjSeg extends GenericUDF {
    private transient ObjectInspectorConverters.Converter[] converters;
    private static final String userDic = "/app/stopwords/com.kingcall.dic";

    //load userDic in hdfs
    static {
        try {
            FileSystem fs = FileSystem.get(new Configuration());
            FSDataInputStream in = fs.open(new Path(userDic));
            BufferedReader br = new BufferedReader(new InputStreamReader(in));

            String line = null;
            String[] strs = null;
            while ((line = br.readLine()) != null) {
                line = line.trim();
                if (line.length() > 0) {
                    strs = line.split("\t");
                    strs[0] = strs[0].toLowerCase();
                    DicLibrary.insert(DicLibrary.DEFAULT, strs[0]); //ignore nature and freq
                }
            }
            MyStaticValue.isNameRecognition = Boolean.FALSE;
            MyStaticValue.isQuantifierRecognition = Boolean.TRUE;
        } catch (Exception e) {
            System.out.println("Error when load userDic" + e.getMessage());
        }
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length < 1 || arguments.length > 2) {
            throw new UDFArgumentLengthException(
                    "The function AnsjSeg(str) takes 1 or 2 arguments.");
        }

        converters = new ObjectInspectorConverters.Converter[arguments.length];
        converters[0] = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        if (2 == arguments.length) {
            converters[1] = ObjectInspectorConverters.getConverter(arguments[1], PrimitiveObjectInspectorFactory.writableIntObjectInspector);
        }
        return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    }


    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        boolean filterStop = false;
        if (arguments[0].get() == null) {
            return null;
        }
        if (2 == arguments.length) {
            IntWritable filterParam = (IntWritable) converters[1].convert(arguments[1].get());
            if (1 == filterParam.get()) filterStop = true;
        }

        Text s = (Text) converters[0].convert(arguments[0].get());
        ArrayList<Text> result = new ArrayList<>();

        if (filterStop) {
            for (Term words : DicAnalysis.parse(s.toString()).recognition(StopLibrary.get())) {
                if (words.getName().trim().length() > 0) {
                    result.add(new Text(words.getName().trim()));
                }
            }
        } else {
            for (Term words : DicAnalysis.parse(s.toString())) {
                if (words.getName().trim().length() > 0) {
                    result.add(new Text(words.getName().trim()));
                }
            }
        }
        return result;
    }


    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString("ansj_seg", children);
    }
}

ikanalyzer的實現

@Description(name = "ansj_seg", value = "_FUNC_(str) - chinese words segment using Iknalyzer. Return list of words.",
        extended = "Example: select _FUNC_('我是測試字串') from src limit 1;\n"
                + "[\"我\", \"是\", \"測試\", \"字串\"]")
public class IknalyzerSeg extends GenericUDF {
    private transient ObjectInspectorConverters.Converter[] converters;
    //用來存放停用詞的集合
    Set<String> stopWordSet = new HashSet<String>();

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length < 1 || arguments.length > 2) {
            throw new UDFArgumentLengthException(
                    "The function AnsjSeg(str) takes 1 or 2 arguments.");
        }
        //讀入停用詞檔案
        BufferedReader StopWordFileBr = null;
        try {
            StopWordFileBr = new BufferedReader(new InputStreamReader(new FileInputStream(new File("stopwords/baidu_stopwords.txt"))));
            //初如化停用詞集
            String stopWord = null;
            for(; (stopWord = StopWordFileBr.readLine()) != null;){
                stopWordSet.add(stopWord);
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        converters = new ObjectInspectorConverters.Converter[arguments.length];
        converters[0] = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        if (2 == arguments.length) {
            converters[1] = ObjectInspectorConverters.getConverter(arguments[1], PrimitiveObjectInspectorFactory.writableIntObjectInspector);
        }
        return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);

    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        boolean filterStop = false;
        if (arguments[0].get() == null) {
            return null;
        }
        if (2 == arguments.length) {
            IntWritable filterParam = (IntWritable) converters[1].convert(arguments[1].get());
            if (1 == filterParam.get()) filterStop = true;
        }
        Text s = (Text) converters[0].convert(arguments[0].get());
        StringReader reader = new StringReader(s.toString());
        IKSegmenter iks = new IKSegmenter(reader, true);
        List<Text> list = new ArrayList<>();
        if (filterStop) {
            try {
                Lexeme lexeme;
                while ((lexeme = iks.next()) != null) {
                    if (!stopWordSet.contains(lexeme.getLexemeText())) {
                        list.add(new Text(lexeme.getLexemeText()));
                    }
                }
            } catch (IOException e) {
            }
        } else {
            try {
                Lexeme lexeme;
                while ((lexeme = iks.next()) != null) {
                    list.add(new Text(lexeme.getLexemeText()));
                }
            } catch (IOException e) {
            }
        }
        return list;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "Usage: evaluate(String str)";
    }
}

**第四步:**測試程式碼

GenericUDF 給我妹提供了一些方法,這些方法可以用來構建測試需要的環境和引數,這樣我們就可以測試這些程式碼了

@Test
public void testAnsjSegFunc() throws HiveException {
    AnsjSeg udf = new AnsjSeg();
    ObjectInspector valueOI0 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    ObjectInspector[] init_args = {valueOI0, valueOI1};
    udf.initialize(init_args);

    Text str = new Text("我是測試字串");

    GenericUDF.DeferredObject valueObj0 = new GenericUDF.DeferredJavaObject(str);
    GenericUDF.DeferredObject valueObj1 = new GenericUDF.DeferredJavaObject(0);
    GenericUDF.DeferredObject[] args = {valueObj0, valueObj1};
    ArrayList<Object> res = (ArrayList<Object>) udf.evaluate(args);
    System.out.println(res);
}


@Test
public void testIkSegFunc() throws HiveException {
    IknalyzerSeg udf = new IknalyzerSeg();
    ObjectInspector valueOI0 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    ObjectInspector[] init_args = {valueOI0, valueOI1};
    udf.initialize(init_args);

    Text str = new Text("我是測試字串");

    GenericUDF.DeferredObject valueObj0 = new GenericUDF.DeferredJavaObject(str);
    GenericUDF.DeferredObject valueObj1 = new GenericUDF.DeferredJavaObject(0);
    GenericUDF.DeferredObject[] args = {valueObj0, valueObj1};
    ArrayList<Object> res = (ArrayList<Object>) udf.evaluate(args);
    System.out.println(res);
}

我們看到載入停用詞沒有找到,但是整體還是跑起來了,因為讀取不到HDFS 上的檔案

image-20201228203906521

第五步: 建立UDF 並使用

add jar /Users/liuwenqiang/workspace/code/idea/HiveUDF/target/HiveUDF-0.0.4.jar;
create temporary function ansjSeg as 'com.kingcall.bigdata.HiveUDF.AnsjSeg';
select ansjSeg("我是字串,你是啥");
create temporary function ikSeg as 'com.kingcall.bigdata.HiveUDF.IknalyzerSeg';
select ikSeg("我是字串,你是啥");

image-20201228211400316

總結

  1. 這一節我們學習了一個比較常見的UDF,通過實現GenericUDF 抽象類來實現,這一節的重點在於程式碼的實現以及對GenericUDF類中方法的理解
  2. 上面的程式碼實現上有一個問題,那就是關於停用詞的載入,就是我們能不能動態載入停用詞呢?