Spark Study Notes: Text Processing Techniques
阿新 · Published 2017-05-08
1. Building a TF-IDF Model
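TF-IDF weighs a token by how often it appears in a document, discounted by how many documents it appears in, so corpus-wide filler words score low. For reference (this is the formula in the Spark MLlib documentation, not something stated in the code below), MLlib computes a smoothed inverse document frequency

    idf(t) = log((m + 1) / (df(t) + 1))

where m is the total number of documents and df(t) is the number of documents containing term t; the final weight of term t in document d is tfidf(t, d) = tf(t, d) * idf(t).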
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{SparseVector => SV}
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.feature.IDF

/**
  * Created by common on 17-5-6.
  */
object TFIDF {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)

    // val path = "hdfs://master:9000/user/common/20Newsgroups/20news-bydate-train/*"
    val path = "file:///media/common/工作/kaggle/test/*"
    val rdd = sc.wholeTextFiles(path)

    // extract the text content of each file
    val text = rdd.map { case (file, text) => text }
    // print(text.count())

    val regex = """[^0-9]*""".r

    // stop words to exclude
    val stopwords = Set(
      "the", "a", "an", "of", "or", "in", "for", "by", "on", "but", "is", "not",
      "with", "as", "was", "if",
      "they", "are", "this", "and", "it", "have", "from", "at", "my",
      "be", "that", "to"
    )

    // split the raw documents on non-word characters using a regular expression
    val nonWordSplit = text.flatMap(t => t.split("""\W+""").map(_.toLowerCase))

    // filter out numbers and tokens that contain digits
    val filterNumbers = nonWordSplit.filter(token => regex.pattern.matcher(token).matches)

    // exclude rarely occurring tokens; this needs a first pass over the whole corpus
    val tokenCounts = filterNumbers.map(t => (t, 1)).reduceByKey(_ + _)
    val rareTokens = tokenCounts.filter { case (k, v) => v < 2 }.map { case (k, v) => k }.collect.toSet

    // per-document preprocessing function
    def tokenize(line: String): Seq[String] = {
      line.split("""\W+""")
        .map(_.toLowerCase)
        .filter(token => regex.pattern.matcher(token).matches)
        .filterNot(token => stopwords.contains(token))
        .filterNot(token => rareTokens.contains(token))
        .filter(token => token.size >= 2) // drop one-letter words
        .toSeq
    }

    // after preprocessing, each document becomes a Seq[String]
    val tokens = text.map(doc => tokenize(doc)).cache()
    println(tokens.distinct.count)
    // tokens of the first document after preprocessing
    println(tokens.first())
    println(tokens.first().length)

    // generate 2^18-dimensional features
    val dim = math.pow(2, 18).toInt
    val hashingTF = new HashingTF(dim)

    // HashingTF's transform maps each input document (a sequence of tokens) to an MLlib Vector
    val tf = hashingTF.transform(tokens)
    // tf has one vector per document, i.e. a document-by-dimension matrix
    tf.cache

    // sparse vector of the first document
    val v = tf.first.asInstanceOf[SV]
    println(v.size)
    // v.values and v.indices have the same length: values holds the term
    // frequencies, indices the positions of the nonzero entries
    println(v.values.size)
    println(v.indices.size)
    println(v.values.toSeq)
    println(v.indices.take(10).toSeq)

    // compute the inverse document frequency of each term
    val idf = new IDF().fit(tf)
    // transform the term-frequency vectors into TF-IDF vectors
    val tfidf = idf.transform(tf)
    val v2 = tfidf.first.asInstanceOf[SV]
    println(v2.values.size)
    println(v2.values.take(10).toSeq)
    println(v2.indices.take(10).toSeq)

    // compute the minimum and maximum TF-IDF weights over the whole corpus
    val minMaxVals = tfidf.map { v =>
      val sv = v.asInstanceOf[SV]
      (sv.values.min, sv.values.max)
    }
    val globalMinMax = minMaxVals.reduce { case ((min1, max1), (min2, max2)) =>
      (math.min(min1, min2), math.max(max1, max2))
    }
    println(globalMinMax)

    // compare the TF-IDF weights of a few common words...
    val common = sc.parallelize(Seq(Seq("you", "do", "we")))
    val tfCommon = hashingTF.transform(common)
    val tfidfCommon = idf.transform(tfCommon)
    val commonVector = tfidfCommon.first.asInstanceOf[SV]
    println(commonVector.values.toSeq)

    // ...against a few uncommon ones
    val uncommon = sc.parallelize(Seq(Seq("telescope", "legislation", "investment")))
    val tfUncommon = hashingTF.transform(uncommon)
    val tfidfUncommon = idf.transform(tfUncommon)
    val uncommonVector = tfidfUncommon.first.asInstanceOf[SV]
    println(uncommonVector.values.toSeq)
  }

}
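The listing above depends on a local 20 Newsgroups dump. As a quick way to try the same pipeline, here is a minimal self-contained sketch over an in-memory toy corpus; HashingTF, IDF, and their transform/fit calls are the same spark-mllib APIs used above, while the object name TFIDFToy and the sample documents are made up for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.{SparseVector => SV}

object TFIDFToy {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TFIDFToy").setMaster("local"))

    // three tiny "documents", already tokenized
    val docs = sc.parallelize(Seq(
      Seq("spark", "is", "fast"),
      Seq("spark", "handles", "text"),
      Seq("text", "mining", "with", "tf", "idf")
    ))

    val hashingTF = new HashingTF(math.pow(2, 18).toInt)
    val tf = hashingTF.transform(docs).cache()   // term-frequency vectors
    val idf = new IDF().fit(tf)                  // document frequencies over the corpus
    val tfidf = idf.transform(tf)                // reweighted vectors

    // "spark" appears in 2 of 3 documents, "mining" in only 1,
    // so "mining" ends up with the larger TF-IDF weight
    tfidf.collect().foreach(v => println(v.asInstanceOf[SV].values.toSeq))

    sc.stop()
  }
}

Note that with 2^18 hash buckets, distinct tokens can in principle collide into the same index; the dimension trades memory against collision risk, which is why the original code picks math.pow(2, 18) rather than a small feature space.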