
Scala Example, Part 1

import java.io.{File, PrintWriter}

import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks

import org.apache.spark.sql.SparkSession

object customer_extract {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("customer_extract")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    import spark.implicits._

    val df = spark.read.json("/spark_data/spark_sql_data.json")
    df.createOrReplaceTempView("data")

    // Collect every distinct user MAC address
    val mac_array = spark.sql("SELECT DISTINCT mac FROM data").collect()

    var i = 0
    var result_string = ""
    val outer = new Breaks
    val inner = new Breaks
    val every_visit = new Breaks

    // Loop over each user (MAC)
    while (i < mac_array.length) {
      outer.breakable {
        val mac = mac_array(i).getString(0)

        // Spark's JSON reader infers whole numbers as LongType, so read with getLong
        var sql = "SELECT `time` FROM data WHERE mac = '" + mac + "' ORDER BY `time` LIMIT 1"
        val min_time = spark.sql(sql).collect()(0).getLong(0).toInt
        sql = "SELECT `time` FROM data WHERE mac = '" + mac + "' ORDER BY `time` DESC LIMIT 1"
        val max_time = spark.sql(sql).collect()(0).getLong(0).toInt

        // First filter: skip users that were detected only once
        if (min_time == max_time) {
          outer.break
        }

        var old_num = 0
        var new_num = 0
        var start_time = 0
        var leave_time = 0
        val result_array = new ArrayBuffer[Array[Int]]
        var m = 0              // number of visits in the result set
        var next_start_time = 0
        var now_time = min_time
        var flag_a = 1         // 1 marks the first visit of a MAC
        var flag_b = 1         // 1 marks the start of a visit

        // Scan from the earliest to the latest timestamp in fixed steps
        inner.breakable {
          while (now_time < (max_time + 300)) {
            every_visit.breakable {
              sql = "SELECT count(*) num FROM data WHERE mac = '" + mac + "' AND `time` > " + now_time
              new_num = spark.sql(sql).collect()(0).getLong(0).toInt
              if (flag_a == 1 && flag_b == 1) {
                // First probe for this MAC
                old_num = new_num
                start_time = min_time
                flag_a = 0
                flag_b = 0
              } else if (flag_a == 0 && flag_b == 0) {
                if (old_num > new_num) {
                  // Count dropped, so detections occurred in this step: still inside the visit
                  old_num = new_num
                } else if (new_num == old_num) {
                  // No new detections since the last step: the visit has ended
                  leave_time = now_time - 60
                  result_array += Array(start_time, leave_time)
                  val sql13 = "SELECT `time` next_time FROM data WHERE mac = '" + mac +
                    "' AND `time` > " + leave_time + " ORDER BY `time` LIMIT 1"
                  val rows = spark.sql(sql13).collect()
                  // No later detections: this MAC is finished
                  if (rows.isEmpty || rows(0).isNullAt(0)) {
                    inner.break
                  }
                  next_start_time = rows(0).getLong(0).toInt
                  now_time = next_start_time
                  m = m + 1
                  start_time = next_start_time
                  leave_time = 0
                  flag_a = 0
                  flag_b = 1
                  next_start_time = 0
                  every_visit.break
                }
              } else if (flag_a == 0 && flag_b == 1) {
                // Start of a follow-up visit
                old_num = new_num
                flag_a = 0
                flag_b = 0
              }
              now_time = now_time + 120
            }
          }
        }

        // Serialize result_array as JSON lines and append them to result_string
        for (visit <- result_array) {
          val visit_string = """{"mac":"""" + mac + """","in_time":""" + visit(0) +
            ""","out_time":""" + visit(1) + "}\n"
          result_string = result_string + visit_string
        }
      }
      i = i + 1
    }

    // Write the result set to a file on the driver
    val writer = new PrintWriter(new File("\\sparkdata\\visit_records.json"))
    writer.write(result_string)
    writer.close()
  }
}
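A note on the query pattern: the loop above issues two "ORDER BY `time` LIMIT 1" queries per MAC just to find its first and last detection. A single aggregation can compute the bounds for every MAC at once. Below is a minimal sketch (the object name is hypothetical; the input path matches the example above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, min}

object mac_time_bounds {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("mac_time_bounds").getOrCreate()
    val df = spark.read.json("/spark_data/spark_sql_data.json")

    // One shuffle computes the earliest and latest detection for every MAC,
    // replacing the two per-MAC ORDER BY ... LIMIT 1 round trips
    val bounds = df.groupBy("mac")
      .agg(min("time").as("min_time"), max("time").as("max_time"))
      .filter(col("min_time") =!= col("max_time")) // same first filter: drop one-hit MACs

    bounds.show()
    spark.stop()
  }
}

The sessionization loop could then start from bounds.collect() instead of running a DISTINCT query followed by two lookups per MAC.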
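Likewise, the hand-built JSON strings and the PrintWriter at the end can be delegated to Spark's own JSON writer, which also escapes field values correctly. A minimal sketch, assuming the visits have already been gathered as (mac, in_time, out_time) triples; the sample row and output path are hypothetical:

import org.apache.spark.sql.SparkSession

object write_visits {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("write_visits").getOrCreate()
    import spark.implicits._

    // Hypothetical sample row standing in for the collected visit records
    val visits = Seq(("00:11:22:33:44:55", 1500000000, 1500000300))

    visits.toDF("mac", "in_time", "out_time")
      .write
      .json("/spark_data/visit_records") // Spark writes a directory of JSON part files

    spark.stop()
  }
}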