1. 程式人生 > 實用技巧 >spark Graphx 之 Pregel

spark Graphx 之 Pregel

Pregel是Google提出的用於大規模分散式圖計算框架

  1. 圖遍歷(BFS)
  2. 單源最短路徑(SSSP)
  3. PageRank計算

Pregel的計算由一系列迭代組成,稱為supersteps

Pregel迭代過程

  1. 每個頂點從上一個superstep接收入站訊息
  2. 計算頂點新的屬性值
  3. 在下一個superstep中向相鄰的頂點發送訊息
  4. 當沒有剩餘訊息時,迭代結束

應用一、計算單源最短路徑

求從0到任意點的最短路徑(SSSP)

import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

/** * 求從0到任意點的最短路徑 */ object PregelTest03 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("mytest") val sc = SparkContext.getOrCreate(conf) val vect = sc.parallelize(Array ((0L, ("Alice", 28)), (1L, ("Bob", 27)), (2L, ("Charlie", 65)), (
3L, ("David", 42)), (4L, ("Ed", 55)) )) val edges = sc.parallelize(Array( Edge(0L, 1L, 100), Edge(0L, 2L, 30), Edge(0L, 4L, 10), Edge(4L, 3L, 50), Edge(3L, 1L, 10), Edge(2L, 1L, 60), Edge(2L, 3L, 60) )) val graphx = Graph(vect,edges) // 設定起始頂點
val srcVectId = 0L // 用mapVertices修改屬性 val initialGraph = graphx.mapVertices({case (vid,(name,age))=>if (vid==srcVectId) 0.0 else Double.PositiveInfinity}) // 呼叫pregel val pregelGraph = initialGraph.pregel( Double.PositiveInfinity, //每個點的初始值,無窮大 Int.MaxValue, //最大迭代次數 EdgeDirection.Out //傳送資訊的方向 )( // 1. sendMsg條件+傳送什麼訊息 2.mergeMsg合併多條訊息 3.vprog頂點接受訊息 // 3) vprog:使用者定義函式,用於頂點接收訊息 //vprog (接受到的訊息和自己的訊息進行合併) (vid:VertexId,vd:Double,distMsg:Double)=>{ val minDist = math.min(vd,distMsg) println(s"頂點${vid},屬性${vd},收到訊息${distMsg},合併後的屬性${minDist}") minDist }, // 1) sendMsg:確定下一個迭代傳送的訊息及發往何處 //傳送訊息,如果自己的訊息+權重<目的地的訊息,則傳送 : Iterator[(VertexId, A)] (edgeTriplet:EdgeTriplet[Double,PartitionID])=>{ // 控制的意義:防止無意義的訊息傳送(無限發給無限)【詳見下圖】 if(edgeTriplet.srcAttr+edgeTriplet.attr < edgeTriplet.dstAttr){ println(s"頂點${edgeTriplet.srcId} 給頂點${edgeTriplet.dstId} 傳送訊息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")
      // Iterator 可以繼續迭代
      Iterator[(VertexId,Double)]((edgeTriplet.dstId,edgeTriplet.srcAttr
+edgeTriplet.attr)) }else{ Iterator.empty } // println(s"頂點${edgeTriplet.srcId} 給頂點${edgeTriplet.dstId} 傳送訊息 ${edgeTriplet.srcAttr + edgeTriplet.attr}") // Iterator[(VertexId,Double)]((edgeTriplet.dstId,edgeTriplet.srcAttr+edgeTriplet.attr)) }, // 2) mergeMsg:在vprog前,合併到達頂點的多個訊息 //多條接收訊息,mergeMessage,取小合併多條訊息 (msg1:Double,msg2:Double)=>math.min(msg1,msg2) ) println("===============") // 輸出結果 pregelGraph.triplets.foreach(println) println(pregelGraph.vertices.collect.mkString(",")) // 關閉資源 sc.stop() } }

應用二、求最小值