1. 程式人生 > 實用技巧 > Spark GraphX 入門之 IDEA 操作

Spark GraphX 入門之 IDEA 操作

參考資料：https://www.bookstack.cn/read/spark-graphx-source-analysis/vertex-edge-triple.md

一、基本操作

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

/**
 * Vertex attribute for the basic GraphX example: one user record.
 *
 * @param userid   numeric user id (also used as the vertex id)
 * @param username user name
 * @param age      user age
 */
case class Users(
    userid: Long,
    username: String,
    age: Int
)
/**
 * Basic GraphX example: loads a property graph from two comma-separated text files
 * and prints a short summary of it on the driver.
 *
 * Input formats, as required by the parsing below:
 *  - vertex file: one vertex per line, `id,username,age` (id parsed as Long, age as Int)
 *  - edge file:   one edge per line,   `srcId,dstId,relation`
 */
object MyGraphx01 {
  // Default inputs; identical to the paths the example originally hard-coded.
  private val DefaultEdgePath =
    "file:///D:/idea/ideaProjects/spark_projects/myspark_graphx/src/main/resources/graphx01/e.txt"
  private val DefaultVertexPath =
    "file:///D:/idea/ideaProjects/spark_projects/myspark_graphx/src/main/resources/graphx01/v.txt"

  /**
   * Builds the graph and prints its vertex/edge counts and triplets.
   *
   * @param args optional overrides: `args(0)` = edge file URI, `args(1)` = vertex file URI.
   *             Missing entries fall back to the default paths above.
   */
  def main(args: Array[String]): Unit = {
    val edgePath = args.lift(0).getOrElse(DefaultEdgePath)
    val vertexPath = args.lift(1).getOrElse(DefaultVertexPath)

    val spark = SparkSession.builder().appName("app").master("local[2]").getOrCreate()
    try {
      val sc = spark.sparkContext

      // All vertices as (id, Users(id, name, age)), e.g. (1L, Users(...)), (2L, Users(...)).
      val vectorSeq = sc.textFile(vertexPath)
        .filter(_.trim.nonEmpty) // a blank line would otherwise fail in toLong / indexing
        .map { line =>
          val fields = line.split(",").map(_.trim)
          val id = fields(0).toLong
          (id, Users(id, fields(1), fields(2).toInt))
        }

      // All edges as Edge(src, dst, relation), e.g. Edge(1L, 2L, "colleague").
      val edgeSeq = sc.textFile(edgePath)
        .filter(_.trim.nonEmpty)
        .map { line =>
          val fields = line.split(",").map(_.trim)
          Edge(fields(0).toLong, fields(1).toLong, fields(2))
        }

      // Assemble the property graph from the vertices and edges.
      val graph = Graph(vectorSeq, edgeSeq).cache()

      // RDD transformations are lazy: run actions so the graph is actually computed,
      // and collect() so the output appears on the driver rather than in executor logs.
      println(s"vertices=${graph.numVertices}, edges=${graph.numEdges}")
      graph.triplets.collect().foreach(println)
    } finally {
      // Always release the session, even if reading or parsing the input fails.
      spark.stop()
    }
  }
}

二、應用

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * GraphX application example on a small, hard-coded graph of people.
 *
 * Vertices are `(id, (name, age))`. Each edge `Edge(src, dst, n)` carries an Int weight,
 * which the queries below treat as the number of phone calls from `src` to `dst`.
 */
object MyLove {

  /** Age threshold (exclusive) for the "people older than 30" query. */
  private val MinAge = 30

  /** Weight threshold (exclusive) for the "more than 5 calls" queries. */
  private val MinCalls = 5

  /**
   * Builds the graph and prints several vertex, triplet and degree queries on the driver.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("app").getOrCreate()
    try {
      val sc = spark.sparkContext

      // All vertices: (id, (name, age)).
      val points = Seq(
        (1L, ("Alice", 28)), (2L, ("bob", 43)), (3L, ("charlie", 64)),
        (4L, ("David", 42)), (5L, ("Ed", 53)), (6L, ("Fran", 50)))
      // All edges: relationship (src -> dst) plus weight.
      val eds = Seq(
        Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(4L, 1L, 1), Edge(5L, 2L, 2),
        Edge(5L, 3L, 8), Edge(5L, 6L, 3), Edge(3L, 2L, 4), Edge(3L, 6L, 3))

      val edges = sc.makeRDD(eds)
      val vertices: RDD[(Long, (String, Int))] = sc.makeRDD(points)

      // Build the graph.
      val graph = Graph(vertices, edges)

      // Note: RDD.foreach(println) runs on the executors, so in cluster mode nothing
      // reaches the driver's console. The results here are tiny, so collect() first.

      // People older than 30.
      graph.vertices
        .filter { case (_, (_, age)) => age > MinAge }
        .collect()
        .foreach(println)

      // triplets expose both endpoints and the edge:
      //   srcAttr / dstAttr => vertex attributes, attr => edge weight.
      graph.triplets
        .collect()
        .foreach(t => println(s"${t.srcAttr}==>${t.dstAttr}==${t.attr}"))

      // "True love": more than 5 calls.
      graph.triplets
        .filter(_.attr > MinCalls)
        .collect()
        .foreach(t => println((t.srcAttr._1, t.dstAttr._1)))

      // inDegrees => VertexRDD[Int]: number of incoming edges per vertex id.
      graph.inDegrees.collect().foreach(println)
      println("====outdegrees====")
      graph.outDegrees.collect().foreach(println)

      // Print the counts; printing the degree RDDs directly would only show their toString.
      println(s"numVertices=${graph.numVertices}, numEdges=${graph.numEdges}")

      // Operator example: add 2 to every edge weight, then rerun the "more than 5 calls" query.
      graph.mapEdges(e => e.attr + 2).triplets
        .filter(_.attr > MinCalls)
        .collect()
        .foreach(t => println((t.srcAttr._1, t.dstAttr._1)))
    } finally {
      // Release the SparkSession even if a query fails.
      spark.stop()
    }
  }
}