Spark GraphX: How to Use Pregel
阿新 • Published: 2021-01-25
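To show how to use Pregel (GraphX's distributed graph computation framework), we walk through a small example.
Problem statement
Compute the shortest-path distance from vertex 0 to every other vertex (single-source shortest path, SSSP).
Approach
Initialize every vertex's value (its current distance) to the maximum value, positive infinity
Set the value of the source vertex (0) to 0
In each superstep, every vertex adds the edge weight to its current value and sends the result along its outgoing edges, but only when that is smaller than the neighbor's current value; each vertex keeps the minimum of its own value and all messages it received
When a vertex's value does not change in a superstep, it sends no further messages; the computation ends once no messages remain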
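The four steps above can be traced without Spark. The following is a minimal, single-machine sketch (plain Scala, not GraphX code and not the author's implementation) that runs the same synchronous superstep loop over the article's edge list: in each round every edge proposes `dist(src) + weight`, proposals for the same vertex are merged with `min`, and the loop stops when no vertex receives a message. The names `PregelSsspSketch`, `sssp`, and `articleEdges` are hypothetical and exist only for this illustration.

```scala
// Hypothetical, Spark-free simulation of the Pregel SSSP loop described above.
object PregelSsspSketch {
  type VertexId = Long

  // Same directed, weighted edges as the article: (src, dst, weight)
  val articleEdges: Seq[(VertexId, VertexId, Int)] = Seq(
    (0L, 1L, 100), (0L, 2L, 30), (0L, 4L, 10), (2L, 1L, 60),
    (2L, 3L, 60), (3L, 1L, 10), (4L, 3L, 50))

  // Assumes every edge endpoint appears in `vertices`.
  def sssp(vertices: Seq[VertexId],
           edges: Seq[(VertexId, VertexId, Int)],
           source: VertexId): Map[VertexId, Double] = {
    // Steps 1 and 2: every vertex starts at +infinity except the source
    var dist: Map[VertexId, Double] =
      vertices.map(v => v -> (if (v == source) 0.0 else Double.PositiveInfinity)).toMap

    var changed = true
    while (changed) {
      // Step 3 (sendMsg): propose dist(src) + weight only when it improves dist(dst)
      val messages = for {
        (src, dst, w) <- edges
        candidate = dist(src) + w
        if candidate < dist(dst)
      } yield dst -> candidate

      // mergeMsg: keep the minimum message per destination vertex
      val merged: Map[VertexId, Double] =
        messages.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).min }

      // vprog: new value = min(old value, merged message)
      dist = dist.map { case (v, d) => v -> math.min(d, merged.getOrElse(v, d)) }

      // Step 4: stop once no vertex received a message this round
      changed = merged.nonEmpty
    }
    dist
  }

  def main(args: Array[String]): Unit = {
    val result = sssp(Seq(0L, 1L, 2L, 3L, 4L), articleEdges, source = 0L)
    result.toSeq.sortBy(_._1).foreach(println)
  }
}
```

On this graph three rounds deliver messages and the fourth produces none, leaving distances 0 → 0.0, 1 → 70.0, 2 → 30.0, 3 → 60.0, 4 → 10.0. Because each round reads only the previous round's `dist` map, the sketch mirrors Pregel's bulk-synchronous supersteps rather than updating distances in place.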
Implementation
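import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object PregelSssp {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getName)
      .master("local[4]")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext

    // Shortest path from vertex 0 to every other vertex.
    // The initial vertex values are placeholders; mapVertices below overwrites them.
    val vertices: RDD[(VertexId, Double)] =
      sc.makeRDD(Seq((0L, 1.0), (1L, 1.0), (2L, 1.0), (3L, 1.0), (4L, 1.0)))
    // Directed edges; the Int attribute is the edge weight
    val edges: RDD[Edge[Int]] = sc.makeRDD(Seq(
      Edge(0L, 1L, 100), Edge(0L, 2L, 30), Edge(0L, 4L, 10), Edge(2L, 1L, 60),
      Edge(2L, 3L, 60), Edge(3L, 1L, 10), Edge(4L, 3L, 50)))
    val graph: Graph[Double, Int] = Graph(vertices, edges)

    val sourceId: VertexId = 0L
    // The source starts at 0.0, every other vertex at +infinity
    val initGraph = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)

    val sssp = initGraph.pregel(Double.PositiveInfinity)(
      // vprog: process the received message, keep the smaller distance
      (id, dist, newDist) => math.min(dist, newDist),
      // sendMsg: decide whether to send a message on to the destination vertex
      triplet => {
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        else
          Iterator.empty
      },
      // mergeMsg: merge messages bound for the same vertex
      (dist1, dist2) => math.min(dist1, dist2)
    )
    // Expected distances: 0 -> 0.0, 1 -> 70.0, 2 -> 30.0, 3 -> 60.0, 4 -> 10.0 (print order may vary)
    println(sssp.vertices.collect().mkString("\n"))
    spark.stop()
  }
}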
//-----------------------------------------Second approach-----------------------------------------
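// Runs in its own method with `graph` from the first version in scope
// (sourceId and initGraph are redefined here)
import org.apache.spark.graphx.{EdgeTriplet, VertexId}

val sourceId: VertexId = 0L
val initGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

// vprog: keep the smaller of the current distance and the incoming message
def vprog(id: VertexId, dist: Double, newDist: Double): Double =
  math.min(dist, newDist)

// sendMsg: send srcAttr + weight to the destination only if it shortens its distance
def sendMsg(triplet: EdgeTriplet[Double, Int]): Iterator[(VertexId, Double)] = {
  if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
    Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
  else
    Iterator.empty
}

// mergeMsg: combine two messages for the same vertex
def mergeMsg(a: Double, b: Double): Double = math.min(a, b)

// collect() brings the results to the driver; foreach on the RDD itself would print on the executors
initGraph.pregel(Double.PositiveInfinity)(vprog, sendMsg, mergeMsg)
  .vertices.collect().foreach(println)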
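One practical benefit of the second version is that `vprog`, `sendMsg`, and `mergeMsg` are ordinary named functions, so their logic can be checked without starting Spark. The sketch below rests on an assumption: `TripletView` is a hypothetical stand-in for GraphX's `EdgeTriplet[Double, Int]` carrying only the fields `sendMsg` reads, and `SsspFunctions` is likewise a name made up for this illustration. It also checks that the result of `mergeMsg` does not depend on the order in which messages are merged, which matters because GraphX may combine messages in any order.

```scala
// TripletView is hypothetical: a minimal stand-in for GraphX's EdgeTriplet[Double, Int].
final case class TripletView(srcId: Long, dstId: Long, srcAttr: Double, dstAttr: Double, attr: Int)

object SsspFunctions {
  // vprog: keep the smaller of the current distance and the merged incoming message
  def vprog(id: Long, dist: Double, newDist: Double): Double = math.min(dist, newDist)

  // sendMsg: propose srcAttr + weight to the destination only if it is an improvement
  def sendMsg(t: TripletView): Iterator[(Long, Double)] =
    if (t.srcAttr + t.attr < t.dstAttr) Iterator((t.dstId, t.srcAttr + t.attr))
    else Iterator.empty

  // mergeMsg: combine two messages bound for the same vertex
  def mergeMsg(a: Double, b: Double): Double = math.min(a, b)

  def main(args: Array[String]): Unit = {
    // Edge 0 -> 2 (weight 30): source at 0.0, destination still unreached
    println(sendMsg(TripletView(0L, 2L, 0.0, Double.PositiveInfinity, 30)).toList)
    // Edge 0 -> 1 (weight 100) when vertex 1 already has distance 70.0: no message
    println(sendMsg(TripletView(0L, 1L, 0.0, 70.0, 100)).toList)

    // Merging in every possible order yields a single result
    val msgs = List(100.0, 90.0, 70.0)
    println(msgs.permutations.map(_.reduce(mergeMsg)).toSet)
  }
}
```

Keeping the functions free of Spark types (or thin wrappers around such pure helpers) is a design choice, not something GraphX requires; it simply lets ordinary unit tests cover the SSSP logic.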