Using a text analysis model inside Spark Streaming (2.0.1)
For how to build the model itself, please refer to my other blog post on model building.
Functionality: receive data from Kafka (each message is an article), classify the article's type, save the classification result to HBase, and build an index for the article (the indexing part has no code, only an empty stub — you can implement it yourself, and I may fill it in later if I get the chance).
Code implementation:
package spark.mllib

import java.io.FileOutputStream

import org.apache.spark.SparkConf
import org.apache.spark.ml.classification.NaiveBayesModel
import org.apache.spark.ml.feature.{HashingTF, IDF, LabeledPoint, Tokenizer}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
class UseModel1 {
}
object UseModel1{
  // Main flow
  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = Array("192.168.10.199:2181", "order", "order", "2")
    val conf = new SparkConf().setAppName("useModel").setMaster("local[4]")
    val ssc = getStreamingContext(conf, 10)
    val dstreams = getKafkaDstream(ssc, topics, zkQuorum, group, numThreads)
    // Keep only the message value (the article text)
    val dstream = dstreams.map(_._2)
    dstream.persist()
    // For testing
    dstream.print()
    // It would be better to first check that the RDD is not empty (a sketch follows below)
    dstream.foreachRDD(rdd => everyRDD(rdd))
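    // A minimal sketch of the empty-batch check suggested in the comment above
    // (an editor's assumption, not part of the original post):
    // dstream.foreachRDD { rdd => if (!rdd.isEmpty()) everyRDD(rdd) }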
    ssc.start()
    ssc.awaitTermination()
  }
  // Build the StreamingContext with the given batch interval (in seconds)
  def getStreamingContext(conf: SparkConf, seconds: Int): StreamingContext = {
    new StreamingContext(conf, Seconds(seconds))
  }
  // Build (or reuse) the SparkSession
  def getSparkSession(conf: SparkConf): SparkSession = {
    SparkSession.builder()
      .config(conf)
      .config("spark.sql.warehouse.dir", "warehouse/dir")
      .getOrCreate()
  }
  // Build the receiver-based Kafka DStream; each record is a (key, message) pair
  def getKafkaDstream(ssc: StreamingContext, topics: String, zkQuorum: String, group: String, numThreads: String): ReceiverInputDStream[(String, String)] = {
    ssc.checkpoint("directory")
    // numThreads receiver threads per topic
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
  }
  // File-save test: append a string to a local file (only meaningful in local mode,
  // since the foreach below runs on the executors)
  def savaString(str: String): Unit = {
    val out = new FileOutputStream("D:\\decstop\\file.txt", true)
    out.write(str.getBytes)
    out.flush()
    out.close()
  }
  // Processing applied to each batch RDD
  def everyRDD(rdd: RDD[String]): Unit = {
    // Load the Naive Bayes model trained in the model-building post
    val sameModel = NaiveBayesModel.load("resoult")
    val spark = getSparkSession(rdd.context.getConf)
    import spark.implicits._

    val rddDF = rdd.map { line => (1, line) }.toDF("label", "text").persist()
    //rddDF.show()

    // Split the text into words
    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val tokenizerRDD = tokenizer.transform(rddDF)
    //tokenizerRDD.show(false)

    // Term frequencies, then TF-IDF features
    val hashingTF =
      new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(100)
    val hashingTFRDD = hashingTF.transform(tokenizerRDD)
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    val idfModel = idf.fit(hashingTFRDD)
    val rescaledData = idfModel.transform(hashingTFRDD)
    //rescaledData.show(false)

    // Convert to the (label, features) format Naive Bayes expects
    val useDataRdd = rescaledData.select($"label", $"features").map {
      case Row(label: Int, features: Vector) =>
        LabeledPoint(label.toDouble, Vectors.dense(features.toArray))
    }

    val predictions = sameModel.transform(useDataRdd)
    predictions.persist()
    //predictions.show(false)

    // Following this pattern you can plug in the other logic (saving to HBase, building the index, ...)
    predictions.select($"label", $"prediction").foreach { x => savaString("" + x.getAs("label") + " " + x.getAs("prediction") + "\n\r") }

    // For testing
    predictions.createOrReplaceTempView("prediction")
    rddDF.createOrReplaceTempView("atical")
    //spark.sql("select p.label,p.prediction,a.text from prediction p,atical a where p.label=a.label").select(col, cols)
  }
  // Build the index; the main fields to index are hbase_rowKey(time), author, title, article
  def buiderIndex(){}
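  // buiderIndex is left as an empty stub in the original post. As a placeholder for the
  // fields named in the comment above, a hypothetical document shape could be the case
  // class below (editor's sketch; the post does not say which index engine is used):
  case class ArticleIndexDoc(rowKey: String, author: String, title: String, article: String)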
  // Save to HBase
  def savaToHbase(){
  }
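  // savaToHbase is an empty stub in the original post. Below is a minimal sketch of one way
  // to fill it in, using the standard HBase 1.x client API; this is an editor's assumption,
  // not the author's code. The table name "article_prediction", the column family "info"
  // and the row-key layout are hypothetical.
  def savaToHbaseSketch(rowKey: String, prediction: Double): Unit = {
    val hbaseConf = org.apache.hadoop.hbase.HBaseConfiguration.create()
    val connection = org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(hbaseConf)
    try {
      val table = connection.getTable(org.apache.hadoop.hbase.TableName.valueOf("article_prediction"))
      val put = new org.apache.hadoop.hbase.client.Put(org.apache.hadoop.hbase.util.Bytes.toBytes(rowKey))
      put.addColumn(
        org.apache.hadoop.hbase.util.Bytes.toBytes("info"),
        org.apache.hadoop.hbase.util.Bytes.toBytes("prediction"),
        org.apache.hadoop.hbase.util.Bytes.toBytes(prediction.toString))
      table.put(put)
      table.close()
    } finally {
      connection.close()
    }
  }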
  // Send to the next Kafka topic: time, positive-opinion count, negative-opinion count, percentage, whether to raise an alert
  def sendToKafka(){
  }
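  // sendToKafka is also left empty in the original post. A minimal sketch of one way to
  // implement it with the plain Kafka producer API follows; this is an editor's assumption.
  // The broker address "192.168.10.199:9092" and the topic name "alert" are hypothetical;
  // the message would carry the fields listed in the comment above.
  def sendToKafkaSketch(message: String): Unit = {
    val props = new java.util.Properties()
    props.put("bootstrap.servers", "192.168.10.199:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new org.apache.kafka.clients.producer.KafkaProducer[String, String](props)
    try {
      producer.send(new org.apache.kafka.clients.producer.ProducerRecord[String, String]("alert", message))
    } finally {
      producer.close()
    }
  }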
}
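For reference, the dependencies this code relies on (Spark 2.0.1 with the receiver-based Kafka 0.8 integration) would look roughly like the following in build.sbt. The exact versions and the Scala version are assumptions; adjust them to your environment:

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                % "2.0.1",
  "org.apache.spark" %% "spark-sql"                 % "2.0.1",
  "org.apache.spark" %% "spark-mllib"               % "2.0.1",
  "org.apache.spark" %% "spark-streaming"           % "2.0.1",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.1"
)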