1. 程式人生 > >Spark日誌分析專案Demo(4)--RDD使用,使用者行為統計分析

Spark日誌分析專案Demo(4)--RDD使用,使用者行為統計分析

先說說需求,日誌挖掘
(1)隨機抽取100個session,統計時長(session時間),步長(session訪問頁面個數)
(2)統計top10熱門品類
(3)統計top10熱門品類上的top10使用者
下面介紹通過日誌分析使用者行為流程
(1)某個J2EE專案在接收使用者建立任務的請求之後,會將任務資訊插入MySQL的task表中,任務引數以JSON格式封裝在task_param 欄位中。這是專案前提,不是本專案的內容。
接著J2EE平臺會執行我們的spark-submit shell指令碼,並將taskid作為引數傳遞給spark-submit shell指令碼. spark-submit shell指令碼在執行時,是可以接收引數的,並且會將接收的引數,傳遞給Spark作業的main函式 引數就封裝在main函式的args陣列中
使用者可能指定的條件如下:
* 1、時間範圍:起始日期~結束日期
* 2、性別:男或女
* 3、年齡範圍
* 4、職業:多選
* 5、城市:多選
* 6、搜尋詞:多個搜尋詞,只要某個session中的任何一個action搜尋過指定的關鍵詞,那麼session就符合條件
* 7、點選品類:多個品類,只要某個session中的任何一個action點選過某個品類,那麼session就符合條件
根據main函式獲得的引數,程式從資料庫裡獲得查詢引數。
(2)首先要從user_visit_action記憶體臨時表中,查詢出來指定日期範圍內的行為資料。
(3)上面查詢出的資料進行map操作,以sessionid為key
(4)將上面的資料進行session粒度的資料聚合,session粒度的資料 與使用者資訊資料進行join,就可以獲取到session粒度的資料+session對應的user的資訊
(5)按照使用者在j2ee 平臺指定的篩選引數進行資料過濾,生成公共的RDD:就是通過篩選條件的session的訪問明細資料。在過濾的時候,對每個session的訪問時長(訪問開始到結束時間)和訪問步長(訪問頁面的次數),進行計算。
(6)抽取100個session,算出平均每天多少session抽取。統計每個小時佔當天session的比例。根據前兩個數值確定隨機抽取的session。根據隨機抽取的session,統計訪問時長和訪問步長的情況,寫入資料庫
(7)在(5)的結果上獲取top10熱門品類
(8)在(7)的結果上獲取top10熱門品類上,每個品類的Top10活躍使用者

下面結合程式碼說說每一步的實現:
(1)

// 建立需要使用的DAO元件
        ITaskDAO taskDAO = DAOFactory.getTaskDAO();

        // 首先得查詢出來指定的任務,並獲取任務的查詢引數
        long taskid = ParamUtils.getTaskIdFromArgs(args, Constants.SPARK_LOCAL_TASKID_SESSION);
        Task task = taskDAO.findById(taskid);
        if(task == null) {
            System.out.println(new
Date() + ": cannot find this task with id [" + taskid + "]."); return; } JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());

上面的ITaskDAO,Task,ParamUtils都是專案自己實現的工具類,目的是為了查詢引數。

(2)通過SQL過濾資料

JavaRDD<Row> actionRDD = SparkUtils.getActionRDDByDateRange(sqlContext, taskParam);
/**
     * 獲取指定日期範圍內的使用者行為資料RDD
     * @param sqlContext
     * @param taskParam
     * @return
     */
    public static JavaRDD<Row> getActionRDDByDateRange(
            SQLContext sqlContext, JSONObject taskParam) {
        String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE);
        String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE);

        String sql = 
                "select * "
                + "from user_visit_action "
                + "where date>='" + startDate + "' "
                + "and date<='" + endDate + "'";  

        DataFrame actionDF = sqlContext.sql(sql);

        return actionDF.javaRDD();
    }

(3)下面實現map操作,以sessionid為key

