GraphX實現N度關係
阿新 • • 發佈:2019-01-03
背景
本文給出了一個簡單的計算圖中每個點的N度關係點集合的演算法,也就是N跳關係。
之前通過官方文件學習和理解了一下GraphX的計算介面。
N度關係
目標:
在N輪裡,找到某一個點的N度關係的點集合。
實現思路:
1. 準備好邊資料集,即”1 3”, “4, 1” 這樣的點關係。使用GraphLoader 的介面load成Graph
2. 初始化每個Vertice的屬性為空Map
3. 使用aggregateMessages把VerticeID和totalRounds傳播出度點上,出度點把收集到的資訊合成一個大Map
4. 更新後的Vertice與原圖進行”Join”,更新圖中的變化過的點屬性
5. 重複步驟3和4,最後輸出更新了N輪之後的有關係的Vertice
spark-shell下可執行的程式碼:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val friendsGraph = GraphLoader.edgeListFile(sc, "data/friends.txt")
val totalRounds: Int = 3 // total N round
var targetVerticeID: Long = 6 // target vertice
// round one
var roundGraph = friendsGraph.mapVertices((id, vd) => Map())
var roundVertices = roundGraph.aggregateMessages[Map[Long, Integer]](
ctx => {
if (targetVerticeID == ctx.srcId) {
// only the edge has target vertice should send msg
ctx.sendToDst(Map(ctx.srcId -> totalRounds))
}
},
_ ++ _
)
for (i <- 2 to totalRounds) {
val thisRoundGraph = roundGraph.outerJoinVertices(roundVertices){ (vid, data, opt) => opt.getOrElse(Map[Long, Integer]()) }
roundVertices = thisRoundGraph.aggregateMessages[Map[Long, Integer]](
ctx => {
val iterator = ctx.srcAttr.iterator
while (iterator.hasNext) {
val (k, v) = iterator.next
if (v > 1) {
val newV = v - 1
ctx.sendToDst(Map(k -> newV))
ctx.srcAttr.updated(k, newV)
} else {
// do output and remove this entry
}
}
},
(newAttr, oldAttr) => {
if (oldAttr.contains(newAttr.head._1)) { // optimization to reduce msg
oldAttr.updated(newAttr.head._1, 1) // stop sending this ever
} else {
oldAttr ++ newAttr
}
}
)
}
val result = roundVertices.map(_._1).collect
資料和輸出
2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7
4 3
1 6
6 1
Array(6, 1, 3, 7)
總結
實現的比較naive,還有許多可以優化的地方。
全文完 :)