1. 程式人生 > SparkStreaming--小案例2:對爬蟲抓取的資料進行分析

SparkStreaming--小案例2:對爬蟲抓取的資料進行分析

請注意:本文各段程式碼的開頭和結尾是固定模式,而對 lines 的處理則需要根據你的資料特點進行切分和整理。我在文末附上了一部分資料供參考。

1. 統計某一時間段內輸入資料中各單詞的出現次數(時間視窗不斷滑動)。這一例不是爬蟲資料分析,只是熱身。

package Test1226

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Warm-up example: counts how often each space-separated word appears in the socket input,
// aggregated over a sliding window (window and slide lengths are set below).
object test01 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("nwc").setMaster("local[2]")
    // Micro-batches are cut every 5 seconds.
    val streamingCtx = new StreamingContext(sparkConf, Seconds(5))
    streamingCtx.sparkContext.setLogLevel("ERROR")
    streamingCtx.checkpoint("d://123//12262")

    // Explicit parameter types keep overload resolution of reduceByKeyAndWindow unambiguous.
    val sum = (left: Int, right: Int) => left + right

    val wordCounts = streamingCtx
      .socketTextStream("lion", 2222)
      .flatMap(line => line.split(" "))
      .map(token => (token, 1))
      // Totals over the last 20 seconds, recomputed every 10 seconds.
      .reduceByKeyAndWindow(sum, Seconds(20), Seconds(10))

    wordCounts.print()

    streamingCtx.start()
    streamingCtx.awaitTermination()
    // Only reached if awaitTermination() returns.
    streamingCtx.stop()
  }
}

2. 統計網站中哪個頁面的訪問量最多(按不同來訪 IP 的數量計算)

package Test1226

import Test1225.Spider01.updateFunction
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object test02 {
  /**
   * Ranks request paths by how many distinct client IPs requested them within each
   * 5-second batch, highest first ("which page is visited most").
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("nwc").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("d://123//122602")
    val lines = ssc.socketTextStream("lion", 2222)

    // Space-separated log fields used here: (0) client IP, (6) request target.
    // Lines too short to contain field 6 are skipped rather than failing the whole batch.
    val pathAndIp = lines
      .map(_.split(" "))
      .filter(_.length > 6)
      // takeWhile strips the query string like split("\\?")(0) does, but cannot throw on a bare "?".
      .map(fields => (fields(6).takeWhile(_ != '?'), fields(0)))

    pathAndIp
      .transform(rdd => rdd.distinct())                       // one record per (path, IP) pair
      .map(_._1)                                              // keep only the path
      .countByValue()                                         // path -> number of distinct IPs
      .transform(rdd => rdd.sortBy(_._2, ascending = false))  // most-visited first
      .print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

3. 統計網站各模組(請求路徑)的訪問量並降序排序(程式中累加的是每行第 10 個欄位,即回應的位元組數)

package Test1226

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object test03 {
  /**
   * Per 5-second batch, sums field 9 (presumably the response size in bytes, judging by the
   * sample log lines) per request path and prints the paths in descending order of that total.
   *
   * NOTE(review): the section title calls this a visit count, but the value summed is field 9,
   * not 1 per request. Confirm which metric is intended.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("nwc").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("d://123//122602")
    val lines = ssc.socketTextStream("lion", 3333)

    lines
      .map(_.split(" "))
      .filter(_.length > 9) // need fields 6 (request target) and 9; shorter lines are skipped
      .map(fields => (fields(6).takeWhile(_ != '?'), responseBytes(fields(9))))
      .reduceByKey(_ + _)   // Long totals so a busy path cannot overflow Int
      .transform(rdd => rdd.sortBy(_._2, ascending = false))
      .print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  /** Parses field 9. Non-numeric values such as a "-" placeholder count as 0 instead of crashing the job. */
  private def responseBytes(field: String): Long =
    scala.util.Try(field.toLong).getOrElse(0L)
}

4. 統計狀態碼非 200 的報錯訪問量(200 表示正常訪問)