JavaPairRDD<String, Row> sessionid2actionRDD = getSessionid2ActionRDD(actionRDD);
sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());
/**
     * 獲取sessionid2到訪問行為資料的對映的RDD
     * @param actionRDD 
     * @return
*/
public static JavaPairRDD<String, Row> getSessionid2ActionRDD(JavaRDD<Row> actionRDD) {

        return actionRDD.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Row>, String, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<Tuple2<String, Row>> call(Iterator<Row> iterator)
                    throws Exception {
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();

                while(iterator.hasNext()) {
                    Row row = iterator.next();
                    list.add(new Tuple2<String, Row>(row.getString(2), row));  
                }

                return list;
            }

        });
    }

(4)把相同sessionId key的做內容聚合後,與使用者資訊資料進行join。

    JavaPairRDD<String, String> sessionid2AggrInfoRDD = 
                aggregateBySession(sc, sqlContext, sessionid2actionRDD);
/**
     * 對行為資料按session粒度進行聚合
     * @param actionRDD 行為資料RDD
     * @return session粒度聚合資料
    */
    private static JavaPairRDD<String, String> aggregateBySession(
            JavaSparkContext sc,
            SQLContext sqlContext, 
            JavaPairRDD<String, Row> sessinoid2actionRDD) {
        // 對行為資料按session粒度進行分組
        JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD = 
                sessinoid2actionRDD.groupByKey();

        // 對每一個session分組進行聚合,將session中所有的搜尋詞和點選品類都聚合起來
        // 到此為止,獲取的資料格式,如下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>
        JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair(

                new PairFunction<Tuple2<String,Iterable<Row>>, Long, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple)
                            throws Exception {
                        String sessionid = tuple._1;
                        Iterator<Row> iterator = tuple._2.iterator();

                        StringBuffer searchKeywordsBuffer = new StringBuffer("");
                        StringBuffer clickCategoryIdsBuffer = new StringBuffer("");

                        Long userid = null;

                        // session的起始和結束時間
                        Date startTime = null;
                        Date endTime = null;
                        // session的訪問步長
                        int stepLength = 0;

                        // 遍歷session所有的訪問行為
                        while(iterator.hasNext()) {
                            // 提取每個訪問行為的搜尋詞欄位和點選品類欄位
                            Row row = iterator.next();
                            if(userid == null) {
                                userid = row.getLong(1);
                            }
                            String searchKeyword = row.getString(5);
                            Long clickCategoryId = row.getLong(6);

                            // 實際上這裡要對資料說明一下
                            // 並不是每一行訪問行為都有searchKeyword何clickCategoryId兩個欄位的
                            // 其實,只有搜尋行為,是有searchKeyword欄位的
                            // 只有點選品類的行為,是有clickCategoryId欄位的
                            // 所以,任何一行行為資料,都不可能兩個欄位都有,所以資料是可能出現null值的

                            // 我們決定是否將搜尋詞或點選品類id拼接到字串中去
                            // 首先要滿足:不能是null值
                            // 其次,之前的字串中還沒有搜尋詞或者點選品類id

                            if(StringUtils.isNotEmpty(searchKeyword)) {
                                if(!searchKeywordsBuffer.toString().contains(searchKeyword)) {
                                    searchKeywordsBuffer.append(searchKeyword + ",");  
                                }
                            }
                            if(clickCategoryId != null) {
                                if(!clickCategoryIdsBuffer.toString().contains(
                                        String.valueOf(clickCategoryId))) {   
                                    clickCategoryIdsBuffer.append(clickCategoryId + ",");  
                                }
                            }

                            // 計算session開始和結束時間
                            Date actionTime = DateUtils.parseTime(row.getString(4));

                            if(startTime == null) {
                                startTime = actionTime;
                            }
                            if(endTime == null) {
                                endTime = actionTime;
                            }

                            if(actionTime.before(startTime)) {
                                startTime = actionTime;
                            }
                            if(actionTime.after(endTime)) {
                                endTime = actionTime;
                            }

                            // 計算session訪問步長
                            stepLength++;
                        }

                        String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString());
                        String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString());

                        // 計算session訪問時長(秒)
                        long visitLength = (endTime.getTime() - startTime.getTime()) / 1000; 

                        // 大家思考一下
                        // 我們返回的資料格式,即使<sessionid,partAggrInfo>
                        // 但是,這一步聚合完了以後,其實,我們是還需要將每一行資料,跟對應的使用者資訊進行聚合
                        // 問題就來了,如果是跟使用者資訊進行聚合的話,那麼key,就不應該是sessionid
                        // 就應該是userid,才能夠跟<userid,Row>格式的使用者資訊進行聚合
                        // 如果我們這裡直接返回<sessionid,partAggrInfo>,還得再做一次mapToPair運算元
                        // 將RDD對映成<userid,partAggrInfo>的格式,那麼就多此一舉

                        // 所以,我們這裡其實可以直接,返回的資料格式,就是<userid,partAggrInfo>
                        // 然後跟使用者資訊join的時候,將partAggrInfo關聯上userInfo
                        // 然後再直接將返回的Tuple的key設定成sessionid
                        // 最後的資料格式,還是<sessionid,fullAggrInfo>

                        // 聚合資料,用什麼樣的格式進行拼接?
                        // 我們這裡統一定義,使用key=value|key=value
                        String partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|"
                                + Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|"
                                + Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|"
                                + Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|"
                                + Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|"
                                + Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime);    

                        return new Tuple2<Long, String>(userid, partAggrInfo);
                    }

                });

        // 查詢所有使用者資料,並對映成<userid,Row>的格式
        String sql = "select * from user_info";  
        JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD();

        JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair(

                new PairFunction<Row, Long, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Long, Row> call(Row row) throws Exception {
                        return new Tuple2<Long, Row>(row.getLong(0), row);
                    }

                });

        /**
         * 這裡就可以說一下,比較適合採用reduce join轉換為map join的方式
         * 
         * userid2PartAggrInfoRDD,可能資料量還是比較大,比如,可能在1千萬資料
         * userid2InfoRDD,可能資料量還是比較小的,你的使用者數量才10萬用戶
         * 
         */

        // 將session粒度聚合資料,與使用者資訊進行join
        JavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD = 
                userid2PartAggrInfoRDD.join(userid2InfoRDD);

        // 對join起來的資料進行拼接,並且返回<sessionid,fullAggrInfo>格式的資料
        JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair(

                new PairFunction<Tuple2<Long,Tuple2<String,Row>>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(
                            Tuple2<Long, Tuple2<String, Row>> tuple)
                            throws Exception {
                        String partAggrInfo = tuple._2._1;
                        Row userInfoRow = tuple._2._2;

                        String sessionid = StringUtils.getFieldFromConcatString(
                                partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);

                        int age = userInfoRow.getInt(3);
                        String professional = userInfoRow.getString(4);
                        String city = userInfoRow.getString(5);
                        String sex = userInfoRow.getString(6);

                        String fullAggrInfo = partAggrInfo + "|"
                                + Constants.FIELD_AGE + "=" + age + "|"
                                + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
                                + Constants.FIELD_CITY + "=" + city + "|"
                                + Constants.FIELD_SEX + "=" + sex;

                        return new Tuple2<String, String>(sessionid, fullAggrInfo);
                    }

                });


        return sessionid2FullAggrInfoRDD;
    }

