Spark日誌分析專案Demo(4)--RDD使用,使用者行為統計分析
先說說需求,日誌挖掘
(1)隨機抽取100個session,統計時長(session時間),步長(session訪問頁面個數)
(2)統計top10熱門品類
(3)統計top10熱門品類上的top10使用者
下面介紹通過日誌分析使用者行為流程
(1)某個J2EE專案在接收使用者建立任務的請求之後,會將任務資訊插入MySQL的task表中,任務引數以JSON格式封裝在task_param 欄位中。這是專案前提,不是本專案的內容。
接著J2EE平臺會執行我們的spark-submit shell指令碼,並將taskid作為引數傳遞給spark-submit shell指令碼. spark-submit shell指令碼在執行時,是可以接收引數的,並且會將接收的引數,傳遞給Spark作業的main函式 引數就封裝在main函式的args陣列中
使用者可能指定的條件如下:
* 1、時間範圍:起始日期~結束日期
* 2、性別:男或女
* 3、年齡範圍
* 4、職業:多選
* 5、城市:多選
* 6、搜尋詞:多個搜尋詞,只要某個session中的任何一個action搜尋過指定的關鍵詞,那麼session就符合條件
* 7、點選品類:多個品類,只要某個session中的任何一個action點選過某個品類,那麼session就符合條件
根據main函式獲得的引數,程式從資料庫裡獲得查詢引數。
(2)首先要從user_visit_action記憶體臨時表中,查詢出來指定日期範圍內的行為資料。
(3)上面查詢出的資料進行map操作,以sessionid為key
(4)將上面的資料進行session粒度的資料聚合,session粒度的資料 與使用者資訊資料進行join,就可以獲取到session粒度的資料+session對應的user的資訊
(5)按照使用者在j2ee 平臺指定的篩選引數進行資料過濾,生成公共的RDD:就是通過篩選條件的session的訪問明細資料。在過濾的時候,對每個session的訪問時長(訪問開始到結束時間)和訪問步長(訪問頁面的次數),進行計算。
(6)抽取100個session,算出平均每天多少session抽取。統計每個小時佔當天session的比例。根據前兩個數值確定隨機抽取的session。根據隨機抽取的session,統計訪問時長和訪問步長的情況,寫入資料庫
(7)在(5)的結果上獲取top10熱門品類
(8)在(7)的結果上獲取top10熱門品類上,每個品類的Top10活躍使用者
下面結合程式碼說說每一步的實現:
(1)
// 建立需要使用的DAO元件
ITaskDAO taskDAO = DAOFactory.getTaskDAO();
// 首先得查詢出來指定的任務,並獲取任務的查詢引數
long taskid = ParamUtils.getTaskIdFromArgs(args, Constants.SPARK_LOCAL_TASKID_SESSION);
Task task = taskDAO.findById(taskid);
if(task == null) {
System.out.println(new Date() + ": cannot find this task with id [" + taskid + "].");
return;
}
JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());
上面的ITaskDAO,Task,ParamUtils都是專案自己實現的工具類,目的是為了查詢引數。
(2)通過SQL過濾資料
JavaRDD<Row> actionRDD = SparkUtils.getActionRDDByDateRange(sqlContext, taskParam);
/**
* 獲取指定日期範圍內的使用者行為資料RDD
* @param sqlContext
* @param taskParam
* @return
*/
public static JavaRDD<Row> getActionRDDByDateRange(
SQLContext sqlContext, JSONObject taskParam) {
String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE);
String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE);
String sql =
"select * "
+ "from user_visit_action "
+ "where date>='" + startDate + "' "
+ "and date<='" + endDate + "'";
DataFrame actionDF = sqlContext.sql(sql);
return actionDF.javaRDD();
}
(3)下面實現map操作,以sessionid為key
JavaPairRDD<String, Row> sessionid2actionRDD = getSessionid2ActionRDD(actionRDD);
sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());
/**
* 獲取sessionid2到訪問行為資料的對映的RDD
* @param actionRDD
* @return
*/
public static JavaPairRDD<String, Row> getSessionid2ActionRDD(JavaRDD<Row> actionRDD) {
return actionRDD.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Row>, String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<Tuple2<String, Row>> call(Iterator<Row> iterator)
throws Exception {
List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
while(iterator.hasNext()) {
Row row = iterator.next();
list.add(new Tuple2<String, Row>(row.getString(2), row));
}
return list;
}
});
}
(4)把相同sessionId key的做內容聚合後,與使用者資訊資料進行join。
JavaPairRDD<String, String> sessionid2AggrInfoRDD =
aggregateBySession(sc, sqlContext, sessionid2actionRDD);
/**
* 對行為資料按session粒度進行聚合
* @param actionRDD 行為資料RDD
* @return session粒度聚合資料
*/
private static JavaPairRDD<String, String> aggregateBySession(
JavaSparkContext sc,
SQLContext sqlContext,
JavaPairRDD<String, Row> sessinoid2actionRDD) {
// 對行為資料按session粒度進行分組
JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD =
sessinoid2actionRDD.groupByKey();
// 對每一個session分組進行聚合,將session中所有的搜尋詞和點選品類都聚合起來
// 到此為止,獲取的資料格式,如下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>
JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair(
new PairFunction<Tuple2<String,Iterable<Row>>, Long, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple)
throws Exception {
String sessionid = tuple._1;
Iterator<Row> iterator = tuple._2.iterator();
StringBuffer searchKeywordsBuffer = new StringBuffer("");
StringBuffer clickCategoryIdsBuffer = new StringBuffer("");
Long userid = null;
// session的起始和結束時間
Date startTime = null;
Date endTime = null;
// session的訪問步長
int stepLength = 0;
// 遍歷session所有的訪問行為
while(iterator.hasNext()) {
// 提取每個訪問行為的搜尋詞欄位和點選品類欄位
Row row = iterator.next();
if(userid == null) {
userid = row.getLong(1);
}
String searchKeyword = row.getString(5);
Long clickCategoryId = row.getLong(6);
// 實際上這裡要對資料說明一下
// 並不是每一行訪問行為都有searchKeyword何clickCategoryId兩個欄位的
// 其實,只有搜尋行為,是有searchKeyword欄位的
// 只有點選品類的行為,是有clickCategoryId欄位的
// 所以,任何一行行為資料,都不可能兩個欄位都有,所以資料是可能出現null值的
// 我們決定是否將搜尋詞或點選品類id拼接到字串中去
// 首先要滿足:不能是null值
// 其次,之前的字串中還沒有搜尋詞或者點選品類id
if(StringUtils.isNotEmpty(searchKeyword)) {
if(!searchKeywordsBuffer.toString().contains(searchKeyword)) {
searchKeywordsBuffer.append(searchKeyword + ",");
}
}
if(clickCategoryId != null) {
if(!clickCategoryIdsBuffer.toString().contains(
String.valueOf(clickCategoryId))) {
clickCategoryIdsBuffer.append(clickCategoryId + ",");
}
}
// 計算session開始和結束時間
Date actionTime = DateUtils.parseTime(row.getString(4));
if(startTime == null) {
startTime = actionTime;
}
if(endTime == null) {
endTime = actionTime;
}
if(actionTime.before(startTime)) {
startTime = actionTime;
}
if(actionTime.after(endTime)) {
endTime = actionTime;
}
// 計算session訪問步長
stepLength++;
}
String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString());
String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString());
// 計算session訪問時長(秒)
long visitLength = (endTime.getTime() - startTime.getTime()) / 1000;
// 大家思考一下
// 我們返回的資料格式,即使<sessionid,partAggrInfo>
// 但是,這一步聚合完了以後,其實,我們是還需要將每一行資料,跟對應的使用者資訊進行聚合
// 問題就來了,如果是跟使用者資訊進行聚合的話,那麼key,就不應該是sessionid
// 就應該是userid,才能夠跟<userid,Row>格式的使用者資訊進行聚合
// 如果我們這裡直接返回<sessionid,partAggrInfo>,還得再做一次mapToPair運算元
// 將RDD對映成<userid,partAggrInfo>的格式,那麼就多此一舉
// 所以,我們這裡其實可以直接,返回的資料格式,就是<userid,partAggrInfo>
// 然後跟使用者資訊join的時候,將partAggrInfo關聯上userInfo
// 然後再直接將返回的Tuple的key設定成sessionid
// 最後的資料格式,還是<sessionid,fullAggrInfo>
// 聚合資料,用什麼樣的格式進行拼接?
// 我們這裡統一定義,使用key=value|key=value
String partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|"
+ Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|"
+ Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|"
+ Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|"
+ Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|"
+ Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime);
return new Tuple2<Long, String>(userid, partAggrInfo);
}
});
// 查詢所有使用者資料,並對映成<userid,Row>的格式
String sql = "select * from user_info";
JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD();
JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair(
new PairFunction<Row, Long, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Row> call(Row row) throws Exception {
return new Tuple2<Long, Row>(row.getLong(0), row);
}
});
/**
* 這裡就可以說一下,比較適合採用reduce join轉換為map join的方式
*
* userid2PartAggrInfoRDD,可能資料量還是比較大,比如,可能在1千萬資料
* userid2InfoRDD,可能資料量還是比較小的,你的使用者數量才10萬用戶
*
*/
// 將session粒度聚合資料,與使用者資訊進行join
JavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD =
userid2PartAggrInfoRDD.join(userid2InfoRDD);
// 對join起來的資料進行拼接,並且返回<sessionid,fullAggrInfo>格式的資料
JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair(
new PairFunction<Tuple2<Long,Tuple2<String,Row>>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(
Tuple2<Long, Tuple2<String, Row>> tuple)
throws Exception {
String partAggrInfo = tuple._2._1;
Row userInfoRow = tuple._2._2;
String sessionid = StringUtils.getFieldFromConcatString(
partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);
int age = userInfoRow.getInt(3);
String professional = userInfoRow.getString(4);
String city = userInfoRow.getString(5);
String sex = userInfoRow.getString(6);
String fullAggrInfo = partAggrInfo + "|"
+ Constants.FIELD_AGE + "=" + age + "|"
+ Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
+ Constants.FIELD_CITY + "=" + city + "|"
+ Constants.FIELD_SEX + "=" + sex;
return new Tuple2<String, String>(sessionid, fullAggrInfo);
}
});
return sessionid2FullAggrInfoRDD;
}
(5)同時進行過濾和統計
//下面這個Spark自定義累加器是用來統計的,可以通過add方法在累加器上進行累加操作
Accumulator<String> sessionAggrStatAccumulator = sc.accumulator(
"", new SessionAggrStatAccumulator());
JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = filterSessionAndAggrStat(
sessionid2AggrInfoRDD, taskParam, sessionAggrStatAccumulator);
filteredSessionid2AggrInfoRDD = filteredSessionid2AggrInfoRDD.persist(StorageLevel.MEMORY_ONLY());
/**
* 過濾session資料,並進行聚合統計
* @param sessionid2AggrInfoRDD
* @return
*/
private static JavaPairRDD<String, String> filterSessionAndAggrStat(
JavaPairRDD<String, String> sessionid2AggrInfoRDD,
final JSONObject taskParam,
final Accumulator<String> sessionAggrStatAccumulator) {
// 為了使用我們後面的ValieUtils,所以,首先將所有的篩選引數拼接成一個連線串
// 此外,這裡其實大家不要覺得是多此一舉
// 其實我們是給後面的效能優化埋下了一個伏筆
String startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE);
String endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);
String professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS);
String cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES);
String sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX);
String keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS);
String categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS);
String _parameter = (startAge != null ? Constants.PARAM_START_AGE + "=" + startAge + "|" : "")
+ (endAge != null ? Constants.PARAM_END_AGE + "=" + endAge + "|" : "")
+ (professionals != null ? Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" : "")
+ (cities != null ? Constants.PARAM_CITIES + "=" + cities + "|" : "")
+ (sex != null ? Constants.PARAM_SEX + "=" + sex + "|" : "")
+ (keywords != null ? Constants.PARAM_KEYWORDS + "=" + keywords + "|" : "")
+ (categoryIds != null ? Constants.PARAM_CATEGORY_IDS + "=" + categoryIds: "");
if(_parameter.endsWith("\\|")) {
_parameter = _parameter.substring(0, _parameter.length() - 1);
}
final String parameter = _parameter;
// 根據篩選引數進行過濾
JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter(
new Function<Tuple2<String,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<String, String> tuple) throws Exception {
// 首先,從tuple中,獲取聚合資料
String aggrInfo = tuple._2;
// 接著,依次按照篩選條件進行過濾
// 按照年齡範圍進行過濾(startAge、endAge)
if(!ValidUtils.between(aggrInfo, Constants.FIELD_AGE,
parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) {
return false;
}
// 按照職業範圍進行過濾(professionals)
// 網際網路,IT,軟體
// 網際網路
if(!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL,
parameter, Constants.PARAM_PROFESSIONALS)) {
return false;
}
// 按照城市範圍進行過濾(cities)
// 北京,上海,廣州,深圳
// 成都
if(!ValidUtils.in(aggrInfo, Constants.FIELD_CITY,
parameter, Constants.PARAM_CITIES)) {
return false;
}
// 按照性別進行過濾
// 男/女
// 男,女
if(!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX,
parameter, Constants.PARAM_SEX)) {
return false;
}
// 按照搜尋詞進行過濾
// 我們的session可能搜尋了 火鍋,蛋糕,燒烤
// 我們的篩選條件可能是 火鍋,串串香,iphone手機
// 那麼,in這個校驗方法,主要判定session搜尋的詞中,有任何一個,與篩選條件中
// 任何一個搜尋詞相當,即通過
if(!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS,
parameter, Constants.PARAM_KEYWORDS)) {
return false;
}
// 按照點選品類id進行過濾
if(!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS,
parameter, Constants.PARAM_CATEGORY_IDS)) {
return false;
}
// 如果經過了之前的多個過濾條件之後,程式能夠走到這裡
// 那麼就說明,該session是通過了使用者指定的篩選條件的,也就是需要保留的session
// 那麼就要對session的訪問時長和訪問步長,進行統計,根據session對應的範圍
// 進行相應的累加計數
// 主要走到這一步,那麼就是需要計數的session
sessionAggrStatAccumulator.add(Constants.SESSION_COUNT);
// 計算出session的訪問時長和訪問步長的範圍,並進行相應的累加
long visitLength = Long.valueOf(StringUtils.getFieldFromConcatString(
aggrInfo, "\\|", Constants.FIELD_VISIT_LENGTH));
long stepLength = Long.valueOf(StringUtils.getFieldFromConcatString(
aggrInfo, "\\|", Constants.FIELD_STEP_LENGTH));
calculateVisitLength(visitLength);
calculateStepLength(stepLength);
return true;
}
/**
* 計算訪問時長範圍
* @param visitLength
*/
private void calculateVisitLength(long visitLength) {
if(visitLength >=1 && visitLength <= 3) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s);
} else if(visitLength >=4 && visitLength <= 6) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_4s_6s);
} else if(visitLength >=7 && visitLength <= 9) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_7s_9s);
} else if(visitLength >=10 && visitLength <= 30) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10s_30s);
} else if(visitLength > 30 && visitLength <= 60) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30s_60s);
} else if(visitLength > 60 && visitLength <= 180) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1m_3m);
} else if(visitLength > 180 && visitLength <= 600) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_3m_10m);
} else if(visitLength > 600 && visitLength <= 1800) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10m_30m);
} else if(visitLength > 1800) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30m);
}
}
/**
* 計算訪問步長範圍
* @param stepLength
*/
private void calculateStepLength(long stepLength) {
if(stepLength >= 1 && stepLength <= 3) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_1_3);
} else if(stepLength >= 4 && stepLength <= 6) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_4_6);
} else if(stepLength >= 7 && stepLength <= 9) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_7_9);
} else if(stepLength >= 10 && stepLength <= 30) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_10_30);
} else if(stepLength > 30 && stepLength <= 60) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60);
} else if(stepLength > 60) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
}
}
});
return filteredSessionid2AggrInfoRDD;
}
(6)
//隨機抽取
randomExtractSession(sc, task.getTaskid(),filteredSessionid2AggrInfoRDD, sessionid2detailRDD);
/**
* 特別說明
* 我們知道,要將上一個功能的session聚合統計資料獲取到,就必須是在一個action操作觸發job之後
* 才能從Accumulator中獲取資料,否則是獲取不到資料的,因為沒有job執行,Accumulator的值為空
* 所以,我們在這裡,將隨機抽取的功能的實現程式碼,放在session聚合統計功能的最終計算和寫庫之前
* 因為隨機抽取功能中,有一個countByKey運算元,是action操作,會觸發job
*/
// 計算出各個範圍的session佔比,並寫入MySQL
calculateAndPersistAggrStat(sessionAggrStatAccumulator.value(),task.getTaskid());
/**
* 隨機抽取session
* @param sessionid2AggrInfoRDD
*/
private static void randomExtractSession(
JavaSparkContext sc,
final long taskid,
JavaPairRDD<String, String> sessionid2AggrInfoRDD,
JavaPairRDD<String, Row> sessionid2actionRDD) {
/**
* 第一步,計算出每天每小時的session數量
*/
// 獲取<yyyy-MM-dd_HH,aggrInfo>格式的RDD
JavaPairRDD<String, String> time2sessionidRDD = sessionid2AggrInfoRDD.mapToPair(
new PairFunction<Tuple2<String,String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(
Tuple2<String, String> tuple) throws Exception {
String aggrInfo = tuple._2;
String startTime = StringUtils.getFieldFromConcatString(
aggrInfo, "\\|", Constants.FIELD_START_TIME);
String dateHour = DateUtils.getDateHour(startTime);
return new Tuple2<String, String>(dateHour, aggrInfo);
}
});
/**
* 思考一下:這裡我們不要著急寫大量的程式碼,做專案的時候,一定要用腦子多思考
*
* 每天每小時的session數量,然後計算出每天每小時的session抽取索引,遍歷每天每小時session
* 首先抽取出的session的聚合資料,寫入session_random_extract表
* 所以第一個RDD的value,應該是session聚合資料
*
*/
// 得到每天每小時的session數量
/**
* 每天每小時的session數量的計算
* 是有可能出現數據傾斜的吧,這個是沒有疑問的
* 比如說大部分小時,一般訪問量也就10萬;但是,中午12點的時候,高峰期,一個小時1000萬
* 這個時候,就會發生資料傾斜
*
* 我們就用這個countByKey操作,給大家演示第三種和第四種方案
*
*/
Map<String, Object> countMap = time2sessionidRDD.countByKey();
/**
* 第二步,使用按時間比例隨機抽取演算法,計算出每天每小時要抽取session的索引
*/
// 將<yyyy-MM-dd_HH,count>格式的map,轉換成<yyyy-MM-dd,<HH,count>>的格式
Map<String, Map<String, Long>> dateHourCountMap =
new HashMap<String, Map<String, Long>>();
for(Map.Entry<String, Object> countEntry : countMap.entrySet()) {
String dateHour = countEntry.getKey();
String date = dateHour.split("_")[0];
String hour = dateHour.split("_")[1];
long count = Long.valueOf(String.valueOf(countEntry.getValue()));
Map<String, Long> hourCountMap = dateHourCountMap.get(date);
if(hourCountMap == null) {
hourCountMap = new HashMap<String, Long>();
dateHourCountMap.put(date, hourCountMap);
}
hourCountMap.put(hour, count);
}
// 開始實現我們的按時間比例隨機抽取演算法
// 總共要抽取100個session,先按照天數,進行平分
int extractNumberPerDay = 100 / dateHourCountMap.size();
// <date,<hour,(3,5,20,102)>>
/**
* session隨機抽取功能
*
* 用到了一個比較大的變數,隨機抽取索引map
* 之前是直接在運算元裡面使用了這個map,那麼根據我們剛才講的這個原理,每個task都會拷貝一份map副本
* 還是比較消耗記憶體和網路傳輸效能的
*
* 將map做成廣播變數
*
*/
Map<String, Map<String, List<Integer>>> dateHourExtractMap =
new HashMap<String, Map<String, List<Integer>>>();
Random random = new Random();
for(Map.Entry<String, Map<String, Long>> dateHourCountEntry : dateHourCountMap.entrySet()) {
String date = dateHourCountEntry.getKey();
Map<String, Long> hourCountMap = dateHourCountEntry.getValue();
// 計算出這一天的session總數
long sessionCount = 0L;
for(long hourCount : hourCountMap.values()) {
sessionCount += hourCount;
}
Map<String, List<Integer>> hourExtractMap = dateHourExtractMap.get(date);
if(hourExtractMap == null) {
hourExtractMap = new HashMap<String, List<Integer>>();
dateHourExtractMap.put(date, hourExtractMap);
}
// 遍歷每個小時
for(Map.Entry<String, Long> hourCountEntry : hourCountMap.entrySet()) {
String hour = hourCountEntry.getKey();
long count = hourCountEntry.getValue();
// 計算每個小時的session數量,佔據當天總session數量的比例,直接乘以每天要抽取的數量
// 就可以計算出,當前小時需要抽取的session數量
int hourExtractNumber = (int)(((double)count / (double)sessionCount)
* extractNumberPerDay);
if(hourExtractNumber > count) {
hourExtractNumber = (int) count;
}
// 先獲取當前小時的存放隨機數的list
List<Integer> extractIndexList = hourExtractMap.get(hour);
if(extractIndexList == null) {
extractIndexList = new ArrayList<Integer>();
hourExtractMap.put(hour, extractIndexList);
}
// 生成上面計算出來的數量的隨機數
for(int i = 0; i < hourExtractNumber; i++) {
int extractIndex = random.nextInt((int) count);
while(extractIndexList.contains(extractIndex)) {
extractIndex = random.nextInt((int) count);
}
extractIndexList.add(extractIndex);
}
}
}
/**
* fastutil的使用,很簡單,比如List<Integer>的list,對應到fastutil,就是IntList
*/
Map<String, Map<String, IntList>> fastutilDateHourExtractMap =
new HashMap<String, Map<String, IntList>>();
for(Map.Entry<String, Map<String, List<Integer>>> dateHourExtractEntry :
dateHourExtractMap.entrySet()) {
String date = dateHourExtractEntry.getKey();
Map<String, List<Integer>> hourExtractMap = dateHourExtractEntry.getValue();
Map<String, IntList> fastutilHourExtractMap = new HashMap<String, IntList>();
for(Map.Entry<String, List<Integer>> hourExtractEntry : hourExtractMap.entrySet()) {
String hour = hourExtractEntry.getKey();
List<Integer> extractList = hourExtractEntry.getValue();
IntList fastutilExtractList = new IntArrayList();
for(int i = 0; i < extractList.size(); i++) {
fastutilExtractList.add(extractList.get(i));
}
fastutilHourExtractMap.put(hour, fastutilExtractList);
}
fastutilDateHourExtractMap.put(date, fastutilHourExtractMap);
}
/**
* 廣播變數,很簡單
* 其實就是SparkContext的broadcast()方法,傳入你要廣播的變數,即可
*/
final Broadcast<Map<String, Map<String, IntList>>> dateHourExtractMapBroadcast =
sc.broadcast(fastutilDateHourExtractMap);
/**
* 第三步:遍歷每天每小時的session,然後根據隨機索引進行抽取
*/
// 執行groupByKey運算元,得到<dateHour,(session aggrInfo)>
JavaPairRDD<String, Iterable<String>> time2sessionsRDD = time2sessionidRDD.groupByKey();
// 我們用flatMap運算元,遍歷所有的<dateHour,(session aggrInfo)>格式的資料
// 然後呢,會遍歷每天每小時的session
// 如果發現某個session恰巧在我們指定的這天這小時的隨機抽取索引上
// 那麼抽取該session,直接寫入MySQL的random_extract_session表
// 將抽取出來的session id返回回來,形成一個新的JavaRDD<String>
// 然後最後一步,是用抽取出來的sessionid,去join它們的訪問行為明細資料,寫入session表
JavaPairRDD<String, String> extractSessionidsRDD = time2sessionsRDD.flatMapToPair(
new PairFlatMapFunction<Tuple2<String,Iterable<String>>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<Tuple2<String, String>> call(
Tuple2<String, Iterable<String>> tuple)
throws Exception {
List<Tuple2<String, String>> extractSessionids =
new ArrayList<Tuple2<String, String>>();
String dateHour = tuple._1;
String date = dateHour.split("_")[0];
String hour = dateHour.split("_")[1];
Iterator<String> iterator = tuple._2.iterator();
/**
* 使用廣播變數的時候
* 直接呼叫廣播變數(Broadcast型別)的value() / getValue()
* 可以獲取到之前封裝的廣播變數
*/
Map<String, Map<String, IntList>> dateHourExtractMap =
dateHourExtractMapBroadcast.value();
List<Integer> extractIndexList = dateHourExtractMap.get(date).get(hour);
ISessionRandomExtractDAO sessionRandomExtractDAO =
DAOFactory.getSessionRandomExtractDAO();
int index = 0;
while(iterator.hasNext()) {
String sessionAggrInfo = iterator.next();
if(extractIndexList.contains(index)) {
String sessionid = StringUtils.getFieldFromConcatString(
sessionAggrInfo, "\\|", Constants.FIELD_SESSION_ID);
// 將資料寫入MySQL
SessionRandomExtract sessionRandomExtract = new SessionRandomExtract();
sessionRandomExtract.setTaskid(taskid);
sessionRandomExtract.setSessionid(sessionid);
sessionRandomExtract.setStartTime(StringUtils.getFieldFromConcatString(
sessionAggrInfo, "\\|", Constants.FIELD_START_TIME));
sessionRandomExtract.setSearchKeywords(StringUtils.getFieldFromConcatString(
sessionAggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS));
sessionRandomExtract.setClickCategoryIds(StringUtils.getFieldFromConcatString(
sessionAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS));
sessionRandomExtractDAO.insert(sessionRandomExtract);
// 將sessionid加入list
extractSessionids.add(new Tuple2<String, String>(sessionid, sessionid));
}
index++;
}
return extractSessionids;
}
});
/**
* 第四步:獲取抽取出來的session的明細資料
*/
JavaPairRDD<String, Tuple2<String, Row>> extractSessionDetailRDD =
extractSessionidsRDD.join(sessionid2actionRDD);
extractSessionDetailRDD.foreachPartition(
new VoidFunction<Iterator<Tuple2<String,Tuple2<String,Row>>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(
Iterator<Tuple2<String, Tuple2<String, Row>>> iterator)
throws Exception {
List<SessionDetail> sessionDetails = new ArrayList<SessionDetail>();
while(iterator.hasNext()) {
Tuple2<String, Tuple2<String, Row>> tuple = iterator.next();
Row row = tuple._2._2;
SessionDetail sessionDetail = new SessionDetail();
sessionDetail.setTaskid(taskid);
sessionDetail.setUserid(row.getLong(1));
sessionDetail.setSessionid(row.getString(2));
sessionDetail.setPageid(row.getLong(3));
sessionDetail.setActionTime(row.getString(4));
sessionDetail.setSearchKeyword(row.getString(5));
sessionDetail.setClickCategoryId(row.getLong(6));
sessionDetail.setClickProductId(row.getLong(7));
sessionDetail.setOrderCategoryIds(row.getString(8));
sessionDetail.setOrderProductIds(row.getString(9));
sessionDetail.setPayCategoryIds(row.getString(10));
sessionDetail.setPayProductIds(row.getString(11));
sessionDetails.add(sessionDetail);
}
ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO();
sessionDetailDAO.insertBatch(sessionDetails);
}
});
}
/**
* 計算各session範圍佔比,並寫入MySQL
* @param value
*/
private static void calculateAndPersistAggrStat(String value, long taskid) {
// 從Accumulator統計串中獲取值
long session_count = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.SESSION_COUNT));
long visit_length_1s_3s = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.TIME_PERIOD_1s_3s));
long visit_length_4s_6s = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.TIME_PERIOD_4s_6s));
long visit_length_7s_9s = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.TIME_PERIOD_7s_9s));
long visit_length_10s_30s = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.TIME_PERIOD_10s_30s));
long visit_length_30s_60s = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.TIME_PERIOD_30s_60s));
long visit_length_1m_3m = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.TIME_PERIOD_1m_3m));
long visit_length_3m_10m = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.TIME_PERIOD_3m_10m));
long visit_length_10m_30m = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.TIME_PERIOD_10m_30m));
long visit_length_30m = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.TIME_PERIOD_30m));
long step_length_1_3 = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.STEP_PERIOD_1_3));
long step_length_4_6 = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.STEP_PERIOD_4_6));
long step_length_7_9 = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.STEP_PERIOD_7_9));
long step_length_10_30 = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.STEP_PERIOD_10_30));
long step_length_30_60 = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.STEP_PERIOD_30_60));
long step_length_60 = Long.valueOf(StringUtils.getFieldFromConcatString(
value, "\\|", Constants.STEP_PERIOD_60));
// 計算各個訪問時長和訪問步長的範圍
double visit_length_1s_3s_ratio = NumberUtils.formatDouble(
(double)visit_length_1s_3s / (double)session_count, 2);
double visit_length_4s_6s_ratio = NumberUtils.formatDouble(
(double)visit_length_4s_6s / (double)session_count, 2);
double visit_length_7s_9s_ratio = NumberUtils.formatDouble(
(double)visit_length_7s_9s / (double)session_count, 2);
double visit_length_10s_30s_ratio = NumberUtils.formatDouble(
(double)visit_length_10s_30s / (double)session_count, 2);
double visit_length_30s_60s_ratio = NumberUtils.formatDouble(
(double)visit_length_30s_60s / (double)session_count, 2);
double visit_length_1m_3m_ratio = NumberUtils.formatDouble(
(double)visit_length_1m_3m / (double)session_count, 2);
double visit_length_3m_10m_ratio = NumberUtils.formatDouble(
(double)visit_length_3m_10m / (double)session_count, 2);
double visit_length_10m_30m_ratio = NumberUtils.formatDouble(
(double)visit_length_10m_30m / (double)session_count, 2);
double visit_length_30m_ratio = NumberUtils.formatDouble(
(double)visit_length_30m / (double)sessio