使用spark讀取es中的資料並進行資料清洗,使用fp-growth演算法進行加工
阿新 • • 發佈:2019-02-15
最近學了spark,用fg-growth演算法進行資料的關聯排序
object HelloPFg { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark MLlib Exercise:K-Means Clustering") conf.set("es.index.auto.create", "true") conf.set("es.nodes", "192.168.100.100") val sc = new SparkContext(conf) var sqlsc = new SQLContext(sc) /** * 讀取es中的資料,logstash是es中的索引名稱,如果需要讀取多個索引,則使用逗號將索引隔開即可 * val esLogs = sc.esRDD("logstash-2016.04.04,logstash-2016.04.05").values * 如果需要讀取不同的index中的不同的type中的資料,則分別讀取,然後使用union將多個rdd合併成一個rdd即可 val esLogs = sc.esRDD("logstash-2016.04.04/spark").values val esLogs1 = sc.esRDD("logstash-2016.04.05/docs").values val test=esLogs.union(esLogs1) * */ val esLogs = sc.esRDD("logstash-2016.04.04").values//使用values取出資料中的values,本來取出的資料為Map val line_num = esLogs.count() //對資料進行過濾,只保留防火牆的資料 val waf1 = esLogs.filter(_.contains("waf_logtype")) System.out.println("waf1:" + waf1.first()) System.out.println("waf1:srcip:" + waf1.first().get("srcip")) //去掉含有message的資料 var waf2 = waf1.map(m => m.-("message")) System.out.println("waf2...............waf2........."+waf2.first()) //組裝fp-growth需要的資料型別,fpg演算法需要RDD型別的Array[String]型別 var waf3 = waf2.map(m => Array(m.get("waf_logtype").toString(), m.get("url").toString().substring(0, dns(m.get("url").toString())+1),//對url進行過濾,去掉第一個反斜槓後面的部分 m.get("srcip").toString(), m.get("method").toString())) println("...............waf3:........."+waf3.first()) System.out.println("waf3:"+waf3.first().mkString(",")) //設定最小支援度,以及分片的數量,分片的數量就是計算的結果會生成檔案的個數 val fpg = new FPGrowth() .setMinSupport(0.2) .setNumPartitions(10) val model = fpg.run(waf3) model.freqItemsets.collect().foreach { itemset => println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq) } val minConfidence = 0.8 model.generateAssociationRules(minConfidence).collect().foreach { rule => println( rule.antecedent.mkString("[", ",", "]") + " => " + rule.consequent.mkString("[", ",", "]") + ", " + rule.confidence) } } private def dns(line: String): Int = { if (line.indexOf('/') > 0) line.indexOf('/') else 0 } }