程式人生 > Spark案例實戰之一

Spark案例實戰之一

一.計算最受歡迎的老師

1.專案需求:現有某網路上的訪問日誌,現需要計算某一學科下被訪問次數最多的老師。
2.網路的url如右:http://bigdata.xiaoniu.com/laozhao,其中bigdata表示學科,laozhao表示教師。
3.程式碼如下:

import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/*
1.分析最受歡迎的老師
 */
object PopularTeacher{
  def main(args:Array[String]): Unit = {
    val
words = Array("http://bigdata.xiaoniu.com/laozhao", "http://bigdata.xiaoniu.com/laozhao", "http://bigdata.xiaoniu.com/laozhao", "http://bigdata.xiaoniu.com/laozhao", "http://bigdata.xiaoniu.com/laozhao", "http://java.xiaoniu.com/laozhang", "http://java.xiaoniu.com/laozhang", "http://python.xiaoniu.com/laoqian"
, "http://java.xiaoniu.com/laoli", "http://python.xiaoniu.com/laoli", "http://python.xiaoniu.com/laoli") val conf = new SparkConf().setAppName("Popular").setMaster("local") val sc = new SparkContext(conf) //讀取資料 //val result1 :RDD [String]= sc.textFile(args(0)) val result1 = sc.parallelize(words) val
subjectAndTeacher:RDD[(String,String)] = result1.map(lines =>{ val url = new URL(lines) println("url = "+url) val host = new URL(lines).getHost println("host = "+host) val subject = host.substring(0,host.indexOf("."))//切分字串 val teacher = url.getPath.substring(1)//獲得老師的名字 (subject,teacher)//這是一個直接返回的 })//整理資料 //總的排序 val result2 = subjectAndTeacher.map(x => (x,1)) //形成 ((鍵值對),1) 這種map val result22 = result2.reduceByKey(_+_)//根據鍵將相同的合併 //print("result22's content are:") //並行的程式,你永遠都不知道是不是按照程式的順序輸出 result22.foreach(println) val result3: Array[((String, String), Int)] = result22.collect() //println(result3.toBuffer) //每個學科裡面做排序 區域性排序 按照學科的名字排序 //val result4 = result22.groupBy(_._1._1) val result4: RDD[(String, Iterable[((String, String), Int)])] = result22.groupBy(x => x._1._1) //二次排序 //將keys和values轉換成List型別,然後按照values排序,然後倒敘輸出,然後取前三 val result5: RDD[(String, List[((String, String), Int)])] = result4.mapValues(_.toList.sortBy(_._2).reverse.take(3)) val result = result5.collect() result5.foreach(println) } }
import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

/**
  * Custom partitioner that assigns each subject its own partition.
  * Extends Spark's Partitioner.
  *
  * @param subjects distinct subject (course) names; each receives a dedicated
  *                 partition id in [0, subjects.length)
  */
class SelfPartition (subjects :Array[String]) extends Partitioner{
  // Derive the subject -> partition-id rules from the given subjects so every
  // assigned id stays inside [0, numPartitions). The previous hard-coded
  // Map("bigdata" -> 1, "java" -> 2, "python" -> 3) could return partition 3
  // while numPartitions was subjects.length (3) — an out-of-range partition id
  // that would make the shuffle fail.
  val rules: Map[String, Int] = subjects.zipWithIndex.toMap

  // Number of partitions (a method override, not a variable).
  override def numPartitions: Int = {
    subjects.length
  }

  // Map a key to its partition; unknown subjects fall back to partition 0.
  override def getPartition(key: Any): Int ={
    val k = key.toString
    rules.getOrElse(k,0)
  }
}

/**
  * 1. Access records are URLs, held for now in records: Array[String].
  * 2. records is converted into an RDD (text).
  * 3. text is transformed via map and related operators.
  * 4. Intermediate and final results are collected and printed to the console.
  */


object FavoriteTeacher{
  def main (args:Array[String]): Unit ={
    val conf = new SparkConf().setAppName("FavoriteTeacher").setMaster("local")
    val sc = new SparkContext(conf)

    // Raw access log.
    val records: Array[String] = Array(
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://java.xiaoniu.com/laozhang",
      "http://java.xiaoniu.com/laozhang",
      "http://python.xiaoniu.com/laoqian",
      "http://java.xiaoniu.com/laoli",
      "http://python.xiaoniu.com/laoli",
      "http://python.xiaoniu.com/laoli")
    // Convert the array into an RDD and echo it.
    val text: RDD[String] = sc.parallelize(records)
    print("First disposition:")
    text.collect().foreach(println)

    /*
      Parse one log line into a (subject, teacher) tuple.
     */
    def parseRecord(line: String): (String, String) = {
      val url = new URL(line)                                      // parse the raw line as a URL
      val hostName = url.getHost                                   // e.g. "bigdata.xiaoniu.com"
      val path = url.getPath                                       // e.g. "/laozhao"
      val courseName = hostName.substring(0, hostName.indexOf(".")) // subject = first host label
      val teacherName = path.substring(1)                          // drop the leading '/'
      (courseName, teacherName)
    }
    val subjectTeacherPairs: RDD[(String, String)] = text.map(parseRecord)
    print("Second disposition:")
    subjectTeacherPairs.foreach(print)

    // Pair every record with 1, then sum the counts per (subject, teacher).
    val ones: RDD[((String, String), Int)] = subjectTeacherPairs.map((_, 1))
    val counted: RDD[((String, String), Int)] = ones.reduceByKey(_ + _)
    print("Third disposition:")
    counted.foreach(print)
    // Group the counted pairs by subject name.
    val bySubject: RDD[(String, Iterable[((String, String), Int)])] = counted.groupBy(_._1._1)
    bySubject.foreach(println)
    // Per subject: sort by count, reverse to descending, keep the top 2.
    val finRes = bySubject.mapValues(_.toList.sortBy(_._2).reverse.take(2))
    finRes.foreach(print)
  }
}
import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.mutable

/**
  * Custom partitioner that gives each subject its own partition.
  * Extends Spark's Partitioner.
  *
  * @param subjects array of distinct subject (course) names
  */
class SelfPartition (subjects :Array[String]) extends Partitioner{
 // Build the subject -> partition-id rules dynamically from the given subjects
 // (no hard-coded subject names).
 val rules = new mutable.HashMap[String ,Int]()
 var i = 0
  for (sub <- subjects){
    rules += (sub -> i)  // fill the rules incrementally: subjects(n) -> partition n
    i+=1
  }

  // Number of partitions (a method override, not a variable). One extra
  // partition is reserved for keys that match no known subject.
  override def numPartitions: Int = {
    subjects.length + 1
  }

  // Map a key to its partition. Unknown subjects now go to the spare last
  // partition (subjects.length) instead of falling back to 0, which would
  // collide with subjects(0)'s partition and leave the reserved partition
  // permanently empty.
  override def getPartition(key: Any): Int ={
    val k = key.toString
    rules.getOrElse(k, subjects.length)
  }
}

/**
  * 1. Access records are URLs, held for now in records: Array[String].
  * 2. records is converted into an RDD (text).
  * 3. text is transformed via map / mapPartitions.
  * 4. Results are collected and printed to the console.
  * 5. Each subject is routed to its own partition via SelfPartition.
  */


object FavoriteTeacher{
  def main (args:Array[String]): Unit ={
    val conf = new SparkConf().setAppName("FavoriteTeacher").setMaster("local")
    val sc = new SparkContext(conf)

    // Raw access log.
    val records: Array[String] = Array("http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://java.xiaoniu.com/laozhang",
      "http://java.xiaoniu.com/laozhang",
      "http://python.xiaoniu.com/laoqian",
      "http://java.xiaoniu.com/laoli",
      "http://python.xiaoniu.com/laoli",
      "http://python.xiaoniu.com/laoli")
    val text: RDD[String] = sc.parallelize(records) // convert the array into an RDD
    print("First disposition:")
    text.collect().foreach(println)

    /*
      1. Parse one line and return a (subject, teacher) tuple.
     */
    def fun1(lines :String ): (String, String) = {
      val url = new URL(lines)        // parse the line as a URL
      val hostName = url.getHost      // host, e.g. "bigdata.xiaoniu.com"
      val path = url.getPath          // path, e.g. "/laozhao"
      val courseName = hostName.substring(0,hostName.indexOf(".")) // subject = first host label
      val teacherName = path.substring(1) // teacher name (strip the leading '/')
      (courseName,teacherName)
    }
    val res1: RDD[(String, String)] = text.map(fun1)
    print("Second disposition:")
    res1.foreach(print)

    // Pair every record with 1, and derive the distinct subject list for the partitioner.
    val res2: RDD[((String, String), Int)] = res1.map(x => (x,1))
    val subjects: Array[String] = res2.map(_._1._1).distinct().collect()

    // BUG FIX: printing the array directly shows a JVM reference such as
    // [Ljava.lang.String;@1a2b3c — render the contents instead.
    print("subjects = " + subjects.mkString(", "))

    val res3: RDD[((String, String), Int)] = res2.reduceByKey(_+_) // sum the counts per (subject, teacher)
    print("Third disposition:")
    res3.foreach(print)

    val selfPartition = new SelfPartition(subjects) // one partition per subject

    // Shuffle by the custom rule: re-key by subject only, value = (teacher, count).
    val res4: RDD[(String, (String, Int))] = res3.map(t => (t._1._1, (t._1._2,t._2))).partitionBy(selfPartition)

    /*
      1. A partition is exposed as an Iterator, so after toList we must convert back to an iterator.
     */
    // Each partition holds a single subject, so a per-partition sort yields the per-subject top 2.
    val result: RDD[(String, (String, Int))] = res4.mapPartitions(_.toList.sortBy(_._2._2).reverse.take(2).iterator)
    result.foreach(print)
  }
}