Spark SQL一步步分析Wifi探針商業大資料案例
該專案主要實現的主要功能:
一是通過探針裝置採集可監測範圍內的手機MAC地址、與探針距離、時間、地理位置等資訊:
二是探針採集的資料可以定時傳送到服務端儲存:
三是利用大資料技術對資料進行人流量等指標的分析。最終以合理的方式展示資料處理結果。
資料收集
資料收集由伺服器和探針裝置共同完成,探針採集資料併發送到伺服器,伺服器接收探針裝置的資料,處理成定格式儲存至分
布式檔案系統(HDFS)中,供資料處理使用。 下面介紹探針採集資料的原理。
術語介紹:
STA: (station) 工作站,指手機或者電腦等連線WiFi的裝置。
AP: (AcessPoint)接入點,指無線路由器等產生WiFi熱點的裝置。
SSID: ( Service Set dentifer)服務集標識,就是WiFi的名字。
在無線領域中STA總是不斷試圖尋找周邊存在的AP,所以我們可以利用這種特性來發現一個未連線 AP的STA,而對於一個已經
連線到AP的STA,也可以通過截狹它發出的資料幀來獲取MAC、與探針之間的距離和它當前連線的SSID等資訊。
資料清洗:
探針上傳的資料是一種半結構化資料,如:真實的資料很大
id:嗅探裝置ID
mmac:嗅探器裝置自身Wifi MAC
rate:傳送頻率
wssid:嗅探器裝置連線的WiFi的MAC地址
time:時間戳,採集這些MAC的時間
lat:緯度 lon:經度
addr:地址
mac:採集到的手機的MAC地址
rssi:手機的訊號強度
range:手機距離嗅探裝置的距離
ts:目標ssid,手機連線的WiFi的ssid
tmc:手機連線的WiFi的地址
tc:是否與路由器連線
ds:手機是否睡眠
essidn:曾今連線的WiFi的SSID
該資料屬於半結構化資料,其中包含探針裝置ID,裝置自身WFIMAC,傳送頻率,裝置連線的WFi的SID裝置連線的WFI的
MAC地址、時間戳,採集到這些MAC的時間、緯度、經度、地址資訊,以及一組被探測到的裝置資訊, 裝置資訊包括
手機的MAC、訊號強度、與探針之間的距離、手機連線WiFi的SID手機連線的WFI的MAC地址、手機曾經連線過的WiFi的
SSID需要在清洗過程中去除所有無用的資料,使之變成結構化的檔案,到這裡資料清洗的第一步就完成了。
第二步使用Spark SQL完成,在這一一步中完成時間點到時間段的轉化,即在處理之前每一條記錄表示一個終端在某 一時間
點的狀態,而在結果中一條記錄表示一 個終端在一段時間內的狀態。
經過資料清洗,不僅大大減小了資料集的容量,也為後續的資料處理提供了極大的方便。
客流資料分析
我們將得到以下指標:
客流量:店鋪或區域整體客流及趨勢。
入店量:進入店鋪或區域的客流及趨勢。
入店率:進入店鋪或區域的客流佔全部客流的比例及趨勢。來訪週期:進入店鋪或區域的顧客距離上次來店的間隔。
新老顧客:一定時間段內首次/兩次以上進入店鋪的顧客。
顧客活躍度:按顧客距離上次來訪間隔劃分為不同活躍度(高活躍度、中活躍度、低活躍度、沉睡活躍度) .
駐店時長:進店鋪的顧客在店內的停留時長,
跳出率:進店鋪後很快離店的顧客及佔比(佔總體客流) .
深訪率:進店鋪深度訪問的顧客及佔比(佔總體客流,可以根據定位軌跡或者停留時長判定).
資料匯出
系統分析結果直接儲存為文字檔案,儲存在HDFS中。
分析結果最終會被匯入關係型資料庫,供後續生成圖表使用,該展示系統使用PHP做後臺,前端使用HTML和JS生成圖表。
功能設計:
HDFS中是原始資料集(data表(主要欄位:tanzhen_id mac(不同的mac代表不同客戶)time range),通過Spark SQL
得到)----------->visit表(取出同一mac所有資料,按照time遍歷,得到每一個使用者的每一次訪問記錄),
visit表主要欄位(MAC start_time leave_time stay_time)
思路:data表首先抽取每一個使用者(MAC)的資料,對每個使用者資料進行遍歷,得到每個使用者每一次的訪問記錄。
通過visit表得到:客流量、入店率,來訪週期,新老顧客,顧客活躍度等等。
指標說明
(1)店鋪外人流走勢/客流量/入店量/入店率/離店量
店鋪外人流/客流量,在實時接收探針資料過程中根據range欄位(範圍)以及資料條數實時得到。
入店量/離店量是對visit表分別按start_time、 leave_time欄位從小到大遍歷統計規定時間段內的記錄條數。
(2)跳出率/深訪率/駐店時長
對visit 表按time欄位從小到大遍歷統計規定時間段內記錄條數stay_time小於三分鐘和大於20分鐘的記錄條數以及
stay_time均值。
(3)新老顧客數/順客活躍度
對visit表按time欄位排序,按一定定時間段遍歷,新顧客收等於該時間段結束時刻之前所有的順客數減去該時間段開始時刻
之前所有的顧客數,老顧客數等於該時間段內顧客數減去新顧客數。
資料庫結構
專案的資料處理過程,處理過程中的每個少 驛都至少依賴張表,本節將介紹專案用到的資料庫結構,包括表名和每張表包舍的
字設名,原始資料表是資料接收伺服器最終儲存到HDFS中的資料,中向結果表是經過第2次收據清洗後的輸出結果,資料表
說明如下。
原始資料data 表主要欄位:
tanzhen_id:探針裝置的id,
mac:使用者裝置的MAC.
time:探測到當前裝置的時間,
range:該裝置與探什之間的距離。
中間結果vistor表主要欄位1
mac:標識不同使用者.
start_time:使用者入店時間,
leave_time:使用者離店時間。
stay_time;使用者停留時間。
上傳資料到HDFS:
因為是用python等語言處理過的,所以傳的資料格式為:只提取有用的資料:
資料清洗程式碼:命令列可以用來除錯看資料結構
timeArray有四個time是因為時間格式劃分為四塊。
完整原始碼:
package com.victor.spark.WiFiData
/**
* Company: Huazhong University of science and technology
* 華中科技大學電氣學院聚變與等離子體研究所
* Version: V1.0
* Author: Victor
* Contact: [email protected] 2018--2020
* Software: IntelliJ IDEA
* File: wifi
* Time: 2018/11/11 17:12
* Desc:
**/
package com.victor.spark.WifiProject
import org.apache.spark.sql.SparkSession
import scala.util.control.Breaks
object new_customer_extract {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("customer_extract")
.config("spark.some.config.option","some-values")
.getOrCreate()
import java.io._
val writer = new PrintWriter(new File("/spark/data/re.txt"))//for the storage of result
import spark.implicits._
//read the data
val df = spark.read.json("/spark/data/log.json")
//create the view data for df
df.createOrReplaceTempView("data")
spark.sql("cache table data")
//get all MAC of all users
val macArray = spark.sql("SELECT DISTINCT mac FROM data").collect()
var i =0
val inner = new Breaks
val lenth = macArray.length
//loop for each user
while(i<lenth){
var resultString = ""
var mac = macArray(i)(0)
val sql = "SELECT 'time' from data where mac='"+mac+"'order by 'time'"
val timeArray = spark.sql(sql).collect()
//to get timeList from timeArray
import scala.collection.mutable.ListBuffer
var timeList = new ListBuffer[Int]
var list_length = timeArray.length
var j = 0
while (j < list_length){
timeList += timeArray(i)(0).toString.toInt
j = j+1
}
var k = 0
var oldTime = 0
var newTime = 0
var maxVisitTimeInterval = 300
var startTime = 0
var leaveTime = 0
while (k < list_length){
if(k == 0){
oldTime = timeList(0)
newTime = timeList(0)
startTime = timeList(0)
}
else if(k == (list_length - 1)){
leaveTime = timeList(k)
var stayTime = leaveTime - startTime
resultString += """{"mac":"""" + mac + """,""" +""""in_time":"""+startTime+","+""""out_time":"""
+leaveTime+","+""""stay_Time":"""+stayTime+"}\n"
}else{
newTime = timeList(k)
if ((newTime - oldTime) > maxVisitTimeInterval){
leaveTime = oldTime
var stayTime = leaveTime-startTime
resultString += """{"mac":"""" + mac + """,""" +""""in_time":"""+startTime+","+""""out_time":"""
+leaveTime+","+""""stay_Time":"""+stayTime+"}\n"
startTime = newTime
oldTime = newTime
}else{
oldTime = newTime
}
}
k = k +1
}
writer.write(resultString)
i = i+1
}
writer.close()
spark.sql("uncache table data")
}
}
資料處理流程
在面我們完成了資料的初步處理,接下來我們將使用這些幾餘較小的資料計算以下指標:
客流量:店鋪或區域整體客流及趨勢。
入店量:進入店鋪或區域的客流及趨勢。
入店率:通俗一點講就是在單位時間內,從店鋪門口經過的客流量與進入店鋪內的客流量的比率。
來訪週期:進入店鋪或區域的顧客距離上次來店的問隔。
新老顧客:一定時間段內首次/兩次以上進入店鋪的顧客。
顧客活躍度:按顧客距離上次來訪間隔,劃分為不同活躍度(高活躍度、中活躍度、低活躍度、沉睡活躍度)。
駐店時長:進入店鋪的顧客在店內的停留時長。
跳出率:進入店鋪後很快離店的顧客及佔比(佔總體客流)。
深訪率:進入店鋪深度訪問的顧客及佔比(佔總體客流).
原始碼更新中。。。。。。。。。。