Chinese Text Classification in Spark
阿新 · Published: 2018-12-30
I recently need to build a Chinese text classification model for review data. The plan is to train the model in Spark, then package the TF step, the IDF step, and the trained model into a single Java class and write it to Redis. However, I ran into some difficulty with the IDF step in Spark. Advice from anyone experienced would be appreciated; first, here is the code:
package com.meituan.spark.model

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.SQLContext
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import com.meituan.nlp.util.WordUtil
import com.meituan.nlp.util.TextUtil
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.feature.VectorTransformer
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.classification.NaiveBayes
import com.meituan.model.util.NaiveBaysianBean
import com.alibaba.fastjson.JSON

object Test {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("AnalyzeSlotPrice").setMaster("local[16]")
    val start = System.currentTimeMillis()
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Raw data: one review per line, tab-separated (content \t label).
    val rdd = sc.textFile("/Users/shuubiasahi/Desktop/一個月內資料.csv")
    rdd.map { x =>
      val spt = x.split("\t")
      (spt.apply(1))
    }.map(x => (x, 1)).reduceByKey(_ + _).take(10).foreach(println)

    // Segment the content with ansj, then drop stop words and single characters.
    val data = rdd.map { x =>
      val spt = x.split("\t")
      val content = spt.apply(0)
      val result = ToAnalysis.parse(TextUtil.fan2Jian(WordUtil.replaceAll(content)))
        .getTerms.asScala.map(x => x.getName)
        .filter { x => !WordUtil.isStopword(x) && x.length() > 1 }
        .mkString(" ")
      val label = spt.apply(1)
      // val label = if (spt.apply(1) == "-1") "-1" else "1"
      (label, result)
    }
    data.map(x => (x._1, 1)).reduceByKey(_ + _).take(10).foreach(println)

    val splits = data.randomSplit(Array(0.7, 0.3))
    val train = splits(0)
    val test = splits(1)

    // TF-IDF on the training set.
    val hashingTF = new HashingTF(500000)
    val tf_num_pairs_train = train.map(x => (x._1, hashingTF.transform(x._2.split(" ").toSeq)))
    val idf = new IDF().fit(tf_num_pairs_train.values)
    val num_idf_pairs_train = tf_num_pairs_train.mapValues(v => idf.transform(v))
      .map(x => LabeledPoint(x._1.toDouble, Vectors.dense(x._2.toArray)))
    // val num_idf_pairs_train = tf_num_pairs_train.map(x => LabeledPoint(x._1.toDouble, Vectors.dense(x._2.toArray)))

    val model = NaiveBayes.train(num_idf_pairs_train, lambda = 5, modelType = "multinomial")

    // TF-IDF on the test set (note: a separate IDF model is fitted here).
    val tf_num_pairs_test = test.map(x => (x._1, hashingTF.transform(x._2.split(" ").toSeq)))
    val idf1 = new IDF().fit(tf_num_pairs_test.values)
    val num_idf_pairs_test = tf_num_pairs_test.mapValues(v => idf1.transform(v))
      .map(x => LabeledPoint(x._1.toDouble, Vectors.dense(x._2.toArray)))
    // val num_idf_pairs_test = tf_num_pairs_test.map(x => LabeledPoint(x._1.toDouble, Vectors.dense(x._2.toArray)))

    // Evaluate accuracy on the test set.
    val testpredictionAndLabel = num_idf_pairs_test.map(p => (model.predict(p.features), p.label))
    var testaccuracy = 1.0 * testpredictionAndLabel.filter(x => x._1 == x._2).count() / num_idf_pairs_test.count()
    print("output:" + testaccuracy)

    /*val modelJson = JSON.toJSON(model).toString()
    val naiveBaysianModel = new NaiveBaysianBean()
    naiveBaysianModel.setHashingTf(hashingTF)
    naiveBaysianModel.setIdf(idf)
    naiveBaysianModel.setModelBase64(modelJson)*/
  }
}
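One detail that may help with the packaging question (this is a sketch, not part of the original code): the fitted IDFModel exposes the per-term IDF weights as a plain Vector via idf.idf, and the trained NaiveBayesModel exposes its parameters as plain arrays (labels, pi, theta). These can be pulled out as primitive arrays, which fastjson can serialize to Redis without dragging Spark along. Assuming the variable names idf and model from the code above:

    // Sketch: extract plain-array parameters from the fitted Spark objects so they
    // can be serialized (e.g. with fastjson) without a Spark dependency at serving time.
    val idfWeights: Array[Double]     = idf.idf.toArray   // per-term IDF weights, length = numFeatures
    val nbLabels: Array[Double]       = model.labels      // class labels
    val nbPi: Array[Double]           = model.pi          // log prior of each class
    val nbTheta: Array[Array[Double]] = model.theta       // log conditional probability of each term per class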
As you can see from the code, the TF step can work on a Seq[String], but the IDF step operates on an RDD[Vector], and that is the awkward part: I don't know how to encapsulate it in a Java class so that it no longer depends on Spark and can be called elsewhere. Any advice from experts would be much appreciated.
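For reference, below is a minimal Spark-free scorer sketch that redoes the same pipeline on a single document: hash each segmented word the way the MLlib HashingTF in this code does (non-negative hashCode modulo numFeatures, which holds for Spark 1.x mllib; worth verifying against your Spark version), multiply by the extracted IDF weights, and score with multinomial Naive Bayes (argmax over classes of pi(c) + theta(c) · x). All names here are illustrative and assume the arrays extracted above; this is not an existing Spark API.

    // Hypothetical Spark-free scorer: plain Scala, no SparkContext required.
    class NaiveBayesScorer(numFeatures: Int,
                           idfWeights: Array[Double],
                           labels: Array[Double],
                           pi: Array[Double],
                           theta: Array[Array[Double]]) extends Serializable {

      // Same hashing scheme as the MLlib 1.x HashingTF: non-negative hashCode modulo numFeatures.
      private def indexOf(term: String): Int = {
        val h = term.hashCode % numFeatures
        if (h < 0) h + numFeatures else h
      }

      // Turn the segmented words of one document into a dense TF-IDF feature array.
      def tfIdf(words: Seq[String]): Array[Double] = {
        val features = new Array[Double](numFeatures)
        words.foreach(w => features(indexOf(w)) += 1.0) // raw term frequency
        var i = 0
        while (i < numFeatures) {
          features(i) *= idfWeights(i)
          i += 1
        }
        features
      }

      // Multinomial Naive Bayes: pick the class maximizing pi(c) + theta(c) . x.
      def predict(words: Seq[String]): Double = {
        val x = tfIdf(words)
        var best = 0
        var bestScore = Double.NegativeInfinity
        var c = 0
        while (c < labels.length) {
          var s = pi(c)
          var i = 0
          while (i < x.length) {
            if (x(i) != 0.0) s += theta(c)(i) * x(i)
            i += 1
          }
          if (s > bestScore) { bestScore = s; best = c }
          c += 1
        }
        labels(best)
      }
    }

    // Usage sketch: build it once from the extracted arrays, then call it anywhere without Spark.
    val scorer = new NaiveBayesScorer(500000, idfWeights, nbLabels, nbPi, nbTheta)
    val prediction = scorer.predict("好吃 實惠 服務 態度".split(" "))

An instance of a class like this is an ordinary serializable object, so it could be stored in Redis (or translated one-to-one into Java) and used at serving time without any Spark dependency.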