Spark案例實戰之一
阿新 • • 發佈:2018-12-25
一.計算最受歡迎的老師
1.專案需求:現有某網路上的訪問日誌,現需要計算某一學科下被訪問次數最多的老師。
2.網路的url如右:http://bigdata.xiaoniu.com/laozhao
bigdata表示學科,laozhao表示教師。
3.程式碼如下:
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Computes the most popular teachers per subject from access-log URLs.
 *
 * Each URL has the form http://&lt;subject&gt;.xiaoniu.com/&lt;teacher&gt;: the first
 * host label is the subject and the path (minus the leading '/') is the
 * teacher's name. Hits are counted per (subject, teacher) pair, then the
 * top 3 teachers of every subject are printed.
 */
object PopularTeacher {
  def main(args: Array[String]): Unit = {
    // Sample access log; in production read from args(0) via sc.textFile.
    val words = Array(
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://java.xiaoniu.com/laozhang",
      "http://java.xiaoniu.com/laozhang",
      "http://python.xiaoniu.com/laoqian",
      "http://java.xiaoniu.com/laoli",
      "http://python.xiaoniu.com/laoli",
      "http://python.xiaoniu.com/laoli")
    val conf = new SparkConf().setAppName("Popular").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.parallelize(words)
    // Parse each URL once into (subject, teacher).
    // (Previously the URL was parsed twice per record and debug println
    // calls ran inside the transformation on the executors.)
    val subjectAndTeacher: RDD[(String, String)] = lines.map { line =>
      val url = new URL(line)
      val host = url.getHost
      val subject = host.substring(0, host.indexOf(".")) // first host label
      val teacher = url.getPath.substring(1)             // drop leading '/'
      (subject, teacher)
    }
    // Count hits per (subject, teacher) pair.
    val counted = subjectAndTeacher.map(x => (x, 1)).reduceByKey(_ + _)
    counted.foreach(println) // parallel execution: output order is unspecified
    // Group by subject, then keep the top 3 teachers of each subject,
    // highest count first.
    val topPerSubject: RDD[(String, List[((String, String), Int)])] =
      counted.groupBy(_._1._1).mapValues(_.toList.sortBy(-_._2).take(3))
    topPerSubject.foreach(println)
    sc.stop() // release the SparkContext (was missing before)
  }
}
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
/**
 * Custom partitioner that routes every subject to its own partition.
 *
 * @param subjects distinct subject names; one partition per subject
 */
class SelfPartition(subjects: Array[String]) extends Partitioner {
  // Map each subject to a partition id in [0, subjects.length).
  // BUG FIX: the previous hard-coded Map("bigdata" -> 1, "java" -> 2,
  // "python" -> 3) produced id 3 while numPartitions was subjects.length
  // (3), so getPartition could return an id outside [0, numPartitions)
  // and partitionBy would fail at shuffle time. Deriving the rules from
  // `subjects` keeps ids zero-based and always in range.
  val rules: Map[String, Int] = subjects.zipWithIndex.toMap

  /** Number of partitions: one per subject (a method, not a field). */
  override def numPartitions: Int = {
    subjects.length
  }

  /** Partition for `key`; unknown subjects fall back to partition 0. */
  override def getPartition(key: Any): Int = {
    val k = key.toString
    rules.getOrElse(k, 0)
  }
}
/**
 * Finds the favorite (most-visited) teachers per subject.
 *
 * 1. Holds the access records (URLs) in a local array.
 * 2. Turns the array into an RDD.
 * 3. Parses each URL into (course, teacher) and counts the pairs.
 * 4. Groups by subject and prints the top two teachers of each subject.
 */
object FavoriteTeacher {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FavoriteTeacher").setMaster("local")
    val sc = new SparkContext(conf)
    // Access log kept inline for the demo.
    val records: Array[String] = Array(
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://java.xiaoniu.com/laozhang",
      "http://java.xiaoniu.com/laozhang",
      "http://python.xiaoniu.com/laoqian",
      "http://java.xiaoniu.com/laoli",
      "http://python.xiaoniu.com/laoli",
      "http://python.xiaoniu.com/laoli")
    val text: RDD[String] = sc.parallelize(records) // to RDD
    print("First disposition:")
    text.collect().foreach(println)
    // Prints each raw URL, one per line.

    // Parse one log line into a (course, teacher) tuple.
    def fun1(lines: String): (String, String) = {
      val url = new URL(lines)
      val host = url.getHost
      val course = host.substring(0, host.indexOf("."))  // course name
      val teacher = url.getPath.substring(1)             // teacher name
      (course, teacher)
    }

    val res1: RDD[(String, String)] = text.map(fun1)
    print("Second disposition:")
    res1.foreach(print)
    // Prints tuples like (bigdata,laozhao)(java,laozhang)...