(5)同時進行過濾和統計

//下面這個Spark自定義累加器是用來統計的,可以通過add方法在累加器上進行累加操作
Accumulator<String> sessionAggrStatAccumulator = sc.accumulator(
                "", new SessionAggrStatAccumulator());

JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = filterSessionAndAggrStat(
                sessionid2AggrInfoRDD, taskParam, sessionAggrStatAccumulator);

filteredSessionid2AggrInfoRDD = filteredSessionid2AggrInfoRDD.persist(StorageLevel.MEMORY_ONLY());
/**
     * 過濾session資料,並進行聚合統計
     * @param sessionid2AggrInfoRDD
     * @return 
 */
private static JavaPairRDD<String, String> filterSessionAndAggrStat(
            JavaPairRDD<String, String> sessionid2AggrInfoRDD, 
            final JSONObject taskParam,
            final Accumulator<String> sessionAggrStatAccumulator) {  
        // 為了使用我們後面的ValieUtils,所以,首先將所有的篩選引數拼接成一個連線串
        // 此外,這裡其實大家不要覺得是多此一舉
        // 其實我們是給後面的效能優化埋下了一個伏筆
        String startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE);
        String endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);
        String professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS);
        String cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES);
        String sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX);
        String keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS);
        String categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS);

        String _parameter = (startAge != null ? Constants.PARAM_START_AGE + "=" + startAge + "|" : "")
                + (endAge != null ? Constants.PARAM_END_AGE + "=" + endAge + "|" : "")
                + (professionals != null ? Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" : "")
                + (cities != null ? Constants.PARAM_CITIES + "=" + cities + "|" : "")
                + (sex != null ? Constants.PARAM_SEX + "=" + sex + "|" : "")
                + (keywords != null ? Constants.PARAM_KEYWORDS + "=" + keywords + "|" : "")
                + (categoryIds != null ? Constants.PARAM_CATEGORY_IDS + "=" + categoryIds: "");

        if(_parameter.endsWith("\\|")) {
            _parameter = _parameter.substring(0, _parameter.length() - 1);
        }

        final String parameter = _parameter;

// 根據篩選引數進行過濾
JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter(

                new Function<Tuple2<String,String>, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<String, String> tuple) throws Exception {
                        // 首先,從tuple中,獲取聚合資料
                        String aggrInfo = tuple._2;

                        // 接著,依次按照篩選條件進行過濾
                        // 按照年齡範圍進行過濾(startAge、endAge)
                        if(!ValidUtils.between(aggrInfo, Constants.FIELD_AGE, 
                                parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) {
                            return false;
                        }

                        // 按照職業範圍進行過濾(professionals)
                        // 網際網路,IT,軟體
                        // 網際網路
                        if(!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL, 
                                parameter, Constants.PARAM_PROFESSIONALS)) {
                            return false;
                        }

                        // 按照城市範圍進行過濾(cities)
                        // 北京,上海,廣州,深圳
                        // 成都
                        if(!ValidUtils.in(aggrInfo, Constants.FIELD_CITY, 
                                parameter, Constants.PARAM_CITIES)) {
                            return false;
                        }

                        // 按照性別進行過濾
                        // 男/女
                        // 男,女
                        if(!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX, 
                                parameter, Constants.PARAM_SEX)) {
                            return false;
                        }

                        // 按照搜尋詞進行過濾
                        // 我們的session可能搜尋了 火鍋,蛋糕,燒烤
                        // 我們的篩選條件可能是 火鍋,串串香,iphone手機
                        // 那麼,in這個校驗方法,主要判定session搜尋的詞中,有任何一個,與篩選條件中
                        // 任何一個搜尋詞相當,即通過
                        if(!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS, 
                                parameter, Constants.PARAM_KEYWORDS)) {
                            return false;
                        }

                        // 按照點選品類id進行過濾
                        if(!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS, 
                                parameter, Constants.PARAM_CATEGORY_IDS)) {
                            return false;
                        }

                        // 如果經過了之前的多個過濾條件之後,程式能夠走到這裡
                        // 那麼就說明,該session是通過了使用者指定的篩選條件的,也就是需要保留的session
                        // 那麼就要對session的訪問時長和訪問步長,進行統計,根據session對應的範圍
                        // 進行相應的累加計數

                        // 主要走到這一步,那麼就是需要計數的session
                        sessionAggrStatAccumulator.add(Constants.SESSION_COUNT);  

                        // 計算出session的訪問時長和訪問步長的範圍,並進行相應的累加
                        long visitLength = Long.valueOf(StringUtils.getFieldFromConcatString(
                                aggrInfo, "\\|", Constants.FIELD_VISIT_LENGTH)); 
                        long stepLength = Long.valueOf(StringUtils.getFieldFromConcatString(
                                aggrInfo, "\\|", Constants.FIELD_STEP_LENGTH));  
                        calculateVisitLength(visitLength); 
                        calculateStepLength(stepLength);  

                        return true;
                    }

                    /**
                     * 計算訪問時長範圍
                     * @param visitLength
                     */
                    private void calculateVisitLength(long visitLength) {
                        if(visitLength >=1 && visitLength <= 3) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s);  
                        } else if(visitLength >=4 && visitLength <= 6) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_4s_6s);  
                        } else if(visitLength >=7 && visitLength <= 9) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_7s_9s);  
                        } else if(visitLength >=10 && visitLength <= 30) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10s_30s);  
                        } else if(visitLength > 30 && visitLength <= 60) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30s_60s);  
                        } else if(visitLength > 60 && visitLength <= 180) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1m_3m);  
                        } else if(visitLength > 180 && visitLength <= 600) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_3m_10m);  
                        } else if(visitLength > 600 && visitLength <= 1800) {  
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10m_30m);  
                        } else if(visitLength > 1800) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30m);  
                        } 
                    }