package Test1226

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object test04 {
  /**
   * Per 5-second batch, counts requests whose status field (field 8) is not "200",
   * keyed by "<path> <status>" (query string stripped from the path).
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("nwc").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("d://123//122602")
    val lines = ssc.socketTextStream("lion", 3333)

    // DStream.filter returns a NEW stream. The guarded stream must be the one used downstream;
    // otherwise short lines still reach fields(8) and fail the batch.
    lines
      .map(_.split(" "))
      .filter(_.length > 8)                 // need fields 6 (request target) and 8 (status)
      .filter(fields => fields(8) != "200") // keep only non-200 responses
      .map(fields => (fields(6).takeWhile(_ != '?') + " " + fields(8), 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

5. 統計各 User-Agent 的數量(即每行最後一對雙引號中的內容)

package Test1226

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object test05 {
  /**
   * Per 5-second batch, counts occurrences of each user agent, taken as the segment at
   * index 5 after splitting the line on double quotes.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("nwc").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("d://123//122602")
    val lines = ssc.socketTextStream("lion", 3333)

    lines
      .map(_.split("\""))
      // Lines without enough quoted segments are skipped instead of throwing and stopping the job.
      .filter(_.length > 5)
      .map(segments => (segments(5), 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

6. 統計前一分鐘的網站總訪問量(每個批次,即每 5 秒,更新一次)

package Test1226

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// On every batch, prints how many lines arrived on the socket during the preceding 60 seconds.
object test06 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("nwc").setMaster("local[2]")
    val streamingCtx = new StreamingContext(sparkConf, Seconds(5))
    streamingCtx.sparkContext.setLogLevel("ERROR")
    streamingCtx.checkpoint("d://123//12262")

    val oneMinute = Seconds(60)
    // window() is given no slide interval. Per Spark's DStream.window docs, it then
    // advances once per batch.
    val recentLines = streamingCtx.socketTextStream("lion", 2222).window(oneMinute)
    val lineCount = recentLines.count()
    lineCount.print()

    streamingCtx.start()
    streamingCtx.awaitTermination()
    // Only reached if awaitTermination() returns.
    streamingCtx.stop()
  }
}

所用的部分資料:

120.197.87.216 - - [04/Jan/2012:00:00:02 +0800] "GET /home.php?mod=space&uid=563413&mobile=yes HTTP/1.1" 200 3388 "-" "-"
123.126.50.73 - - [04/Jan/2012:00:00:02 +0800] "GET /thread-679411-1-1.html HTTP/1.1" 200 5251 "-" "Sogou web spider/4.0(+http://www.sogou.com/docs/help/webmasters.htm#07)"
203.208.60.187 - - [04/Jan/2012:00:00:02 +0800] "GET /archiver/tid-3003.html HTTP/1.1" 200 2056 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
114.112.141.6 - - [04/Jan/2012:00:00:02 +0800] "GET /ctp080113.php?action=getgold HTTP/1.1" 200 13886 "-" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; InfoPath.3; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729)"
114.112.141.6 - - [04/Jan/2012:00:00:02 +0800] "GET /ctp080113.php?action=getmedal HTTP/1.1" 200 13882 "-" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; InfoPath.3; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729)"
110.6.179.88 - - [04/Jan/2012:00:00:02 +0800] "GET /forum.php?mod=attachment&aid=NTczNzU3fDFjNDdjZTgzfDEzMjI4NzgwMDV8MTMzOTc4MDB8MTEwMTcxMA%3D%3D&mobile=no HTTP/1.1" 200 172 "http://www.itpub.net/forum.php?mod=attachment&aid=NTczNzU3fDFjNDdjZTgzfDEzMjI4NzgwMDV8MTMzOTc4MDB8MTEwMTcxMA%3D%3D&mobile=yes" "Mozilla/5.0 (Linux; U; Android 2.2; zh-cn; ZTE-U V880 Build/FRF91) UC AppleWebKit/530+ (KHTML, like Gecko) Mobile Safari/530"
116.205.130.2 - - [04/Jan/2012:00:00:02 +0800] "GET /popwin_js.php?fid=6 HTTP/1.1" 200 32 "http://www.itpub.net/forum-6-1.html?ts=28" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; QQDownload 702; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; AskTbPTV/5.11.3.15590; .NET4.0E)"
114.112.141.6 - - [04/Jan/2012:00:00:02 +0800] "GET /popwin_js.php?fid=133 HTTP/1.1" 200 11 "http://www.itpub.net/thread-1558574-3-9.html" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; InfoPath.3; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729)"
114.112.141.6 - - [04/Jan/2012:00:00:02 +0800] "GET /ctp080113.php?tid=1558574 HTTP/1.1" 200 5 "http://www.itpub.net/thread-1558574-3-9.html" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; InfoPath.3; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729)"
110.75.173.35 - - [04/Jan/2012:00:00:02 +0800] "GET /forum.php?goto=lastpost&mod=redirect&tid=1380214 HTTP/1.1" 302 5 "-" "Yahoo! Slurp China"
114.112.141.6 - - [04/Jan/2012:00:00:02 +0800] "GET /popwin_js.php?fid=133 HTTP/1.1" 200 11 "http://www.itpub.net/thread-1554759-4-10.html" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; InfoPath.3; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729)"
114.112.141.6 - - [04/Jan/2012:00:00:02 +0800] "GET /ctp080113.php?tid=1554759 HTTP/1.1" 200 5 "http://www.itpub.net/thread-1554759-4-10.html" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; InfoPath.3; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729)"
120.197.87.220 - - [04/Jan/2012:00:00:02 +0800] "GET /forum.php?mod=viewthread&tid=692703&extra=&page=2&mobile=yes HTTP/1.1" 200 4903 "-" "-"