    // Tag every pair with 1, then sum per key.
    val res2: RDD[((String, String), Int)] = res1.map(pair => (pair, 1))
    val res3: RDD[((String, String), Int)] = res2.reduceByKey(_ + _)
    print("Third disposition:")
    res3.foreach(print)

    // Group counted pairs by their subject (local sort per subject follows).
    val res4: RDD[(String, Iterable[((String, String), Int)])] =
      res3.groupBy { case ((subject, _), _) => subject }
    res4.foreach(println)

    // Top two teachers per subject: sort values ascending, reverse, take 2.
    val finRes = res4.mapValues(_.toList.sortBy(_._2).reverse.take(2))
    finRes.foreach(print)
    // A custom partitioner could be plugged in here instead, e.g.:
    //   val selfPartition = new SelfPartition(subjects)
  }
}
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
/**
 * Custom partitioner that derives its routing rules from the given
 * subjects (extends Spark's Partitioner).
 *
 * @param subjects distinct subject names to route to their own partitions
 */
class SelfPartition(subjects: Array[String]) extends Partitioner {
  // Rule table: subject -> partition id in [0, subjects.length).
  // Built from zipWithIndex instead of the previous `var i` counter loop;
  // contents and member type (mutable.HashMap[String, Int]) are identical.
  val rules = mutable.HashMap(subjects.zipWithIndex: _*)

  // Number of partitions (a method, not a field).
  // NOTE(review): rules assign ids 0..length-1, so the extra partition
  // `length` stays empty and unknown keys collide with subjects(0) on
  // partition 0 — confirm whether unknown keys were meant to get the
  // spare partition instead.
  override def numPartitions: Int = {
    subjects.length + 1
  }

  /** Partition for `key`; unknown subjects fall back to partition 0. */
  override def getPartition(key: Any): Int = {
    val k = key.toString
    rules.getOrElse(k, 0)
  }
}
/**
 * Finds the favorite teachers per subject using a custom partitioner.
 *
 * 1. Holds the access records (URLs) in a local array.
 * 2. Turns the array into an RDD and parses each URL into (subject, teacher).
 * 3. Counts the pairs, then repartitions so each subject lands in its own
 *    partition via SelfPartition.
 * 4. Sorts inside every partition and prints the top two teachers.
 */
object FavoriteTeacher {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FavoriteTeacher").setMaster("local")
    val sc = new SparkContext(conf)
    // Sample access log.
    val records: Array[String] = Array(
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://bigdata.xiaoniu.com/laozhao",
      "http://java.xiaoniu.com/laozhang",
      "http://java.xiaoniu.com/laozhang",
      "http://python.xiaoniu.com/laoqian",
      "http://java.xiaoniu.com/laoli",
      "http://python.xiaoniu.com/laoli",
      "http://python.xiaoniu.com/laoli")
    val text: RDD[String] = sc.parallelize(records) // to RDD
    print("First disposition:")
    text.collect().foreach(println)
    // Prints each raw URL, one per line.

    // Parses one log line and returns a (courseName, teacherName) tuple.
    def fun1(lines: String): (String, String) = {
      val url = new URL(lines)                                  // parse URL
      val hostName = url.getHost                                // host part
      val path = url.getPath                                    // path part
      val courseName = hostName.substring(0, hostName.indexOf(".")) // course
      val teacherName = path.substring(1)                       // teacher
      (courseName, teacherName)
    }

    val res1: RDD[(String, String)] = text.map(fun1)
    print("Second disposition:")
    res1.foreach(print)
    // Prints tuples like (bigdata,laozhao)(java,laozhang)...

    val res2: RDD[((String, String), Int)] = res1.map(x => (x, 1)) // tag with 1
    // Distinct subjects drive the partitioner; the set is tiny, so
    // collecting to the driver is safe here.
    val subjects: Array[String] = res2.map(_._1._1).distinct().collect()
    // BUG FIX: "+ subjects" printed the array's toString
    // ("[Ljava.lang.String;@1a2b3c"); mkString shows the contents.
    print("subjects = " + subjects.mkString(", "))
    val res3: RDD[((String, String), Int)] = res2.reduceByKey(_ + _) // sum per key
    print("Third disposition:")
    res3.foreach(print)

    val selfPartition = new SelfPartition(subjects) // one partition per subject
    // Re-key by subject and shuffle with the custom partitioner.
    val res4: RDD[(String, (String, Int))] =
      res3.map(t => (t._1._1, (t._1._2, t._2))).partitionBy(selfPartition)
    // Each partition now holds at most one subject; sort locally and keep
    // the two highest counts. The partition is an Iterator, so after
    // toList/sort/take it must be turned back into an iterator.
    val result: RDD[(String, (String, Int))] =
      res4.mapPartitions(_.toList.sortBy(_._2._2).reverse.take(2).iterator)
    result.foreach(print)
    sc.stop() // release the SparkContext (was missing before)
  }
}
}