Spark專案練習(計算使用者停留時間最長的兩個小區)
阿新 • • 發佈:2018-12-25
專案說明:附件為要計算資料的demo。點選開啟連結
其中bs_log資料夾資料格式為(手機號,時間戳,基站ID,連線狀態(“1”為連線,“0”為斷開))
lac_info.txt 檔案資料格式為(基站ID,經度,緯度,訊號輻射型別)
程式思路:
1, 先根據"手機號,基站ID"構成一個元祖,做為唯一標識, 和時間戳構成新的資料結構->(手機號, 站點, 時間戳)
2、(手機號,基站ID)作為key,通過reduceByKey運算元進行聚合,計算出在基站的停留時間,構成新的資料結構,以便和座標資料進行join,->(基站ID,(手機號,停留時間))
3、將基站座標資料資訊通過map,構建成資料型別 ->(基站ID,(經度,緯度))
4、將2、3進行join操作,構成新的資料型別 ->(手機號,基站ID,停留時間,經度,緯度)
5、按手機號進行分組。->(手機號,(手機號,基站ID,停留時間,經度,緯度))
6、取出停留時間最長的兩個基站ID。
具體程式如下:
package cn.allengao.Location import org.apache.spark.{SparkConf, SparkContext} /** * class_name: * package: * describe: 基站資訊查詢 * creat_user: Allen Gao * creat_date: 2018/1/29 * creat_time: 10:03 **/ /* * 說明: * 1, 先根據"手機號,基站ID"構成一個元祖,做為唯一標識, 和時間戳構成新的資料結構->(手機號, 站點, 時間戳) * 2、(手機號,基站ID)作為key,通過reduceByKey運算元進行聚合,計算出在基站的停留時間,構成新的資料結構, * 以便和座標資料進行join,->(基站ID,(手機號,停留時間)) * 3、將基站座標資料資訊通過map,構建成資料型別 ->(基站ID,(經度,緯度)) * 4、將2、3進行join操作,構成新的資料型別 ->(手機號,基站ID,停留時間,經度,緯度) * 5、按手機號進行分組。->(手機號,(手機號,基站ID,停留時間,經度,緯度)) * 6、取出停留時間最長的兩個基站ID。 * */ object UserLocation { def main(args: Array[String]): Unit = { //建立Spark配置資訊 val conf = new SparkConf().setAppName("UserLocation").setMaster("local[*]") //建立Spark上下文,並將配置資訊匯入 val sc = new SparkContext(conf) /* 基站連線手機號,連線時間戳,基站站點ID資訊,“1”表示連線,“0”表示斷開連線。 18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1 */ //從log檔案拿到資料,並按行採集。 //sc.textFile("c://information//bs_log").map(_.split(",")).map(x => (x(0), x(1), x(2), x(3))) val rdd_Info = sc.textFile("j://information//bs_log").map(line => { //通過“,”將資料進行切分field(0)手機號,field(1)時間戳,field(2)基站ID資訊,field(3)事件型別 val fields = line.split(",") //事件型別,“1”表示連線,“0”表示斷開。 val eventType = fields(3) val time = fields(1) //連線基站將時間戳至為“-”,斷開基站將時間戳至為“+”,以便後面進行計算。 val timeLong = if(eventType == "1") -time.toLong else time.toLong //構成一個數據型別(手機號,基站ID資訊,帶符號的時間戳) ((fields(0),fields(2)),timeLong) }) val rdd_lacInfo = rdd_Info.reduceByKey(_+_).map(t=>{ val mobile = t._1._1 val lac = t._1._2 val time = t._2 (lac, (mobile, time)) }) val rdd_coordinate = sc.textFile("j://information//lac_info.txt").map(line =>{ val f = line.split(",") //(基站ID, (經度, 緯度)) (f(0),(f(1), f(2))) }) //rdd1.join(rdd2)-->(CC0710CC94ECC657A8561DE549D940E0,((18688888888,1300),(116.303955,40.041935))) val rdd_all = rdd_lacInfo.join(rdd_coordinate).map(t =>{ val lac = t._1 val mobile = t._2._1._1 val time = t._2._1._2 val x = t._2._2._1 val y = t._2._2._2 (mobile, lac, time, x, y) }) //按照手機號進行分組 val rdd_mobile = rdd_all.groupBy(_._1) //取出停留時間最長的前兩個基站 val rdd_topTwo= rdd_mobile.mapValues(it =>{ it.toList.sortBy(_._3).reverse.take(2) }) // println(rdd_Info.collect().toBuffer) // println(rdd_lacInfo.collect().toBuffer) // println(rdd_coordinate.collect().toBuffer) // println(rdd_all.collect().toBuffer) // println(rdd_mobile.collect().toBuffer) // println(rdd_topTwo.collect().toBuffer) rdd_topTwo.saveAsTextFile("j://information//out") sc.stop() } }