Scala example (1)
阿新 · Published: 2018-12-25
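The program below takes detection records (one MAC address plus a timestamp per row) and splits each device's records into visits, writing each visit out as an in_time/out_time pair. The field names mac and time come from the SQL in the listing; the epoch-second interpretation and the sample values below are assumptions for illustration. A quick way to check the file's shape from spark-shell (which already provides a SparkSession named spark):

// Hypothetical input lines (JSON Lines):
//   {"mac":"aa:bb:cc:dd:ee:ff","time":1545700000}
//   {"mac":"aa:bb:cc:dd:ee:ff","time":1545700120}
val probe = spark.read.json("/spark_data/spark_sql_data.json")
probe.printSchema()             // time is inferred as LongType, mac as StringType
probe.show(5, truncate = false)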
import scala.util.control.Breaks
import org.apache.spark.sql.SparkSession
object customer_extract {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("customer_extract")
.config("spark.some.config.option", "some-value")
.getOrCreate()
import spark.implicits._
val df = spark.read.json("/spark_data/spark_sql_data.json")
df.createOrReplaceTempView("data")
// Register the DataFrame as a local temporary view so it can be queried with SQL
// Get the MAC addresses of all users
val distinct_mac_DF = spark.sql("SELECT DISTINCT mac FROM data")
val mac_array = distinct_mac_DF.collect()
var i = 0
var result_string = ""
val outer = new Breaks
val inner = new Breaks
val every_visit = new Breaks
// Loop over every user (MAC address)
while(i < mac_array.length) {
outer.breakable {
val mac = mac_array(i).getString(0)
var sql = "SELECT `time` FROM data WHERE mac = '" + mac + "' ORDER BY `time` LIMIT 1"
// JSON integer fields are inferred as LongType, so read them as Long
val min_time = spark.sql(sql).collect()(0).getLong(0).toInt
sql = "SELECT `time` FROM data WHERE mac = '" + mac + "' ORDER BY `time` DESC LIMIT 1"
val max_time = spark.sql(sql).collect()(0).getLong(0).toInt
// First level of filtering: skip users that were detected only once
if (min_time == max_time) {
outer.break
}
var old_num = 0
var new_num = 0
var start_time = 0
var leave_time = 0
import scala.collection.mutable.ArrayBuffer
var result_array = new ArrayBuffer[Array[Int]]
var m = 0 // number of visits in the result set
var next_start_time = 0
var now_time = min_time
var flag_a = 1
// flag_a == 1 marks the first pass for this MAC
var flag_b = 1 // flag_b == 1 marks the start of each visit
/* Step from the earliest to the latest time in fixed intervals */
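/*
How the scan below decides visit boundaries: new_num counts this MAC's records
that are later than now_time. While the user is still present the count keeps
dropping between steps (old_num > new_num). When two consecutive steps see the
same count, no record fell inside the last 120-second step, so the current
visit is closed at leave_time = now_time - 60, and the first record after
leave_time becomes the start of the next visit.
*/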
inner.breakable {
while (now_time < (max_time + 300)) {
every_visit.breakable {
sql = "SELECT count(*) num from data where mac ='" + mac + "' and `time`>" + now_time + ""
new_num = (spark.sql(sql).collect()) (0).toString.toInt
if ((flag_a == 1) && (flag_b == 1)) {
old_num = new_num;
start_time = min_time;
flag_a = 0;
flag_b = 0;
} else if ((flag_a == 0) && (flag_b == 0)) {
if (old_num > new_num) {
old_num = new_num;
flag_a = 0;
flag_b = 0;
} else if (new_num == old_num) {
leave_time = now_time - 60;
// Append this visit to the result set
result_array += Array(start_time, leave_time)
// The first record after leave_time marks the start of the next visit
val sql13 = "SELECT `time` next_time FROM data WHERE mac = '" + mac + "' AND `time` > " + leave_time + " ORDER BY `time` LIMIT 1"
val next_rows = spark.sql(sql13).collect()
if (next_rows.isEmpty || next_rows(0).isNullAt(0)) {
inner.break
}
next_start_time = next_rows(0).getLong(0).toInt
now_time = next_start_time;
m = m + 1;
start_time = next_start_time;
leave_time = 0
flag_a = 0;
flag_b = 1;
next_start_time = 0;
every_visit.break;
}
} else if ((flag_a == 0) && (flag_b == 1)) {
old_num = new_num;
flag_a = 0;
flag_b = 0;
}
now_time = now_time + 120;
}
}
}
// Convert result_array to JSON lines and append them to result_string
for (visit <- result_array) {
val visit_string = s"""{"mac":"$mac","in_time":${visit(0)},"out_time":${visit(1)}}""" + "\n"
result_string = result_string + visit_string
}
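// Each appended line looks like this (values here are only illustrative):
//   {"mac":"aa:bb:cc:dd:ee:ff","in_time":1545700000,"out_time":1545700540}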
}
i = i + 1
}
// Write the result set to a file
import java.io._
val writer = new PrintWriter(new File("\\sparkdata\\visit_records.json"))
writer.write(result_string)
writer.close()
spark.stop()
}
}
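For reference, the same grouping into visits can usually be expressed without issuing a SQL query per device and per time step. The following is a minimal sketch, not the algorithm above: it assumes the same mac and time columns, treats any gap larger than 300 seconds between consecutive records of one device as the boundary between two visits (the threshold is an arbitrary choice for illustration), and writes to a placeholder output path.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object customer_extract_windows {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("customer_extract_windows").getOrCreate()
    val df = spark.read.json("/spark_data/spark_sql_data.json")

    // Timestamp of the previous record for the same device
    val byMacTime = Window.partitionBy("mac").orderBy("time")
    val withGap = df.withColumn("prev_time", lag("time", 1).over(byMacTime))

    // A new visit starts when there is no previous record or the gap exceeds 300 seconds
    val withVisitId = withGap
      .withColumn("new_visit",
        when(col("prev_time").isNull || col("time") - col("prev_time") > 300, 1).otherwise(0))
      .withColumn("visit_id", sum("new_visit").over(byMacTime))

    // One row per visit: first and last timestamp seen in that visit
    val visits = withVisitId
      .groupBy("mac", "visit_id")
      .agg(min("time").as("in_time"), max("time").as("out_time"))

    visits.write.mode("overwrite").json("/spark_data/visit_records_window") // placeholder output directory
    spark.stop()
  }
}

Because lag and the running sum are computed in a single pass over the partitioned data, this avoids the many small collect() round trips that the per-step loop above performs.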