
Scala--Spark: Analyzing a Website's Crawler Access Records

Design approach:

1. Extract the IP from each log line (there are several ways to do this) and use map to build (ip, 1) pairs (note the difference between map and flatMap; a short sketch follows this list). Count how many times each IP appears and flag the high-frequency ones as crawler IPs.

2. Put IPs that show up repeatedly with dense access patterns on an access blacklist.

3. Collect the IPs that access the site within the same time window.
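
A minimal sketch of the map/flatMap distinction on log lines (the exact log format shown is an assumption; the code below only ever uses the first whitespace-separated field, the client IP):

val lines = Seq(
  "114.112.141.6 - - [25/Dec/2019:10:00:01] \"GET / HTTP/1.1\" 200",
  "10.0.0.1 - - [25/Dec/2019:10:00:02] \"GET /a HTTP/1.1\" 200"
)
// map: exactly one output element per input line, here the IP
val ips = lines.map(_.split(" ")(0))     // Seq("114.112.141.6", "10.0.0.1")
// flatMap: flattens each split array, producing every token of every line
val tokens = lines.flatMap(_.split(" ")) // Seq("114.112.141.6", "-", "-", ...)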

1.

package Test1225


import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Spider01 {
//  Exercise 1: pick out the crawler IPs that appear most often
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ip")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("d://123/eq") // checkpoint directory required by updateStateByKey
    val ds = ssc.socketTextStream("lion", 9999)
    // Take the first field (the IP), pair it with 1, sum within the batch,
    // and keep only IPs seen more than 5 times in that batch
    val pairs = ds.map(_.split(" ")(0)).map(x => (x, 1)).reduceByKey(_ + _).filter(x => x._2 > 5)

    // Accumulate per-IP counts across batches, merged into a single partition
    val res = pairs.updateStateByKey(updateFunction).repartition(1)
    res.saveAsTextFiles("D://124//spider01") // write each batch's result to a local directory
    res.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    // Add this batch's counts to the running total carried in the state
    val newCounts = newValues.sum
    val sum = runningCount.getOrElse(0)
    Some(newCounts + sum)
  }
}
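
To try this locally, feed sample lines into the socket with netcat on the host "lion" (e.g. nc -lk 9999) and paste log lines; each line is assumed to start with the client IP, since the code only reads the first whitespace-separated field. Note that filter(x => x._2 > 5) runs per 10-second batch, so only IPs with more than five hits inside a single batch ever reach the accumulated state.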

2.

package Test1225

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object spider02 {
  // Build a blacklist RDD (hrdd) from the previous exercise's result and block
  // the blacklisted IPs: here, keep every IP that is not 114.112.141.6
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ip").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("ERROR")
    val iplist = List(("114.112.141.6", 1))
    val hrdd = ssc.sparkContext.parallelize(iplist)
    val ds = ssc.socketTextStream("lion", 9999)
    // leftOuterJoin tags each (ip, 1) with Some(_) when the IP is on the
    // blacklist and None otherwise; keeping the None side filters out blacklisted IPs
    ds.map(_.split(" ")(0))
      .map(x => (x, 1))
      .transform { rdd => rdd.leftOuterJoin(hrdd).filter(_._2._2.isEmpty) }
      .print(100)

    ssc.start()
    ssc.awaitTermination()
  }

}
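
The join-based filter works, but for a small blacklist a broadcast variable avoids the shuffle that leftOuterJoin triggers. A minimal alternative sketch, assuming the same ds stream and hard-coding the blacklist as the original does:

val blacklist = ssc.sparkContext.broadcast(Set("114.112.141.6"))
ds.map(_.split(" ")(0))
  .filter(ip => !blacklist.value.contains(ip)) // drop blacklisted IPs with no join
  .print(100)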

3.

package Test1225

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object spider03 {
  // Count how many distinct IPs visit the site within a time period
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ip").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("ERROR")
    val ds = ssc.socketTextStream("lion", 9999)
    // reduceByKey leaves one element per distinct IP, so count() yields
    // the number of distinct visitors in each 10-second batch
    ds.map(_.split(" ")(0)).map(x => (x, 1)).reduceByKey(_ + _).count().print(100)

    ssc.start()
    ssc.awaitTermination()
  }
}
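
Each batch above is an independent 10-second slice, so the printed count resets every batch. If the intent is a sliding time window, say distinct visitors over the last 60 seconds updated every 10, reduceByKeyAndWindow can replace the per-batch reduceByKey. A minimal sketch, assuming the same ds stream, with window sizes chosen for illustration:

// Per-IP hit counts over the last 60 seconds, recomputed every 10 seconds
val windowed = ds.map(_.split(" ")(0))
  .map(ip => (ip, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(10))
windowed.count().print() // number of distinct IPs seen in the current window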