1. 程式人生 > >spark計算使用者訪問學科子網頁的top3

spark計算使用者訪問學科子網頁的top3

專案說明:附件為要計算資料的demo。點選開啟連結

利用spark的快取機制,讀取需要篩選的資料,自定義一個分割槽器,將不同的學科資料分別放到一個分割槽器中,並且根據指定的學科,取出點選量前三的資料,並寫入檔案。

具體程式如下:

1、專案主程式:

  1. package cn.allengao.Location
  2. import java.net.URL
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
  5. /**
  6. * class_name:
  7. * package:
  8. * describe: 快取機制,自定義一個分割槽器,根據指定的學科, 取出點選量前三的,按照每種學科資料放到不同的分割槽器裡
  9. * creat_user: Allen Gao
  10. * creat_date: 2018/1/30
  11. * creat_time: 11:21
  12. **/
  13. object AdvUrlCount {
  14. def main(args: Array[String]) {
  15. //從資料庫中載入規則
  16. // val arr = Array("java.learn.com", "php.learn.com", "net.learn.com")
  17. val conf = new SparkConf().setAppName("AdvUrlCount").setMaster("local[2]")
  18. val sc = new SparkContext(conf)
  19. //獲取資料
  20. val file = sc.textFile("j://information/learn.log")
  21. //提取出url並生成一個元祖,rdd1將資料切分,元組中放的是(URL, 1)
  22. val urlAndOne = file.map(line => {
  23. val fields = line.split("\t")
  24. val url = fields(1)
  25. (url, 1)
  26. })
  27. //把相同的url進行聚合
  28. val sumedUrl = urlAndOne.reduceByKey(_ + _)
  29. //獲取學科資訊快取,提高執行效率
  30. val cachedProject = sumedUrl.map(x => {
  31. val url = x._1
  32. val project = new URL(url).getHost
  33. val count = x._2
  34. (project, (url, count))
  35. }).cache()
  36. //呼叫Spark自帶的分割槽器此時會發生雜湊碰撞,會有資料傾斜問題產生,需要自定義分割槽器
  37. // val res = cachedProject.partitionBy(new HashPartitioner(3))
  38. // res.saveAsTextFile("j://information//out")
  39. //得到所有學科
  40. val projects = cachedProject.keys.distinct().collect()
  41. //呼叫自定義分割槽器並得到分割槽號
  42. val partitioner = new ProjectPartitioner(projects)
  43. //分割槽
  44. val partitioned: RDD[(String, (String, Int))] = cachedProject.partitionBy(partitioner)
  45. //對每個分割槽的資料進行排序並取top3
  46. val res = partitioned.mapPartitions(it => {
  47. it.toList.sortBy(_._2._2).reverse.take(3).iterator
  48. })
  49. res.saveAsTextFile("j://information//out1")
  50. sc.stop()
  51. }
  52. }
2、自定義分割槽器:
  1. package cn.allengao.Location
  2. import org.apache.spark.Partitioner
  3. import scala.collection.mutable
  4. classProjectPartitioner(projects: Array[String]) extendsPartitioner{
  5. //用來存放學科和分割槽號
  6. private val projectsAndPartNum = new mutable.HashMap[String,Int]()
  7. //計數器,用於指定分割槽號
  8. var n = 0
  9. for(pro<-projects){
  10. projectsAndPartNum += (pro -> n)
  11. n += 1
  12. }
  13. //得到分割槽數
  14. override def numPartitions = projects.length
  15. //得到分割槽號
  16. override def getPartition(key: Any) = {
  17. projectsAndPartNum.getOrElse(key.toString,0)
  18. }
  19. }
執行結果: