Spark GraphX 入門之 IDEA 操作
阿新 • • 發佈:2020-10-08
https://www.bookstack.cn/read/spark-graphx-source-analysis/vertex-edge-triple.md
一、基本操作
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession
/**
 * Attribute record attached to a graph vertex.
 *
 * @param userid   numeric identifier of the user
 * @param username name of the user
 * @param age      age of the user
 */
case class Users(
  userid: Long,
  username: String,
  age: Int
)
/**
 * Basic GraphX example: builds a property graph from two comma-separated text files.
 *
 * Vertex file lines have the form `id,username,age`; edge file lines have the form
 * `srcId,dstId,relationship`.
 *
 * The input locations default to the paths below. They can be overridden from the
 * command line: `args(0)` is the edge file, `args(1)` is the vertex file.
 */
object MyGraphx01 {
  private val DefaultEdgePath =
    "file:///D:/idea/ideaProjects/spark_projects/myspark_graphx/src/main/resources/graphx01/e.txt"
  private val DefaultVertexPath =
    "file:///D:/idea/ideaProjects/spark_projects/myspark_graphx/src/main/resources/graphx01/v.txt"

  /**
   * Entry point.
   *
   * @param args optional overrides: `args(0)` = edge file path, `args(1)` = vertex file path
   */
  def main(args: Array[String]): Unit = {
    val edgePath = args.lift(0).getOrElse(DefaultEdgePath)
    val vertexPath = args.lift(1).getOrElse(DefaultVertexPath)

    val spark = SparkSession.builder().appName("app").master("local[2]").getOrCreate()
    try {
      val sc = spark.sparkContext

      // Read the raw vertex and edge lines.
      val edgeLines = sc.textFile(edgePath)
      val vertexLines = sc.textFile(vertexPath)

      // Vertices: (id, Users) pairs, e.g. ((1L, users), (2L, users)).
      // Blank lines are skipped and numeric fields are trimmed so stray whitespace
      // does not cause a NumberFormatException.
      val vertices = vertexLines.filter(_.trim.nonEmpty).map { line =>
        val fields = line.split(",")
        val id = fields(0).trim.toLong
        (id, Users(id, fields(1), fields(2).trim.toInt))
      }

      // Edges: (srcId, dstId, relationship), e.g. ((1L, 2L, colleague), (2L, 3L, classmate)).
      val edges = edgeLines.filter(_.trim.nonEmpty).map { line =>
        val fields = line.split(",")
        Edge(fields(0).trim.toLong, fields(1).trim.toLong, fields(2))
      }

      // Combine vertices and edges into the graph structure.
      // No action is invoked on the graph here; to inspect it, run e.g.
      //   graph.triplets.map(_.toString).collect().foreach(println)
      val graph = Graph(vertices, edges).cache()
    } finally {
      // Always release the session, even when reading or parsing fails.
      spark.stop()
    }
  }
}
二、應用
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
/**
 * GraphX demo on a small hard-coded graph.
 *
 * Each vertex carries a `(name, age)` pair and each edge carries an `Int` weight
 * (the walkthrough treats the weight as a number of phone calls). The program prints:
 * a vertex filter by age, all triplets, the triplets whose weight exceeds 5,
 * in-/out-degrees, a summary line, and the weight filter again after adding 2 to every weight.
 */
object MyLove {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("app").getOrCreate()
    try {
      val sc = spark.sparkContext

      // All vertices: id -> (name, age).
      val points = Seq(
        (1L, ("Alice", 28)), (2L, ("bob", 43)), (3L, ("charlie", 64)),
        (4L, ("David", 42)), (5L, ("Ed", 53)), (6L, ("Fran", 50)))

      // All edges: (source, destination, weight).
      val eds = Seq(
        Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8),
        Edge(5L, 6L, 3), Edge(3L, 2L, 4), Edge(3L, 6L, 3))

      val edges = sc.makeRDD(eds)
      val vertices: RDD[(Long, (String, Int))] = sc.makeRDD(points)

      // Build the graph; it is reused by every action below, so keep it cached.
      val graph = Graph(vertices, edges).cache()

      // People aged 60 or older.
      graph.vertices
        .filter { case (_, (_, age)) => age >= 60 }
        .collect()
        .foreach(println)

      // triplets         => an edge together with the attributes of both endpoints
      // srcAttr / dstAttr => vertex attributes
      // attr             => edge weight
      // Triplets are mapped to plain values before being collected to the driver.
      graph.triplets
        .map(t => s"${t.srcAttr}==>${t.dstAttr}==${t.attr}")
        .collect()
        .foreach(println)

      // "True love": pairs with a weight (number of calls) greater than 5.
      graph.triplets
        .filter(_.attr > 5)
        .map(t => (t.srcAttr._1, t.dstAttr._1))
        .collect()
        .foreach(println)

      // inDegrees => (vertexId, number of incoming edges)
      val inDegrees = graph.inDegrees.collect().sortBy(_._1)
      inDegrees.foreach(println)

      println("====outdegrees====")
      val outDegrees = graph.outDegrees.collect().sortBy(_._1)
      outDegrees.foreach(println)

      // Print the degree data itself rather than the RDD objects' descriptions.
      val inText = inDegrees.mkString("[", ", ", "]")
      val outText = outDegrees.mkString("[", ", ", "]")
      println(s"(${graph.numVertices},${graph.numEdges},$inText,$outText)")

      // Operator example: add 2 to every edge weight, then apply the same filter.
      graph.mapEdges(e => e.attr + 2)
        .triplets
        .filter(_.attr > 5)
        .map(t => (t.srcAttr._1, t.dstAttr._1))
        .collect()
        .foreach(println)
    } finally {
      // Release the session even if one of the jobs above fails.
      spark.stop()
    }
  }
}