Spark GraphX-航班飛行網圖分析
阿新 • • 發佈:2021-01-25
文章目錄
1、如下圖所示的航班表,解決以下問題:
- 統計航班飛行網圖中機場的數量
- 統計航班飛行網圖中航線的數量
- 計算最長的飛行航線(Point to Point)
- 找出最繁忙的機場
- 找出最重要的飛行航線(PageRank)
- 找出最便宜的飛行航線(SSSP)
2、思路如下:
- 根據機場資訊和航線資訊,構建Graph
- 直接使用Graph的numVertices、numEdges方法求出機場和航線的數量
- 最長航線,即為點邊點三元檢視的資訊,直接對邊的屬性進行排序取第一個即可
- 最繁忙機場,即為圖的入度和出度最高的節點inDegrees,outdegrees
- 找出最重要的飛行航線,使用PageRank求出圖中每個節點的權重,並join上機場RDD排序即可
- 找出最便宜的飛行航線,使用Pregel函式,找出最短距離即可
3、程式碼如下:
val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
val sc = spark.sparkContext
val f = sc.textFile ("D:\\JavaProjects\\ClassStudy\\Scala\\sparkdemo\\files\\08filght\\USA Flight Datset - Spark Tutorial - Edureka.csv")
//把表頭取出,然後過濾不等於表頭的,即可去掉表頭
val header = f.first()
val flightsRDD = f.filter(!_.equals(header)).map(x => x.split(","))
//使用Spark GraphX完成下列任務
//探索航班飛行網圖資料
//構建航班飛行網圖
val airports = flightsRDD.flatMap(x => Array((x(5).toLong, x(6)), (x(7).toLong, x(8)))).distinct()
val lines = flightsRDD.map(x => (x(5).toLong, x(7).toLong, x(16).toInt)).distinct.map(x => Edge(x._1, x._2, x._3))
val nowhere = "nowhere"
val graph = Graph(airports, lines, nowhere)
//統計航班飛行網圖中機場的數量
val numAirports = graph.numVertices
//統計航班飛行網圖中航線的數量
val numLines = graph.numEdges
//計算最長的飛行航線(Point to Point)
val maxLine = graph.triplets.sortBy(_.attr, false).map(x => "出發點:%s,目的地:%s,距離:%d".format(x.srcAttr, x.dstAttr, x.attr)).take(1)(0)
println(s"機場的數量:${numAirports},航線的數量:${numLines},最長的飛行航線:${maxLine}")
//找出最繁忙的機場
val (apId1, indegrees) = graph.inDegrees.sortBy(_._2, false).take(1)(0)
airports.filter {
case (id, name) => id == apId1
}.collect {
case (id, name) => "入度最高的機場為 %s ,入度為 %d".format(name, indegrees)
}.foreach(println(_))
val (apId2, outdegrees) = graph.outDegrees.sortBy(_._2, false).take(1)(0)
airports.filter {
case (id, name) => id == apId2
}.collect {
case (id, name) => "出度最高的機場為 %s ,出度為 %d".format(name, outdegrees)
}.foreach(println(_))
//找出最重要的飛行航線(PageRank)
graph.pageRank(0.005).vertices.join(airports)
.sortBy(_._2._1, false).map(x => (x._2._2, x._2._1)).take(3)
.foreach(println(_))
//找出最便宜的飛行航線(SSSP)
//Pricing Model: price = 180.0 + distance * 0.15
val sourceId = 0L
val initGrapg = graph.mapVertices((id, _) => if (id == sourceId) 0 else Double.PositiveInfinity)
//Pregel傳入三個函式:
// 1、定義接收訊息的處理方法
// 2、定義傳送訊息的處理方法
// 3、定義接收多條訊息的合併方法
val sssp = initGrapg.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist),triplet => {
if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
else
Iterator.empty
},(dist1, dist2) => math.min(dist1, dist2)
)
sssp.triplets.sortBy(_.attr, true).take(1).foreach(x => println(s"最便宜價格為:${x.attr * 0.15 + 180.0},距離為:${x.attr}"))