Scala/Spark: Cleaning Up a Website's Crawler Logs
阿新 · Published: 2018-12-26
Design approach:
1. Extract the IP from each log line (this can be done in several ways) and use map to build (ip, 1) pairs (note the difference between map and flatMap; see the sketch after this list), then total up how often each IP appears to identify the crawler IPs.
2. Put IPs that repeat often and visit in dense bursts on an access blacklist.
3. Collect the IPs that visited the site within a given time period.
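Step 1 leaves two choices open: how to cut the IP out of a line, and whether to use map or flatMap. Below is a minimal sketch of both, assuming each log line begins with the client IP as the first space-separated field (the sample lines are made up):

// two made-up log lines; the second has no parsable IP
val lines = Seq("114.112.141.6 GET /index.html", "malformed entry")

// way 1: split on whitespace and take the first field
val ipsBySplit = lines.map(_.split(" ")(0))

// way 2: match a dotted-quad prefix with a regex (findFirstIn returns Option[String])
val ipPattern = """^\d{1,3}(?:\.\d{1,3}){3}""".r
val ipsByRegex = lines.flatMap(l => ipPattern.findFirstIn(l))

// map yields exactly one output per input line, so "malformed" survives in ipsBySplit;
// flatMap flattens the Options away and drops lines without a parsable IP.
// The same distinction holds on an RDD or DStream.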
package Test1225

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Exercise 1: pick out the crawler IPs that appear most often
object Spider01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ip")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("d://123/eq") // updateStateByKey requires a checkpoint directory

    val ds = ssc.socketTextStream("lion", 9999)
    // take the first space-separated field (the IP), count per batch,
    // and keep only IPs seen more than 5 times in that batch
    val pairs = ds.map(_.split(" ")(0)).map(x => (x, 1)).reduceByKey(_ + _).filter(x => x._2 > 5)
    // accumulate the per-batch counts into a running total per IP
    val res = pairs.updateStateByKey(updateFunction).repartition(1)
    res.saveAsTextFiles("D://124//spider01") // write the results to a local directory
    res.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // merge this batch's counts into the running total for each key
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCounts = newValues.sum
    val sum = runningCount.getOrElse(0)
    Some(newCounts + sum)
  }
}
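The stateful part can be sanity-checked without a socket source: folding a few simulated batches through updateFunction shows how the running count accumulates (the per-batch values here are made up):

// hypothetical per-batch counts for one IP: 6 in the first interval, 9 in the second
val batches = Seq(Seq(6), Seq(9))

// replay the batches through updateFunction, starting from empty state
val finalState = batches.foldLeft(Option.empty[Int]) {
  (state, batch) => Spider01.updateFunction(batch, state)
}
// finalState == Some(15): counts keep accumulating across batch intervals

To drive the real job, something must be listening on lion:9999 (for example, nc -lk 9999) and emitting log lines whose first space-separated field is the IP.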
2.
package Test1225

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Exercise 2: build a blacklist RDD (hrdd) from the previous exercise's result and block
// visits from blacklisted IPs, i.e. keep every record whose IP is not 114.112.141.6
object spider02 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ip").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("ERROR")

    val iplist = List(("114.112.141.6", 1))
    val hrdd = ssc.sparkContext.parallelize(iplist)

    val ds = ssc.socketTextStream("lion", 9999)
    // left-join each batch against the blacklist; a record whose join value is None
    // matched nothing in the blacklist, so it is allowed through
    ds.map(_.split(" ")(0)).map(x => (x, 1))
      .transform(rdd => rdd.leftOuterJoin(hrdd).filter(_._2._2.isEmpty))
      .print(100)

    ssc.start()
    ssc.awaitTermination()
  }
}
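The leftOuterJoin shuffles every 10-second batch against the blacklist. Since the blacklist is tiny, a common alternative is to broadcast it as a Set and filter directly. Here is a sketch under the same assumptions as spider02 (host lion, IP as the first field); the object name Spider02Broadcast is invented for illustration:

package Test1225

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// hypothetical variant of spider02: ship the blacklist to executors once
// as a broadcast variable instead of joining it into every batch
object Spider02Broadcast {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ip").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("ERROR")

    val blacklist = ssc.sparkContext.broadcast(Set("114.112.141.6"))

    val ds = ssc.socketTextStream("lion", 9999)
    ds.map(_.split(" ")(0))
      .filter(ip => !blacklist.value.contains(ip)) // keep only non-blacklisted IPs
      .print(100)

    ssc.start()
    ssc.awaitTermination()
  }
}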
3.
package Test1225

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Exercise 3: count the site's visitors within a time period
object spider03 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ip").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("ERROR")

    val ds = ssc.socketTextStream("lion", 9999)
    // reduceByKey collapses each IP to a single record, so count() yields the
    // number of distinct visiting IPs in each 10-second batch
    ds.map(_.split(" ")(0)).map(x => (x, 1)).reduceByKey(_ + _).count().print(100)

    ssc.start()
    ssc.awaitTermination()
  }
}
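spider03 counts per 10-second batch. If step 3 really asks for traffic over a longer period, Spark Streaming's window operations express that directly. Below is a sketch with an invented object name, Spider03Window, and arbitrarily chosen window/slide durations of 60 and 10 seconds:

package Test1225

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// hypothetical variant of spider03: total visits over a sliding 60-second window
object Spider03Window {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ip").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("d://123/eq") // windowed counting uses an inverse reduce, which needs checkpointing

    val ds = ssc.socketTextStream("lion", 9999)
    // every 10 seconds, print how many log lines arrived in the last 60 seconds
    ds.countByWindow(Seconds(60), Seconds(10)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

For per-IP counts over the same window, countByValueAndWindow on the mapped IP stream works the same way.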