1. 程式人生 > 實用技巧 >Spark學習--SparkCore實戰案例

Spark學習--SparkCore實戰案例

案例一:詞頻統計

要求:統計Harry Potter.txt檔案中出現最多單詞前十位

內容樣例:

def WordCount(): Unit ={
    val conf=new SparkConf().setMaster("local[6]").setAppName("wordCount")
    val sc=new SparkContext(conf)

    val result=sc.textFile("dataset/HarryPotter.txt")
      .flatMap(item=>item.split(" "))
      .filter(item=>StringUtils.isNotEmpty(item))
      .map(item
=>(item,1)) .reduceByKey((curr,agg)=>curr+agg) .sortBy(item=>item._2,ascending = false) .map(item=>s"${item._1},${item._2}") .take(10) result.foreach(println(_)) }

結果:

案例二:日誌資訊統計

要求:統計某一日誌檔案裡出現的IP的次數Top10,最多,最少

內容樣例:

def logIpTop10(): Unit ={
    val conf=new
SparkConf().setMaster("local[6]").setAppName("sparkCoreTest") val sc=new SparkContext(conf) sc.setCheckpointDir("checkpoint") val result=sc.textFile("dataset/access_log_sample.txt") .map(item=>(item.split(" ")(0),1)) .filter(item=>StringUtils.isNoneEmpty(item._1)) .reduceByKey((curr,agg)
=>curr+agg) .cache() result.checkpoint() val top10=result.sortBy(item => item._2, ascending = false).take(10) top10.foreach(println(_)) val max=result.sortBy(item => item._2, ascending = false).first() val min=result.sortBy(item => item._2, ascending = true).first() println("max:"+max+" min:"+min) }

結果:

案例三:學生成績統計

要求:統計學生數,課程數,學生平均成績

內容樣例:

def stuGrade(): Unit ={
    val conf=new SparkConf().setMaster("local[6]").setAppName("sparkCoreTest")
    val sc=new SparkContext(conf)
    val stu1=sc.textFile("dataset/stu1.txt")
    val stu2=sc.textFile("dataset/stu2.txt")
    val stu=stu1.union(stu2)


    val stuNum=stu.map(item=>(item.split(",")(0),(item.split(",")(1),item.split(",")(2))))
      .groupByKey()
      .count()

    val courseNum=stu.map(item=>(item.split(",")(1),(item.split(",")(0),item.split(",")(2))))
      .groupByKey()
      .count()

    println("學生數:"+stuNum+" 課程數:"+courseNum)

  val result=stu.map(item=>(item.split(",")(0),item.split(",")(2).toDouble))
    .combineByKey(
      createCombiner = (curr: Double) => (curr, 1),
      mergeValue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1),
      mergeCombiners = (curr: (Double, Int), agg: (Double, Int)) => (curr._1 + agg._1, curr._2 + agg._2)
    )
    .map(item=>(item._1,item._2._1/item._2._2))
    .collect()
    result.foreach(println(_))
  }

結果:

案例四:統計某省PM

要求:按年月統計某省PM總數

內容樣例:

def pmProcess(): Unit ={
    val conf=new SparkConf().setMaster("local[6]").setAppName("sparkCoreTest")
    val sc=new SparkContext(conf)
    val source = sc.textFile("dataset/pmTest.csv")
    val result = source.map( item => ((item.split(",")(1), item.split(",")(2)), item.split(",")(6)) )
      .filter( item => StringUtils.isNotEmpty(item._2) && ! item._2.equalsIgnoreCase("NA") )
      .map( item => (item._1, item._2.toInt) )
      .reduceByKey( (curr, agg) => curr + agg )
      .sortBy( item => item._2, ascending = false)
      .map(item=> s"${item._1._1},${item._1._2},${item._2}")
      .collect()
    result.foreach(println(_))
  }

結果: