1. 程式人生 > >Spark MLlib實現的中文文字分類–Naive Bayes

Spark MLlib實現的中文文字分類–Naive Bayes

文字分類是指將一篇文章歸到事先定義好的某一類或者某幾類,在資料平臺的一個典型的應用場景是,通過爬取使用者瀏覽過的頁面內容,識別出使用者的瀏覽偏好,從而豐富該使用者的畫像。
本文介紹使用Spark MLlib提供的樸素貝葉斯(Naive Bayes)演算法,完成對中文文字的分類過程。主要包括中文分詞、文字表示(TF-IDF)、模型訓練、分類預測等。

中文分詞

對於中文文字分類而言,需要先對文章進行分詞,我使用的是IKAnalyzer中文分析工具,,其中自己可以配置擴充套件詞庫來使分詞結果更合理,我從搜狗、百度輸入法下載了細胞詞庫,將其作為擴充套件詞庫。這裡不再介紹分詞。

中文詞語特徵值轉換(TF-IDF)

分好詞後,每一個詞都作為一個特徵,但需要將中文詞語轉換成Double型來表示,通常使用該詞語的TF-IDF值作為特徵值,Spark提供了全面的特徵抽取及轉換的API,非常方便,詳見http://spark.apache.org/docs/latest/ml-features.html,這裡介紹下TF-IDF的API:

比如,訓練語料/tmp/lxw1234/1.txt:

0,蘋果 官網 蘋果 宣佈
1,蘋果 梨 香蕉

逗號分隔的第一列為分類編號,0為科技,1為水果。

  1. caseclassRawDataRecord(category:String, text:String)
  2. val conf =newSparkConf
    ().setMaster("yarn-client")
  3. val sc =newSparkContext(conf)
  4. val sqlContext =new org.apache.spark.sql.SQLContext(sc)
  5. import sqlContext.implicits._
  6. //將原始資料對映到DataFrame中,欄位category為分類編號,欄位text為分好的詞,以空格分隔
  7. var srcDF = sc.textFile("/tmp/lxw1234/1.txt").map {
  8. x =>
  9. var data = x.split(",")
  10. RawDataRecord(data(0),data
    (1))
  11. }.toDF()
  12. srcDF.select("category","text").take(2).foreach(println)
  13. [0,蘋果官網蘋果宣佈]
  14. [1,蘋果香蕉]
  15. //將分好的詞轉換為陣列
  16. var tokenizer =newTokenizer().setInputCol("text").setOutputCol("words")
  17. var wordsData = tokenizer.transform(srcDF)
  18. wordsData.select($"category",$"text",$"words").take(2).foreach(println)
  19. [0,蘋果官網蘋果宣佈,WrappedArray(蘋果,官網,蘋果,宣佈)]
  20. [1,蘋果香蕉,WrappedArray(蘋果,梨,香蕉)]
  21. //將每個詞轉換成Int型,並計算其在文件中的詞頻(TF)
  22. var hashingTF =
  23. newHashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(100)
  24. var featurizedData = hashingTF.transform(wordsData)

這裡將中文詞語轉換成INT型的Hashing演算法,類似於Bloomfilter,上面的setNumFeatures(100)表示將Hash分桶的數量設定為100個,這個值預設為2的20次方,即1048576,可以根據你的詞語數量來調整,一般來說,這個值越大,不同的詞被計算為一個Hash值的概率就越小,資料也更準確,但需要消耗更大的記憶體,和Bloomfilter是一個道理。

  1. featurizedData.select($"category", $"words", $"rawFeatures").take(2).foreach(println)
  2. [0,WrappedArray(蘋果,官網,蘋果,宣佈),(100,[23,81,96],[2.0,1.0,1.0])]
  3. [1,WrappedArray(蘋果,梨,香蕉),(100,[23,72,92],[1.0,1.0,1.0])]

結果中,“蘋果”用23來表示,第一個文件中,詞頻為2,第二個文件中詞頻為1.

  1. //計算TF-IDF值
  2. var idf =new IDF().setInputCol("rawFeatures").setOutputCol("features")
  3. var idfModel = idf.fit(featurizedData)
  4. var rescaledData = idfModel.transform(featurizedData)
  5. rescaledData.select($"category", $"words", $"features").take(2).foreach(println)
  6. [0,WrappedArray(蘋果,官網,蘋果,宣佈),(100,[23,81,96],[0.0,0.4054651081081644,0.4054651081081644])]
  7. [1,WrappedArray(蘋果,梨,香蕉),(100,[23,72,92],[0.0,0.4054651081081644,0.4054651081081644])]
  8. //因為一共只有兩個文件,且都出現了“蘋果”,因此該詞的TF-IDF值為0.

