Spark MLlib實現的中文文字分類–Naive Bayes
文字分類是指將一篇文章歸到事先定義好的某一類或者某幾類,在資料平臺的一個典型的應用場景是,通過爬取使用者瀏覽過的頁面內容,識別出使用者的瀏覽偏好,從而豐富該使用者的畫像。
本文介紹使用Spark MLlib提供的樸素貝葉斯(Naive Bayes)演算法,完成對中文文字的分類過程。主要包括中文分詞、文字表示(TF-IDF)、模型訓練、分類預測等。
中文分詞
對於中文文字分類而言,需要先對文章進行分詞,我使用的是IKAnalyzer中文分析工具,,其中自己可以配置擴充套件詞庫來使分詞結果更合理,我從搜狗、百度輸入法下載了細胞詞庫,將其作為擴充套件詞庫。這裡不再介紹分詞。
中文詞語特徵值轉換(TF-IDF)
分好詞後,每一個詞都作為一個特徵,但需要將中文詞語轉換成Double型來表示,通常使用該詞語的TF-IDF值作為特徵值,Spark提供了全面的特徵抽取及轉換的API,非常方便,詳見http://spark.apache.org/docs/latest/ml-features.html,這裡介紹下TF-IDF的API:
比如,訓練語料/tmp/lxw1234/1.txt:
0,蘋果 官網 蘋果 宣佈
1,蘋果 梨 香蕉
逗號分隔的第一列為分類編號,0為科技,1為水果。
- caseclassRawDataRecord(category:String, text:String)
- val conf =newSparkConf
- val sc =newSparkContext(conf)
- val sqlContext =new org.apache.spark.sql.SQLContext(sc)
- import sqlContext.implicits._
- //將原始資料對映到DataFrame中,欄位category為分類編號,欄位text為分好的詞,以空格分隔
- var srcDF = sc.textFile("/tmp/lxw1234/1.txt").map {
- x =>
- var data = x.split(",")
- RawDataRecord(data(0),data
- }.toDF()
- srcDF.select("category","text").take(2).foreach(println)
- [0,蘋果官網蘋果宣佈]
- [1,蘋果梨香蕉]
- //將分好的詞轉換為陣列
- var tokenizer =newTokenizer().setInputCol("text").setOutputCol("words")
- var wordsData = tokenizer.transform(srcDF)
- wordsData.select($"category",$"text",$"words").take(2).foreach(println)
- [0,蘋果官網蘋果宣佈,WrappedArray(蘋果,官網,蘋果,宣佈)]
- [1,蘋果梨香蕉,WrappedArray(蘋果,梨,香蕉)]
- //將每個詞轉換成Int型,並計算其在文件中的詞頻(TF)
- var hashingTF =
- newHashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(100)
- var featurizedData = hashingTF.transform(wordsData)
這裡將中文詞語轉換成INT型的Hashing演算法,類似於Bloomfilter,上面的setNumFeatures(100)表示將Hash分桶的數量設定為100個,這個值預設為2的20次方,即1048576,可以根據你的詞語數量來調整,一般來說,這個值越大,不同的詞被計算為一個Hash值的概率就越小,資料也更準確,但需要消耗更大的記憶體,和Bloomfilter是一個道理。
- featurizedData.select($"category", $"words", $"rawFeatures").take(2).foreach(println)
- [0,WrappedArray(蘋果,官網,蘋果,宣佈),(100,[23,81,96],[2.0,1.0,1.0])]
- [1,WrappedArray(蘋果,梨,香蕉),(100,[23,72,92],[1.0,1.0,1.0])]
結果中,“蘋果”用23來表示,第一個文件中,詞頻為2,第二個文件中詞頻為1.
- //計算TF-IDF值
- var idf =new IDF().setInputCol("rawFeatures").setOutputCol("features")
- var idfModel = idf.fit(featurizedData)
- var rescaledData = idfModel.transform(featurizedData)
- rescaledData.select($"category", $"words", $"features").take(2).foreach(println)
- [0,WrappedArray(蘋果,官網,蘋果,宣佈),(100,[23,81,96],[0.0,0.4054651081081644,0.4054651081081644])]
- [1,WrappedArray(蘋果,梨,香蕉),(100,[23,72,92],[0.0,0.4054651081081644,0.4054651081081644])]
- //因為一共只有兩個文件,且都出現了“蘋果”,因此該詞的TF-IDF值為0.
最後一步,將上面的資料轉換成Bayes演算法需要的格式,如:
https://github.com/apache/spark/blob/branch-1.5/data/mllib/sample_naive_bayes_data.txt
- var trainDataRdd = rescaledData.select($"category",$"features").map {
- caseRow(label:String, features:Vector)=>
- LabeledPoint(label.toDouble,Vectors.dense(features.toArray))
- }
每一個LabeledPoint中,特徵陣列的長度為100(setNumFeatures(100)),”官網”和”宣佈”對應的特徵索引號分別為81和96,因此,在特徵陣列中,第81位和第96位分別為它們的TF-IDF值。
到此,中文詞語特徵表示的工作已經完成,trainDataRdd已經可以作為Bayes演算法的輸入了。
分類模型訓練
訓練模型,語料非常重要,我這裡使用的是搜狗提供的分類語料庫,很早之前的了,這裡只作為學習測試使用。
下載地址在:http://www.sogou.com/labs/dl/c.html,語料庫一共有10個分類:
C000007 汽車
C000008 財經
C000010 IT
C000013 健康
C000014 體育
C000016 旅遊
C000020 教育
C000022 招聘
C000023 文化
C000024 軍事
每個分類下有幾千個文件,這裡將這些語料進行分詞,然後每一個分類生成一個檔案,在該檔案中,每一行資料表示一個文件的分詞結果,重新用0-9作為這10個分類的編號:
0 汽車
1 財經
2 IT
3 健康
4 體育
5 旅遊
6 教育
7 招聘
8 文化
9 軍事
比如,汽車分類下的檔案內容為:
資料準備好了,接下來進行模型訓練及分類預測,程式碼:
- package com.lxw1234.textclassification
- import scala.reflect.runtime.universe
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.ml.feature.HashingTF
- import org.apache.spark.ml.feature.IDF
- import org.apache.spark.ml.feature.Tokenizer
- import org.apache.spark.mllib.classification.NaiveBayes
- import org.apache.spark.mllib.linalg.Vector
- import org.apache.spark.mllib.linalg.Vectors
- import org.apache.spark.mllib.regression.LabeledPoint
- import org.apache.spark.sql.Row
- objectTestNaiveBayes{
- caseclassRawDataRecord(category:String, text:String)
- def main(args :Array[String]){
- val conf =newSparkConf().setMaster("yarn-client")
- val sc =newSparkContext(conf)
- val sqlContext =new org.apache.spark.sql.SQLContext(sc)
- import sqlContext.implicits._
- var srcRDD = sc.textFile("/tmp/lxw1234/sougou/").map {
- x =>
- var data = x.split(",")
- RawDataRecord(data(0),data(1))
- }
- //70%作為訓練資料,30%作為測試資料
- val splits = srcRDD.randomSplit(Array(0.7,0.3))
- var trainingDF = splits(0).toDF()
- var testDF = splits(1).toDF()
- //將詞語轉換成陣列
- var tokenizer =newTokenizer().setInputCol("text").setOutputCol("words")
- var wordsData = tokenizer.transform(trainingDF)
- println("output1:")
- wordsData.select($"category",$"text",$"words").take(1)
- //計算每個詞在文件中的詞頻
- var hashingTF =newHashingTF().setNumFeatures(500000).setInputCol("words").setOutputCol("rawFeatures")
- var featurizedData = hashingTF.transform(wordsData)
- println("output2:")
- featurizedData.select($"category", $"words", $"rawFeatures").take(1)
- //計算每個詞的TF-IDF
- var idf =new IDF().setInputCol("rawFeatures").setOutputCol("features")
- var idfModel = idf.fit(featurizedData)
- var rescaledData