1. 程式人生 > 其它 >Spark GraphX-航班飛行網圖分析

Spark GraphX-航班飛行網圖分析

技術標籤:Sparksparkgraphx

文章目錄

1、如下圖所示的航班表,解決以下問題:

  • 統計航班飛行網圖中機場的數量
  • 統計航班飛行網圖中航線的數量
  • 計算最長的飛行航線(Point to Point)
  • 找出最繁忙的機場
  • 找出最重要的飛行航線(PageRank)
  • 找出最便宜的飛行航線(SSSP)
    在這裡插入圖片描述

2、思路如下:

  • 根據機場資訊和航線資訊,構建Graph
  • 直接使用Graph的numVertices、numEdges方法求出機場和航線的數量
  • 最長航線,即為點邊點三元檢視的資訊,直接對邊的屬性進行排序取第一個即可
  • 最繁忙機場,即為圖的入度和出度最高的節點inDegrees,outdegrees
  • 找出最重要的飛行航線,使用PageRank求出圖中每個節點的權重,並join上機場RDD排序即可
  • 找出最便宜的飛行航線,使用Pregel函式,找出最短距離即可

3、程式碼如下:

    val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
    val sc = spark.sparkContext
    val f = sc.textFile
("D:\\JavaProjects\\ClassStudy\\Scala\\sparkdemo\\files\\08filght\\USA Flight Datset - Spark Tutorial - Edureka.csv") //把表頭取出,然後過濾不等於表頭的,即可去掉表頭 val header = f.first() val flightsRDD = f.filter(!_.equals(header)).map(x => x.split(",")) //使用Spark GraphX完成下列任務 //探索航班飛行網圖資料
//構建航班飛行網圖 val airports = flightsRDD.flatMap(x => Array((x(5).toLong, x(6)), (x(7).toLong, x(8)))).distinct() val lines = flightsRDD.map(x => (x(5).toLong, x(7).toLong, x(16).toInt)).distinct.map(x => Edge(x._1, x._2, x._3)) val nowhere = "nowhere" val graph = Graph(airports, lines, nowhere) //統計航班飛行網圖中機場的數量 val numAirports = graph.numVertices //統計航班飛行網圖中航線的數量 val numLines = graph.numEdges //計算最長的飛行航線(Point to Point) val maxLine = graph.triplets.sortBy(_.attr, false).map(x => "出發點:%s,目的地:%s,距離:%d".format(x.srcAttr, x.dstAttr, x.attr)).take(1)(0) println(s"機場的數量:${numAirports},航線的數量:${numLines},最長的飛行航線:${maxLine}") //找出最繁忙的機場 val (apId1, indegrees) = graph.inDegrees.sortBy(_._2, false).take(1)(0) airports.filter { case (id, name) => id == apId1 }.collect { case (id, name) => "入度最高的機場為 %s ,入度為 %d".format(name, indegrees) }.foreach(println(_)) val (apId2, outdegrees) = graph.outDegrees.sortBy(_._2, false).take(1)(0) airports.filter { case (id, name) => id == apId2 }.collect { case (id, name) => "出度最高的機場為 %s ,出度為 %d".format(name, outdegrees) }.foreach(println(_)) //找出最重要的飛行航線(PageRank) graph.pageRank(0.005).vertices.join(airports) .sortBy(_._2._1, false).map(x => (x._2._2, x._2._1)).take(3) .foreach(println(_)) //找出最便宜的飛行航線(SSSP) //Pricing Model: price = 180.0 + distance * 0.15 val sourceId = 0L val initGrapg = graph.mapVertices((id, _) => if (id == sourceId) 0 else Double.PositiveInfinity) //Pregel傳入三個函式: // 1、定義接收訊息的處理方法 // 2、定義傳送訊息的處理方法 // 3、定義接收多條訊息的合併方法 val sssp = initGrapg.pregel(Double.PositiveInfinity)( (id, dist, newDist) => math.min(dist, newDist),triplet => { if (triplet.srcAttr + triplet.attr < triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) else Iterator.empty },(dist1, dist2) => math.min(dist1, dist2) ) sssp.triplets.sortBy(_.attr, true).take(1).foreach(x => println(s"最便宜價格為:${x.attr * 0.15 + 180.0},距離為:${x.attr}"))

4、程式碼如下:

在這裡插入圖片描述