最後一步,將上面的資料轉換成Bayes演算法需要的格式,如:

https://github.com/apache/spark/blob/branch-1.5/data/mllib/sample_naive_bayes_data.txt

  1. var trainDataRdd = rescaledData.select($"category",$"features").map {
  2. caseRow(label:String, features:Vector)=>
  3. LabeledPoint(label.toDouble,Vectors.dense(features.toArray))
  4. }


每一個LabeledPoint中,特徵陣列的長度為100(setNumFeatures(100)),”官網”和”宣佈”對應的特徵索引號分別為81和96,因此,在特徵陣列中,第81位和第96位分別為它們的TF-IDF值。

到此,中文詞語特徵表示的工作已經完成,trainDataRdd已經可以作為Bayes演算法的輸入了。

分類模型訓練

訓練模型,語料非常重要,我這裡使用的是搜狗提供的分類語料庫,很早之前的了,這裡只作為學習測試使用。

下載地址在:http://www.sogou.com/labs/dl/c.html,語料庫一共有10個分類:

C000007 汽車
      C000008 財經
C000010  IT
C000013 健康
C000014 體育
C000016 旅遊
C000020 教育
C000022 招聘
C000023 文化
C000024 軍事

每個分類下有幾千個文件,這裡將這些語料進行分詞,然後每一個分類生成一個檔案,在該檔案中,每一行資料表示一個文件的分詞結果,重新用0-9作為這10個分類的編號:
0 汽車
1 財經
2 IT
3 健康
4 體育
5 旅遊
6 教育
7 招聘
8 文化
9 軍事

比如,汽車分類下的檔案內容為:


資料準備好了,接下來進行模型訓練及分類預測,程式碼:

  1. package com.lxw1234.textclassification
  2. import scala.reflect.runtime.universe
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.SparkContext
  5. import org.apache.spark.ml.feature.HashingTF
  6. import org.apache.spark.ml.feature.IDF
  7. import org.apache.spark.ml.feature.Tokenizer
  8. import org.apache.spark.mllib.classification.NaiveBayes
  9. import org.apache.spark.mllib.linalg.Vector
  10. import org.apache.spark.mllib.linalg.Vectors
  11. import org.apache.spark.mllib.regression.LabeledPoint
  12. import org.apache.spark.sql.Row
  13. objectTestNaiveBayes{
  14. caseclassRawDataRecord(category:String, text:String)
  15. def main(args :Array[String]){
  16. val conf =newSparkConf().setMaster("yarn-client")
  17. val sc =newSparkContext(conf)
  18. val sqlContext =new org.apache.spark.sql.SQLContext(sc)
  19. import sqlContext.implicits._
  20. var srcRDD = sc.textFile("/tmp/lxw1234/sougou/").map {
  21. x =>
  22. var data = x.split(",")
  23. RawDataRecord(data(0),data(1))
  24. }
  25. //70%作為訓練資料,30%作為測試資料
  26. val splits = srcRDD.randomSplit(Array(0.7,0.3))
  27. var trainingDF = splits(0).toDF()
  28. var testDF = splits(1).toDF()
  29. //將詞語轉換成陣列
  30. var tokenizer =newTokenizer().setInputCol("text").setOutputCol("words")
  31. var wordsData = tokenizer.transform(trainingDF)
  32. println("output1:")
  33. wordsData.select($"category",$"text",$"words").take(1)
  34. //計算每個詞在文件中的詞頻
  35. var hashingTF =newHashingTF().setNumFeatures(500000).setInputCol("words").setOutputCol("rawFeatures")
  36. var featurizedData = hashingTF.transform(wordsData)
  37. println("output2:")
  38. featurizedData.select($"category", $"words", $"rawFeatures").take(1)
  39. //計算每個詞的TF-IDF
  40. var idf =new IDF().setInputCol("rawFeatures").setOutputCol("features")
  41. var idfModel = idf.fit(featurizedData)
  42. var rescaledData