/**
                     * 計算訪問步長範圍
                     * @param stepLength
*/
private void calculateStepLength(long stepLength) {
                        if(stepLength >= 1 && stepLength <= 3) {
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_1_3);  
                        } else if(stepLength >= 4 && stepLength <= 6) {
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_4_6);  
                        } else if(stepLength >= 7 && stepLength <= 9) {
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_7_9);  
                        } else if(stepLength >= 10 && stepLength <= 30) {
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_10_30);  
                        } else if(stepLength > 30 && stepLength <= 60) {
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60);  
                        } else if(stepLength > 60) {
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);    
                        }
                    }

                });

        return filteredSessionid2AggrInfoRDD;
    }

(6)

//隨機抽取
randomExtractSession(sc, task.getTaskid(),filteredSessionid2AggrInfoRDD, sessionid2detailRDD);

/**
         * 特別說明
         * 我們知道,要將上一個功能的session聚合統計資料獲取到,就必須是在一個action操作觸發job之後
         * 才能從Accumulator中獲取資料,否則是獲取不到資料的,因為沒有job執行,Accumulator的值為空
         * 所以,我們在這裡,將隨機抽取的功能的實現程式碼,放在session聚合統計功能的最終計算和寫庫之前
         * 因為隨機抽取功能中,有一個countByKey運算元,是action操作,會觸發job
*/

