Spark Graphx 進行團伙的識別(community detection)
阿新 • • 發佈:2019-02-07
最近在使用Spark Graphx,拿Graphx做了點實驗。對大規模圖常見的分析方法有連通圖挖掘,團伙挖掘等。在金融科技領域,尤其風控領域,會有各種重要的關聯網路,並且這種網路圖十分龐大。 所以,Spark Graphx這種分散式計算框架十分適合這種場景。下面以裝置間關聯網路(節點數億級別)為例,採用Graphx做一個裝置團伙挖掘demo。團伙識別的演算法採用的是Graphx自帶的LabelPropagation演算法。
下面的是Graphx示例程式碼(僅僅是demo):
其中輸入檔案格式:
AB weight
備註(A,B 代表裝置id,String型別,weight:int,關聯代表權重)
因為Graphx節點型別只支援Long,不支援String,所以,需要進行相應的轉換,這裡用到的廣播變數進行idmap。
github連結: https://github.com/dylan-fan/spark_graphx_community_detection
package com.org.test import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.graphx._ import scala.collection.mutable.Set object DeviceCom { def main(args: Array[String]) { if (args.length < 3) { println("usage: spark-submit com.org.test.DeviceCom <input> <output> <iternum>") System.exit(1) } val conf = new SparkConf() conf.setAppName("DeviceCom-" + System.getenv("USER")) val sc = new SparkContext(conf) val input = args(0) val output = args(1) val iternum = args(2).toInt val vids = sc.textFile(input) .flatMap(line => line.split("\t").take(2)) .distinct .zipWithUniqueId() .map(x => (x._1, x._2.toLong)) val vids_map = sc.broadcast(vids.collectAsMap()) val vids_rdd = vids.map { case (username, userid) => (userid, username) } val raw_edge = sc.textFile(input) .map(line => line.split("\t")) val col = raw_edge.collect() val edges_rdd = sc.parallelize(col.map { case (x) => (vids_map.value(x(0)), vids_map.value(x(1))) }) val g = Graph.fromEdgeTuples(edges_rdd, 1) val lp = lib.LabelPropagation.run(g, iternum).vertices val LpByUsername = vids_rdd.join(lp).map { case (id, (username, label)) => (username, label) } LpByUsername.map(x => x._1 + "\t" + x._2).saveAsTextFile(output) sc.stop() } }
這裡,只是採用Graphx做個demo(很簡單啦),來測試Graphx在當前資料量級下的相關效能。實際裝置團伙挖掘會更復雜,涉及到各種策略制定。