1. 程式人生 > 其它 >Spark Graphx:如何使用Pregel

Spark Graphx:如何使用Pregel

技術標籤:sparksparkgraphx

關於如何使用Pregel(分散式圖計算框架),這裡我們可以用一個小例題來說明。

需求說明

求從0到任意點的最短路徑(SSSP)
在這裡插入圖片描述

實現思路

初始化 Vertex 的 Message 為最大值
將源點(0)的 Message 設為 0
每步每個節點將自己目前的 Message 加上邊的權值傳送到相鄰節點,每個節點聚合出自身所有訊息的最小值
當某一步當中一個節點Message 值無變化,該節點停止迭代

具體實現

val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.
getName) .master("local[4]") .getOrCreate() val sc: SparkContext = spark.sparkContext //求從0到任意點的最短路徑 val vertices: RDD[(VertexId, Double)] = sc.makeRDD(Seq((0L, 1.0), (1L, 1.0), (2L, 1.0), (3L, 1.0))) val edges = sc.makeRDD(Seq(Edge(0L, 1L, 100), Edge(0L, 2L, 30), Edge(0L, 4L
, 10), Edge(2L, 1L, 60), Edge(2L, 3L, 60), Edge(3L, 1L, 10), Edge(4L, 3L, 50))) val graph = Graph(vertices, edges) val sourceId: VertexId = 0L val initGraph = graph.mapVertices((id, _) => if (id == sourceId) 0 else Double.PositiveInfinity) val sssp = initGraph.pregel(Double.PositiveInfinity)
( //接收資料處理函式 (id, dist, newDist) => math.min(dist, newDist), triplet => { //判斷是否繼續傳送下一個頂點 if (triplet.srcAttr + triplet.attr < triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) else Iterator.empty }, (dist1, dist2) => math.min(dist1, dist2) //合併訊息 ) println(sssp.vertices.collect().mkString("\n")) } //-----------------------------------------第二種------------------------------------------------- val sourceId: VertexId = 0L val initGraph = graph.mapVertices((id, _) => if (id == initialMsg) 0.0 else Double.PositiveInfinity) def vprog(id:VertexId,dist:Double,newDist:Double)={ math.min(dist,newDist) } def sendMsg(triplet: EdgeTriplet[Double,Int])={ if (triplet.srcAttr + triplet.attr < triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) else Iterator.empty } def mergeMsg(a:Double,b:Double)=math.min(a,b) initGraph.pregel(Double.PositiveInfinity)(vprog,sendMsg,mergeMsg) .vertices.repartition(1).foreach(x=>println(x.toString()))