// 計算出各個範圍的session佔比,並寫入MySQL   
    calculateAndPersistAggrStat(sessionAggrStatAccumulator.value(),task.getTaskid());   
    /**
     * 隨機抽取session
     * @param sessionid2AggrInfoRDD  
     */
    private static void randomExtractSession(
            JavaSparkContext sc,
            final long taskid,
            JavaPairRDD<String, String> sessionid2AggrInfoRDD,
            JavaPairRDD<String, Row> sessionid2actionRDD) { 
        /**
         * 第一步,計算出每天每小時的session數量
         */

        // 獲取<yyyy-MM-dd_HH,aggrInfo>格式的RDD
        JavaPairRDD<String, String> time2sessionidRDD = sessionid2AggrInfoRDD.mapToPair(

                new PairFunction<Tuple2<String,String>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(
                            Tuple2<String, String> tuple) throws Exception {
                        String aggrInfo = tuple._2;

                        String startTime = StringUtils.getFieldFromConcatString(
                                aggrInfo, "\\|", Constants.FIELD_START_TIME);
                        String dateHour = DateUtils.getDateHour(startTime);

                        return new Tuple2<String, String>(dateHour, aggrInfo);  
                    }

                });

        /**
         * 思考一下:這裡我們不要著急寫大量的程式碼,做專案的時候,一定要用腦子多思考
         * 
         * 每天每小時的session數量,然後計算出每天每小時的session抽取索引,遍歷每天每小時session
         * 首先抽取出的session的聚合資料,寫入session_random_extract表
         * 所以第一個RDD的value,應該是session聚合資料
         * 
         */

        // 得到每天每小時的session數量

        /**
         * 每天每小時的session數量的計算
         * 是有可能出現數據傾斜的吧,這個是沒有疑問的
         * 比如說大部分小時,一般訪問量也就10萬;但是,中午12點的時候,高峰期,一個小時1000萬
         * 這個時候,就會發生資料傾斜
         * 
         * 我們就用這個countByKey操作,給大家演示第三種和第四種方案
         * 
         */

        Map<String, Object> countMap = time2sessionidRDD.countByKey();

        /**
         * 第二步,使用按時間比例隨機抽取演算法,計算出每天每小時要抽取session的索引
         */

        // 將<yyyy-MM-dd_HH,count>格式的map,轉換成<yyyy-MM-dd,<HH,count>>的格式
        Map<String, Map<String, Long>> dateHourCountMap = 
                new HashMap<String, Map<String, Long>>();

        for(Map.Entry<String, Object> countEntry : countMap.entrySet()) {
            String dateHour = countEntry.getKey();
            String date = dateHour.split("_")[0];
            String hour = dateHour.split("_")[1];  

            long count = Long.valueOf(String.valueOf(countEntry.getValue()));  

            Map<String, Long> hourCountMap = dateHourCountMap.get(date);
            if(hourCountMap == null) {
                hourCountMap = new HashMap<String, Long>();
                dateHourCountMap.put(date, hourCountMap);
            }

            hourCountMap.put(hour, count);
        }

        // 開始實現我們的按時間比例隨機抽取演算法

        // 總共要抽取100個session,先按照天數,進行平分
        int extractNumberPerDay = 100 / dateHourCountMap.size();

        // <date,<hour,(3,5,20,102)>>  

        /**
         * session隨機抽取功能
         * 
         * 用到了一個比較大的變數,隨機抽取索引map
         * 之前是直接在運算元裡面使用了這個map,那麼根據我們剛才講的這個原理,每個task都會拷貝一份map副本
         * 還是比較消耗記憶體和網路傳輸效能的
         * 
         * 將map做成廣播變數
         * 
         */
        Map<String, Map<String, List<Integer>>> dateHourExtractMap = 
                new HashMap<String, Map<String, List<Integer>>>();

        Random random = new Random();

        for(Map.Entry<String, Map<String, Long>> dateHourCountEntry : dateHourCountMap.entrySet()) {
            String date = dateHourCountEntry.getKey();
            Map<String, Long> hourCountMap = dateHourCountEntry.getValue();

            // 計算出這一天的session總數
            long sessionCount = 0L;
            for(long hourCount : hourCountMap.values()) {
                sessionCount += hourCount;
            }

            Map<String, List<Integer>> hourExtractMap = dateHourExtractMap.get(date);
            if(hourExtractMap == null) {
                hourExtractMap = new HashMap<String, List<Integer>>();
                dateHourExtractMap.put(date, hourExtractMap);
            }

            // 遍歷每個小時
            for(Map.Entry<String, Long> hourCountEntry : hourCountMap.entrySet()) {
                String hour = hourCountEntry.getKey();
                long count = hourCountEntry.getValue();

                // 計算每個小時的session數量,佔據當天總session數量的比例,直接乘以每天要抽取的數量
                // 就可以計算出,當前小時需要抽取的session數量
                int hourExtractNumber = (int)(((double)count / (double)sessionCount) 
                        * extractNumberPerDay);
                if(hourExtractNumber > count) {
                    hourExtractNumber = (int) count;
                }

                // 先獲取當前小時的存放隨機數的list
                List<Integer> extractIndexList = hourExtractMap.get(hour);
                if(extractIndexList == null) {
                    extractIndexList = new ArrayList<Integer>();
                    hourExtractMap.put(hour, extractIndexList);
                }

                // 生成上面計算出來的數量的隨機數
                for(int i = 0; i < hourExtractNumber; i++) {
                    int extractIndex = random.nextInt((int) count);
                    while(extractIndexList.contains(extractIndex)) {
                        extractIndex = random.nextInt((int) count);
                    }
                    extractIndexList.add(extractIndex);
                }
            }
        }

        /**
         * fastutil的使用,很簡單,比如List<Integer>的list,對應到fastutil,就是IntList
         */
        Map<String, Map<String, IntList>> fastutilDateHourExtractMap = 
                new HashMap<String, Map<String, IntList>>();



        for(Map.Entry<String, Map<String, List<Integer>>> dateHourExtractEntry : 
                dateHourExtractMap.entrySet()) {
            String date = dateHourExtractEntry.getKey();
            Map<String, List<Integer>> hourExtractMap = dateHourExtractEntry.getValue();

            Map<String, IntList> fastutilHourExtractMap = new HashMap<String, IntList>();

            for(Map.Entry<String, List<Integer>> hourExtractEntry : hourExtractMap.entrySet()) {
                String hour = hourExtractEntry.getKey();
                List<Integer> extractList = hourExtractEntry.getValue();

                IntList fastutilExtractList = new IntArrayList();

                for(int i = 0; i < extractList.size(); i++) {
                    fastutilExtractList.add(extractList.get(i));  
                }

                fastutilHourExtractMap.put(hour, fastutilExtractList);
            }

            fastutilDateHourExtractMap.put(date, fastutilHourExtractMap);
        }

        /**
         * 廣播變數,很簡單
         * 其實就是SparkContext的broadcast()方法,傳入你要廣播的變數,即可
         */     


        final Broadcast<Map<String, Map<String, IntList>>> dateHourExtractMapBroadcast = 
                sc.broadcast(fastutilDateHourExtractMap);

        /**
         * 第三步:遍歷每天每小時的session,然後根據隨機索引進行抽取
         */

        // 執行groupByKey運算元,得到<dateHour,(session aggrInfo)>  
        JavaPairRDD<String, Iterable<String>> time2sessionsRDD = time2sessionidRDD.groupByKey();

        // 我們用flatMap運算元,遍歷所有的<dateHour,(session aggrInfo)>格式的資料
        // 然後呢,會遍歷每天每小時的session
        // 如果發現某個session恰巧在我們指定的這天這小時的隨機抽取索引上
        // 那麼抽取該session,直接寫入MySQL的random_extract_session表
        // 將抽取出來的session id返回回來,形成一個新的JavaRDD<String>
        // 然後最後一步,是用抽取出來的sessionid,去join它們的訪問行為明細資料,寫入session表
        JavaPairRDD<String, String> extractSessionidsRDD = time2sessionsRDD.flatMapToPair(

                new PairFlatMapFunction<Tuple2<String,Iterable<String>>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterable<Tuple2<String, String>> call(
                            Tuple2<String, Iterable<String>> tuple)
                            throws Exception {
                        List<Tuple2<String, String>> extractSessionids = 
                                new ArrayList<Tuple2<String, String>>();

                        String dateHour = tuple._1;
                        String date = dateHour.split("_")[0];
                        String hour = dateHour.split("_")[1];
                        Iterator<String> iterator = tuple._2.iterator();

                        /**
                         * 使用廣播變數的時候
                         * 直接呼叫廣播變數(Broadcast型別)的value() / getValue() 
                         * 可以獲取到之前封裝的廣播變數
                         */
                        Map<String, Map<String, IntList>> dateHourExtractMap = 
                                dateHourExtractMapBroadcast.value();
                        List<Integer> extractIndexList = dateHourExtractMap.get(date).get(hour);  

                        ISessionRandomExtractDAO sessionRandomExtractDAO = 
                                DAOFactory.getSessionRandomExtractDAO();

                        int index = 0;
                        while(iterator.hasNext()) {
                            String sessionAggrInfo = iterator.next();

                            if(extractIndexList.contains(index)) {
                                String sessionid = StringUtils.getFieldFromConcatString(
                                        sessionAggrInfo, "\\|", Constants.FIELD_SESSION_ID);

                                // 將資料寫入MySQL
                                SessionRandomExtract sessionRandomExtract = new SessionRandomExtract();
                                sessionRandomExtract.setTaskid(taskid);  
                                sessionRandomExtract.setSessionid(sessionid);  
                                sessionRandomExtract.setStartTime(StringUtils.getFieldFromConcatString(
                                        sessionAggrInfo, "\\|", Constants.FIELD_START_TIME));  
                                sessionRandomExtract.setSearchKeywords(StringUtils.getFieldFromConcatString(
                                        sessionAggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS));
                                sessionRandomExtract.setClickCategoryIds(StringUtils.getFieldFromConcatString(
                                        sessionAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS));

                                sessionRandomExtractDAO.insert(sessionRandomExtract);  

                                // 將sessionid加入list
                                extractSessionids.add(new Tuple2<String, String>(sessionid, sessionid));  
                            }

                            index++;
                        }

                        return extractSessionids;
                    }

                });

        /**
         * 第四步:獲取抽取出來的session的明細資料
         */
        JavaPairRDD<String, Tuple2<String, Row>> extractSessionDetailRDD =
                extractSessionidsRDD.join(sessionid2actionRDD);
        extractSessionDetailRDD.foreachPartition(

                new VoidFunction<Iterator<Tuple2<String,Tuple2<String,Row>>>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(
                            Iterator<Tuple2<String, Tuple2<String, Row>>> iterator) 
                            throws Exception {
                        List<SessionDetail> sessionDetails = new ArrayList<SessionDetail>();

                        while(iterator.hasNext()) {
                            Tuple2<String, Tuple2<String, Row>> tuple = iterator.next();

                            Row row = tuple._2._2;

                            SessionDetail sessionDetail = new SessionDetail();
                            sessionDetail.setTaskid(taskid);  
                            sessionDetail.setUserid(row.getLong(1));  
                            sessionDetail.setSessionid(row.getString(2));  
                            sessionDetail.setPageid(row.getLong(3));  
                            sessionDetail.setActionTime(row.getString(4));
                            sessionDetail.setSearchKeyword(row.getString(5));  
                            sessionDetail.setClickCategoryId(row.getLong(6));  
                            sessionDetail.setClickProductId(row.getLong(7));   
                            sessionDetail.setOrderCategoryIds(row.getString(8));  
                            sessionDetail.setOrderProductIds(row.getString(9));  
                            sessionDetail.setPayCategoryIds(row.getString(10)); 
                            sessionDetail.setPayProductIds(row.getString(11));  

                            sessionDetails.add(sessionDetail);
                        }

                        ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO();
                        sessionDetailDAO.insertBatch(sessionDetails);
                    }

                });
    }

    /**
     * 計算各session範圍佔比,並寫入MySQL
     * @param value
     */
    private static void calculateAndPersistAggrStat(String value, long taskid) {
        // 從Accumulator統計串中獲取值
        long session_count = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.SESSION_COUNT));  

        long visit_length_1s_3s = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.TIME_PERIOD_1s_3s));  
        long visit_length_4s_6s = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.TIME_PERIOD_4s_6s));
        long visit_length_7s_9s = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.TIME_PERIOD_7s_9s));
        long visit_length_10s_30s = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.TIME_PERIOD_10s_30s));
        long visit_length_30s_60s = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.TIME_PERIOD_30s_60s));
        long visit_length_1m_3m = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.TIME_PERIOD_1m_3m));
        long visit_length_3m_10m = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.TIME_PERIOD_3m_10m));
        long visit_length_10m_30m = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.TIME_PERIOD_10m_30m));
        long visit_length_30m = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.TIME_PERIOD_30m));

        long step_length_1_3 = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.STEP_PERIOD_1_3));
        long step_length_4_6 = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.STEP_PERIOD_4_6));
        long step_length_7_9 = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.STEP_PERIOD_7_9));
        long step_length_10_30 = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.STEP_PERIOD_10_30));
        long step_length_30_60 = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.STEP_PERIOD_30_60));
        long step_length_60 = Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.STEP_PERIOD_60));

        // 計算各個訪問時長和訪問步長的範圍
        double visit_length_1s_3s_ratio = NumberUtils.formatDouble(
                (double)visit_length_1s_3s / (double)session_count, 2);  
        double visit_length_4s_6s_ratio = NumberUtils.formatDouble(
                (double)visit_length_4s_6s / (double)session_count, 2);  
        double visit_length_7s_9s_ratio = NumberUtils.formatDouble(
                (double)visit_length_7s_9s / (double)session_count, 2);  
        double visit_length_10s_30s_ratio = NumberUtils.formatDouble(
                (double)visit_length_10s_30s / (double)session_count, 2);  
        double visit_length_30s_60s_ratio = NumberUtils.formatDouble(
                (double)visit_length_30s_60s / (double)session_count, 2);  
        double visit_length_1m_3m_ratio = NumberUtils.formatDouble(
                (double)visit_length_1m_3m / (double)session_count, 2);
        double visit_length_3m_10m_ratio = NumberUtils.formatDouble(
                (double)visit_length_3m_10m / (double)session_count, 2);  
        double visit_length_10m_30m_ratio = NumberUtils.formatDouble(
                (double)visit_length_10m_30m / (double)session_count, 2);
        double visit_length_30m_ratio = NumberUtils.formatDouble(
                (double)visit_length_30m / (double)sessio