Computing How Long a Phone Stays at Each Base Station with Spark RDDs
阿新 • Published: 2018-12-25
lac_info.txt
9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6
user.log
18611132889,20160327075000,9F36407EAD0629FC166F14DDE7970F68,1
18688888888,20160327075100,9F36407EAD0629FC166F14DDE7970F68,1
18611132889,20160327081000,9F36407EAD0629FC166F14DDE7970F68,0
18688888888,20160327081300,9F36407EAD0629FC166F14DDE7970F68,0
18688888888,20160327175000,9F36407EAD0629FC166F14DDE7970F68,1
18611132889,20160327182000,9F36407EAD0629FC166F14DDE7970F68,1
18688888888,20160327220000,9F36407EAD0629FC166F14DDE7970F68,0
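Using the logs generated when a user's phone connects to a cell tower (also called a base station), determine the user's home and work addresses. In other words, find the two locations where the user's phone stays the longest. There are two tables.
The access log generated by users (user.log) has the following format: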
Phone number | Event time (yyyyMMddHHmmss) | Base station ID | Event type (1 = entering the station, 0 = leaving the station) |
---|---|---|---|
18611132889 | 20160327075000 | 9F36407EAD0629FC166F14DDE7970F68 | 1 |
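
As a minimal sketch of how records in this format can be turned into a stay duration, the snippet below negates the time of every "enter" event and keeps the time of every "leave" event positive, so that summing per (phone, station) gives the total of leave minus enter. This is the same trick the Spark job further down relies on. Plain Scala collections stand in for RDDs so the sketch runs without Spark; `SignedTimeSketch`, `Event`, `parse`, and `dwell` are hypothetical names introduced only for illustration.

```scala
// Hypothetical helper names; plain Scala collections stand in for RDDs.
object SignedTimeSketch {
  // One parsed line of user.log.
  final case class Event(phone: String, time: Long, stationId: String, entering: Boolean)

  def parse(line: String): Event = {
    val f = line.split(",")
    Event(f(0), f(1).toLong, f(2), f(3) == "1")
  }

  // Entry times count as negative and exit times as positive, so the
  // per-key sum equals the total of (exit - entry) over all matched visits.
  def dwell(lines: Seq[String]): Map[(String, String), Long] =
    lines.map(parse)
      .map(e => ((e.phone, e.stationId), if (e.entering) -e.time else e.time))
      .groupBy(_._1)
      .map { case (key, values) => (key, values.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val station = "9F36407EAD0629FC166F14DDE7970F68"
    // The four sample records of phone 18688888888 from user.log.
    val lines = Seq(
      s"18688888888,20160327075100,$station,1",
      s"18688888888,20160327081300,$station,0",
      s"18688888888,20160327175000,$station,1",
      s"18688888888,20160327220000,$station,0"
    )
    println(dwell(lines)(("18688888888", station))) // prints 51200
  }
}
```

The sum is only meaningful when every "enter" event has a matching "leave" event. An unmatched entry leaves a large negative value for that key.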
The base station information table (lac_info.txt) has the following format:
Base station ID | Longitude | Latitude |
---|---|---|
9F36407EAD0629FC166F14DDE7970F68 | 116.304864 | 40.050645 |
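
A small sketch of how this table might be loaded and used, assuming the comma-separated layout of the sample file, where a fourth field follows the latitude and is ignored here. A plain `Map` lookup stands in for the RDD join, and records whose station is unknown are dropped, as an inner join would do. `StationSketch`, `parseStations`, and `locate` are hypothetical names.

```scala
// Hypothetical helper names; a Map lookup stands in for the RDD join.
object StationSketch {
  // station ID -> (longitude, latitude); any further fields are ignored.
  def parseStations(lines: Seq[String]): Map[String, (String, String)] =
    lines.map { line =>
      val f = line.split(",")
      (f(0), (f(1), f(2)))
    }.toMap

  // Attach coordinates to (phone, stationId, dwell) records.
  // Records with an unknown station ID are dropped, like an inner join.
  def locate(
      dwell: Seq[(String, String, Long)],
      stations: Map[String, (String, String)]
  ): Seq[(String, String, String, Long)] =
    dwell.flatMap { case (phone, stationId, time) =>
      stations.get(stationId).map { case (lon, lat) => (phone, lon, lat, time) }
    }
}
```

For example, `StationSketch.parseStations(Seq("9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6"))` maps the station ID to `("116.304864", "40.050645")`.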
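Implement this requirement with Spark RDDs. The result should look like this:
Phone number: location 1 (longitude, latitude) stay duration | location 2 (longitude, latitude) stay duration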
18611132889 :(116.296302,40.032296) 97500|(116.304864,40.050645) 54000|
18688888888 :(116.296302,40.032296) 87600|(116.304864,40.050645) 51200|
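
Note that these durations are differences of raw yyyyMMddHHmmss values read as integers, not elapsed seconds: a stay from 07:51:00 to 08:13:00 contributes 6200 rather than 1320 seconds. That is enough for ranking in many cases, but it can distort comparisons. If real elapsed time is wanted, one option (an assumption, not part of the original solution) is to convert each timestamp to epoch seconds before applying the sign. The sketch below uses `java.time`; `ElapsedSecondsSketch`, `toSeconds`, and `signedSeconds` are hypothetical names.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper names; only java.time from the JDK is used.
object ElapsedSecondsSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")

  // Epoch seconds of a yyyyMMddHHmmss timestamp. UTC is an arbitrary but
  // fixed choice: only differences between timestamps matter here.
  def toSeconds(ts: String): Long =
    LocalDateTime.parse(ts, fmt).toEpochSecond(ZoneOffset.UTC)

  // Same sign convention as the log: "1" (enter) is negative, otherwise positive.
  def signedSeconds(ts: String, eventType: String): Long =
    if (eventType == "1") -toSeconds(ts) else toSeconds(ts)

  def main(args: Array[String]): Unit = {
    // The four sample events of phone 18688888888 at one station.
    val events = Seq(
      ("20160327075100", "1"),
      ("20160327081300", "0"),
      ("20160327175000", "1"),
      ("20160327220000", "0")
    )
    val total = events.map { case (ts, tpe) => signedSeconds(ts, tpe) }.sum
    println(total) // prints 16320 (22 minutes + 4 hours 10 minutes, in seconds)
  }
}
```

In the Spark job, `signedSeconds(fields(1), fields(3))` could take the place of the signed `toLong` value. The numbers in the expected output would then change accordingly.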
The implementation is as follows:
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object demo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("month demo").setMaster("local[*]")
        val sc = new SparkContext(conf)

        val rdd_Info = sc.textFile("d://user.log").map(line => {
          val fields = line.split(",")
          // Event type: "1" = entering the station, "0" = leaving it
          val eventType = fields(3)
          // Timestamp in yyyyMMddHHmmss form
          val time = fields(1)
          // Negate entry times so that the per-key sum equals (leave - enter).
          // The result is a difference of raw timestamp values, not elapsed seconds.
          val timeLong = if (eventType == "1") -time.toLong else time.toLong
          // ((phone number, station ID), signed timestamp)
          ((fields(0), fields(2)), timeLong)
        })

        // Total stay per (phone number, station ID), re-keyed by station ID for the join
        val rdd_lacInfo = rdd_Info.reduceByKey(_ + _).map(t => {
          val mobile = t._1._1 // phone number
          val lac = t._1._2    // station ID
          val time = t._2      // stay duration
          (lac, (mobile, time))
        })

        val rdd_coordinate = sc.textFile("d://lac_info.txt").map(line => {
          val f = line.split(",")
          // (station ID, (longitude, latitude))
          (f(0), (f(1), f(2)))
        })

        val rdd_all = rdd_lacInfo.join(rdd_coordinate).map(t => {
          val lac = t._1
          val mobile = t._2._1._1
          val time = t._2._1._2
          val x = t._2._2._1 // longitude
          val y = t._2._2._2 // latitude
          (mobile, lac, time, x, y)
        })

        // Group by phone number
        val rdd_mobile = rdd_all.groupBy(_._1)

        // Keep the two stations with the longest stay for each phone
        val rdd_topTwo: RDD[(String, List[(String, String, Long, String, String)])] =
          rdd_mobile.mapValues(it => {
            it.toList.sortBy(_._3).reverse.take(2)
          })

        // Assumes every phone has records for at least two stations.
        val re = rdd_topTwo.map(t => {
          val first = t._2(0)
          val second = t._2(1)
          // Each station reports its own coordinates; the original reused the
          // first station's coordinates for the second entry.
          (t._1, (first._4, first._5, first._3), (second._4, second._5, second._3))
        })

        re.foreach(println)
        sc.stop()
      }
    }

    // Output
    // (18611132889,(116.296302,40.032296,97500),(116.304864,40.050645,54000))
    // (18688888888,(116.296302,40.032296,87600),(116.304864,40.050645,51200))
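
The top-two step above indexes the second list element directly, which fails for a phone that was seen at only one station. A hedged alternative is to keep whatever `take(2)` returns and emit a list per phone. The sketch below shows that selection with plain Scala collections; `TopTwoSketch` and `topTwo` are hypothetical names, and the station IDs in the usage note are made up for illustration.

```scala
// Hypothetical helper names; plain Scala collections stand in for RDDs.
object TopTwoSketch {
  // Input records: (phone, stationId, dwell).
  // Output: for each phone, up to two (stationId, dwell) pairs, longest first.
  def topTwo(records: Seq[(String, String, Long)]): Map[String, List[(String, Long)]] =
    records.groupBy(_._1).map { case (phone, rs) =>
      val best = rs.sortBy(r => -r._3).take(2).map(r => (r._2, r._3)).toList
      (phone, best)
    }
}
```

For example, `TopTwoSketch.topTwo(Seq(("p1", "A", 10L), ("p1", "B", 30L), ("p1", "C", 20L), ("p2", "A", 5L)))` yields `List(("B", 30), ("C", 20))` for `p1` and the single entry `List(("A", 5))` for `p2`, instead of throwing.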