1. 程式人生 > 其它 >線上教育 (實時實現)

線上教育 (實時實現)

1專案需求架構

1.1 專案需求分析

  一、資料採集平臺搭建

  二、Kafka中介軟體準備

  三、下游Spark Streaming對接Kafka接收資料,實現vip個數統計、頁面之間的跳轉率、做題正確率與掌握度、播放時長統計及歷史區間統計的實時計算功能。

1.2 專案框架

1.2.1 技術選型

  一、資料儲存:KafkaMySQL

  二、資料處理:Spark

  三、其他元件:Zookeeper

1.2.2 流程設計

2章 需求

2.1環境準備

  在本機三臺虛擬機器上分別搭建好zookeeperkafka,建立所需topic(注:CDH6.3.2kafka的版本為

2.2.1

kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic register_topic
kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic qz_log
kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic page_topic
kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic course_learn

  如果使用--bootstrap-server hadoop102:9092建立,則消費者的offset儲存在kafka中,如果使用--zookeeper hadoop:2181建立,則消費者的offset儲存在zk中

2.2原始資料格式及kafka對應topic

2.2.1實時統計註冊人數- register.log

  kafka對應topicregister_topic

  資料格式:

欄位

欄位說明

1

使用者id

2

平臺id

1:PC

2:APP

3:Other

3

建立時間

  資料示例:

# 資料使用/t作為分隔符
7188    2    2019-07-16 16:01:55
7189    1    2019-07-16 16:01:55
7190    1    2019-07-16 16:01:55
7191    1    2019-07-16 16:01:55
7192    1    2019-07-16 16:01:55
7193    3    2019-07-16 16:01:55
7194    1    2019-07-16 16:01:55
7195    3    2019-07-16 16:01:55

2.2.2做題正確率數與知識點掌握度資料格式- qz.log

  kafka對應topicqz_log

  資料格式:

欄位

欄位說明

1

使用者id

2

課程id

3

知識點id

4

題目id

5

是否正確

0錯誤

1正確

6

建立時間

  資料示例:

# 資料使用/t作為分隔符
1006    504    8    7    0    2019-07-12 11:17:45
1007    505    16    9    1    2019-07-12 11:17:45
1002    505    29    3    0    2019-07-12 11:17:45
1006    504    10    5    0    2019-07-12 11:17:45
1001    502    28    8    0    2019-07-12 11:17:45
1006    505    27    0    1    2019-07-12 11:17:45
1004    503    25    3    0    2019-07-12 11:17:45
1007    504    12    1    0    2019-07-12 11:17:45
1006    501    7    6    0    2019-07-12 11:17:45

2.2.3商品頁面到訂單頁,訂單頁到支付頁資料格式- page.log

  kafka對應topicpage_topic

  資料格式:

序號

欄位

欄位說明

1

app_id

平臺id

1:PC

2:APP

3:Other

2

device_id

平臺id

3

distinct_id

唯一標識

4

ip

使用者ip地址

5

last_event_name

上一事件名稱

6

last_page_id

上一頁面id

7

next_event_name

下一事件名稱

8

next_page_id

下一頁面id

9

page_id

當前頁面id

1:商品課程頁

2:訂單頁面

3:支付頁面

10

server_time

伺服器時間

11

uid

使用者id

  資料示例:

# 資料為json格式
{"app_id":"2","device_id":"100","distinct_id":"23a6d4a7-6903-46a4-bce2-a8317693da45","event_name":"-","ip":"123.235.113.225","last_event_name":"-","last_page_id":"0","next_event_name":"-","next_page_id":"2","page_id":"1","server_time":"-","uid":"0"}
# json格式化之後
{
  "app_id": "2",
  "device_id": "100",
  "distinct_id": "23a6d4a7-6903-46a4-bce2-a8317693da45",
  "event_name": "-",
  "ip": "123.235.113.225",
  "last_event_name": "-",
  "last_page_id": "0",
  "next_event_name": "-",
  "next_page_id": "2",
  "page_id": "1",
  "server_time": "-",
  "uid": "0"
}

2.2.4實時統計學員播放視訊各時長- course_learn.log

  Kafka對應topiccourse_learn

  資料格式:

序號

欄位

欄位說明

1

biz

唯一標識

2

chapterid

章節id

3

cwareid

課件id

4

edutypeid

輔導id

5

pe

視訊播放結束區間

6

ps

視訊播放開始區間

7

sourceType

播放平臺

8

speed

播放倍速

9

subjectid

主題id

10

te

視訊播放結束時間(時間戳)

11

ts

視訊播放開始時間(時間戳)

12

uid

使用者id

13

videoid

視訊id

  資料示例:

# 資料為json格式
{"biz":"34e4b8fe-476c-4c7e-8e2c-274ec00bb5c9","chapterid":"2","cwareid":"2","edutypeid":"1","pe":"56","ps":"24","sourceType":"PC","speed":"2","subjectid":"1","te":"1563352144131","ts":"1563352128131","uid":"219","videoid":"6"}
# json格式化之後
{
  "biz": "34e4b8fe-476c-4c7e-8e2c-274ec00bb5c9",
  "chapterid": "2",
  "cwareid": "2",
  "edutypeid": "1",
  "pe": "56",
  "ps": "24",
  "sourceType": "PC",
  "speed": "2",
  "subjectid": "1",
  "te": "1563352144131",
  "ts": "1563352128131",
  "uid": "219",
  "videoid": "6"
}

2.3模擬資料採集

  將準備好的log檔案使用kafka生產者程式碼傳送資訊到對應的topiclog檔案均在資料包的.\2.資料\01日誌資料\04_實時-kafka主題資料中)

資料說明

日誌檔案

Kafka topic

程式碼檔案

註冊日誌資料

register.log

register_topic

做題資料

qz.log

qz_log

商品頁面資料

page.log

page_topic

視訊播放時長資料

course_learn.log

course_learn

  注:如果windows下沒有安裝hadoop環境,先windows配置環境變數。(程式碼執行時候會尋找環境變數中的HADOOP_HOME,然後找%HADOOP_HOME%/bin/winutils.exe,所以我們不需要下載全部的程式碼,只需要把bin包配置好,能讓系統找到%HADOOP_HOME%/bin/winutils.exe即可)

  該檔案為hadoop-3.0.0bin目錄壓縮包

2.4 ip解析工具測試

  ip解析本地庫檔案

  測試ip解析工具程式碼:

import org.lionsoul.ip2region.{DbConfig, DbSearcher}

object IpTest {
  def main(args: Array[String]): Unit = {
    val ipSearch = new DbSearcher(new DbConfig(), this.getClass.getResource("/ip2region.db").getPath)
    val region = ipSearch.binarySearch("182.250.250.42").getRegion
    println(region)
    val city = region.split("\\|")(2)
    println(city)
  }
}

  測試結果:

2.5實時統計註冊人員資訊

2.5.1 MySQL建表語句

CREATE TABLE `offset_manager` (
  `groupid` varchar(50) DEFAULT NULL,
  `topic` varchar(50) DEFAULT NULL,
  `partition` int(11) DEFAULT NULL,
  `untiloffset` mediumtext,
  UNIQUE KEY `offset_unique` (`groupid`,`topic`,`partition`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.5.2表結構說明

表名:

offset_manager

主鍵:

欄位名

欄位說明

1

groupid

Kafka consumergroupid

2

topic

Kafka consumertopic

3

partition

Kafka consumerpartition

4

untiloffset

最新的消費offset(由上面的GTP進行定位)

2.5.3業務流程說明

  使用者使用網站或APP進行註冊,後臺實時收集資料傳輸KafkaSpark Streaming進行對接統計,實時統計註冊人數。

2.5.4需求說明

  需求1:實時統計註冊人數,批次為3秒一批,使用updateStateBykey運算元計算曆史資料和當前批次的資料總數,僅此需求使用updateStateBykey,後續需求不使用updateStateBykey

  需求2:每6秒統統計一次1分鐘內的註冊資料,不需要歷史資料(提示:reduceByKeyAndWindow運算元)

  需求3:觀察對接資料,嘗試進行調優。

2.6實時統計學員做題正確率與知識點掌握度

2.6.1 MySQL建表語句

CREATE TABLE `qz_point_detail` (
  `userid` int(11) DEFAULT NULL,
  `courseid` int(11) DEFAULT NULL,
  `pointid` int(11) DEFAULT NULL,
  `qz_sum` int(11) DEFAULT NULL,
  `qz_count` int(11) DEFAULT NULL,
  `qz_istrue` int(11) DEFAULT NULL,
  `correct_rate` double(4,2) DEFAULT NULL,
  `mastery_rate` double(4,2) DEFAULT NULL,
  `createtime` datetime DEFAULT NULL,
  `updatetime` datetime DEFAULT NULL,
  UNIQUE KEY `qz_point_detail_unique` (`userid`,`courseid`,`pointid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `qz_point_history` (
  `userid` int(11) DEFAULT NULL,
  `courseid` int(11) DEFAULT NULL,
  `pointid` int(11) DEFAULT NULL,
  `questionids` text,
  `createtime` datetime DEFAULT NULL,
  `updatetime` datetime DEFAULT NULL,
  UNIQUE KEY `qz_point_set_unique` (`userid`,`courseid`,`pointid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.6.2表結構說明

表名:

qz_point_detail

主鍵:

`qz_point_detail_unique` (`userid`,`courseid`,`pointid`)

欄位名

欄位說明

1

userid

使用者id

2

courseid

課程id

3

pointid

知識點id

4

qz_sum

做題總數(與歷史進行累加)

5

qz_count

當前批次做題個數(去重)

6

qz_istrue

做題正確題目總數(與歷史進行累加)

7

correct_rate

正確率

題目正確率=qz_istrue/qz_count

8

mastery_rate

知識點掌握程度=(題目正確率*題目完成度)

題目完成度=當前知識點去重完成題目數/當前知識點題目總數10

9

createtime

建立時間

10

updatetime

更新時間

表名:

qz_point_history

主鍵:

`qz_point_set_unique` (`userid`,`courseid`,`pointid`)

欄位名

欄位說明

1

userid

使用者id

2

courseid

課程id

3

pointid

知識點id

4

questionids

題目id(使用,”作為拼接)

5

createtime

建立時間

6

updatetime

更新時間

2.6.3業務流程說明

  使用者在網站或APP進行做題,做完題點選交卷按鈕,程式將做題記錄提交,傳輸到Kafka中,下游Spark Streaming對接kafka實現實時計算做題正確率和掌握度的計算,將正確率和掌握度存入MySQL中,使用者點選交卷後重新整理頁面能立馬(思考:這個更新的速度取決於什麼?)看到自己做題的詳情。

2.6.4需求說明

  需求1:要求Spark Streaming保證資料不丟失,每秒100條處理速度,需要手動維護偏移量

  需求2:同一個使用者做在同一門課程同一知識點下做題需要去重,需要根據歷史資料進行去重並且記錄去重後的做題id與個數

  需求3:計算知識點正確率正確率計算公式:做題正確總個數/做題總數)保留兩位小數

  需求4:計算知識點掌握度,(知識點掌握度=去重後的做題個數/當前知識點總題數(已知10題)*當前知識點的正確率

2.7實時統計商品頁到訂單頁,訂單頁到支付頁轉換率

2.7.1 MySQL建表語句

CREATE TABLE `page_jump_rate` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `last_page_id` INT(11) DEFAULT NULL,
  `page_id` INT(11) DEFAULT NULL,
  `next_page_id` INT(11) DEFAULT NULL,
  `num` BIGINT(20) DEFAULT NULL,
  `jump_rate` VARCHAR(10) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `page_jum_rate_unique` (`page_id`)
) ENGINE=INNODB AUTO_INCREMENT=394 DEFAULT CHARSET=utf8;

CREATE TABLE `tmp_city_num_detail` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `province` VARCHAR(10) DEFAULT NULL,
  `num` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `tmp_cityp_num_index_province` (`province`)
) ENGINE=INNODB AUTO_INCREMENT=4191 DEFAULT CHARSET=utf8;

CREATE TABLE `top_city_num` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `province` VARCHAR(10) DEFAULT NULL,
  `num` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

2.7.2表結構說明

表名:

page_jump_rate

主鍵:

`id`

唯一鍵:

`page_jum_rate_unique` (`page_id`)

欄位名

欄位說明

1

id

使用者id

2

last_page_id

上一頁面id 1:商品課程頁 2:訂單頁面 3:支付頁面

3

page_id

當前頁面id 1:商品課程頁 2:訂單頁面 3:支付頁面

4

next_page_id

下一頁面id 1:商品課程頁 2:訂單頁面 3:支付頁面

5

num

6

jump_rate

頁面跳轉率

表名:

tmp_city_num_detail

主鍵:

`id`

唯一鍵:

`tmp_cityp_num_index_province` (`province`)

欄位名

欄位說明

1

id

自增id

2

province

省份

3

num

各省資料統計結果

表名:

top_city_num

主鍵:

`id`

唯一鍵:

`page_jum_rate_unique` (`page_id`)

欄位名

欄位說明

1

id

自增id

2

province

省份

3

num

各省資料統計結果(只取前3

2.7.3業務流程說明

  使用者瀏覽課程首頁點選下訂單,跳轉到訂單頁面,再點選支付跳轉到支付頁面進行支付,收集各頁面跳轉json資料,解析json資料計算各頁面點選數和轉換率,計算top3點選量按地區排名(ip欄位,需要根據歷史資料累計)

2.7.4需求說明

  需求1:計算首頁(商品詳情頁)總瀏覽數、訂單頁總瀏覽數、支付頁面總瀏覽數

  需求2:計算商品課程頁面到訂單頁的跳轉轉換率、訂單頁面到支付頁面的跳轉轉換率

  需求3:根據ip得出相應省份,展示出top3省份的點選數,需要根據歷史資料累加

  注:此處預設首頁為商品頁,如果當前頁為商品頁則無需計算轉化率,記為100%

  為了簡化需求,該頁面跳轉邏輯預設為1號頁面跳轉至2號頁面,2號頁面才能跳轉3號頁面。3號不能跳轉回2號和1號。即頁面是按序號順序前進。

2.8實時統計學員播放視訊各時長

2.8.1 MySQL建表語句

CREATE TABLE `video_learn_detail` (
  `userid` INT(11) NOT NULL DEFAULT '0',
  `cwareid` INT(11) NOT NULL DEFAULT '0',
  `videoid` INT(11) NOT NULL DEFAULT '0',
  `totaltime` BIGINT(20) DEFAULT NULL,
  `effecttime` BIGINT(20) DEFAULT NULL,
  `completetime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`userid`,`cwareid`,`videoid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

CREATE TABLE `chapter_learn_detail` (
  `chapterid` INT(11) NOT NULL DEFAULT '0',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`chapterid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

CREATE TABLE `cwareid_learn_detail` (
  `cwareid` INT(11) NOT NULL DEFAULT '0',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`cwareid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;


CREATE TABLE `edutype_learn_detail` (
  `edutypeid` INT(11) NOT NULL DEFAULT '0',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`edutypeid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;


CREATE TABLE `sourcetype_learn_detail` (
  `sourcetype` VARCHAR(10) NOT NULL DEFAULT '',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`sourcetype_learn`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

CREATE TABLE `subject_learn_detail` (
  `subjectid` INT(11) NOT NULL DEFAULT '0',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`subjectid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

CREATE TABLE `video_interval` (
  `userid` INT(11) NOT NULL DEFAULT '0',
  `cwareid` INT(11) NOT NULL DEFAULT '0',
  `videoid` INT(11) NOT NULL DEFAULT '0',
  `play_interval` TEXT,
  PRIMARY KEY (`userid`,`cwareid`,`videoid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

2.8.2表結構說明

表名:

video_learn_detail

主鍵:

`userid`,`cwareid`,`videoid`

欄位名

欄位說明

1

userid

使用者id

2

cwareid

課件id

3

videoid

視訊id

4

totaltime

播放總時長:(te-ts)/1000

5

effecttime

有效總時長:[((te-ts)/1000)/(pe-ps)] * complete_duration

6

completetime

完成總時長:(pe-ps)需要對歷史資料進行對比並去重

表名:

chapter_learn_detail

主鍵:

chapterid

欄位名

欄位說明

1

chapterid

章節id

2

totaltime

統計總時長

表名:

cwareid_learn_detail

主鍵:

cwareid

欄位名

欄位說明

1

cwareid

課件id

2

totaltime

統計總時長

表名:

edutype_learn_detail

主鍵:

edutypeid

欄位名

欄位說明

1

edutypeid

輔導id

2

totaltime

統計總時長

表名:

sourcetype_learn_detail

主鍵:

sourcetype

欄位名

欄位說明

1

sourcetype

播放裝置來源型別

2

totaltime

統計總時長

表名:

subject_learn_detail

主鍵:

subjectid

欄位名

欄位說明

1

subjectid

主題id

2

totaltime

統計總時長

表名:

video_interval

主鍵:

`userid`,`cwareid`,`videoid`

欄位名

欄位說明

1

userid

使用者id

2

cwareid

課件id

3

videoid

視訊id

4

play_interval

播放歷史區間

2.8.3業務流程說明

  使用者線上播放視訊進行學習課程,後臺記錄視訊播放開始區間和結束區間,及播放開始時間和播放結束時間,後臺手機資料傳輸kafka需要計算使用者播放視訊總時長、有效時長、完成時長,及各維度總播放時長。

2.8.4需求說明

  需求1:計算各章節下的播放總時長(chapterid聚合統計播放總時長)

  需求2:計算各課件下的播放總時長(cwareid聚合統計播放總時長)

  需求3:計算各輔導下的播放總時長(edutypeid聚合統計播放總時長)

  需求4:計算各播放平臺下的播放總時長(sourcetype聚合統計播放總時長)

  需求5:計算各科目下的播放總時長(subjectid聚合統計播放總時長)

  需求6:計算使用者學習視訊的播放總時長、有效時長、完成時長,需求記錄視訊播歷史區間,對於使用者多次學習的播放區間不累計有效時長和完成時長。

  播放總時長計算:(te-ts)/1000 向下取整 單位:秒

  完成時長計算:根據pe-ps 計算 需要對歷史資料進行去重處理

  有效時長計算:根據te-ts 除以pe-ps 先計算出播放每一區間需要的實際時長 * 完成時長

3章 思考

  (1Spark Streaming下每個stage的耗時由什麼決定

  (2Spark Streaming task發生資料傾斜如何解決

  (3Spark Streaming操作MySQL時,相同維度的資料如何保證執行緒安全問題

  (4)如何保證kill Spark Streaming任務的時候不丟失資料

  (5)如何保證Spark Streaming的第一次啟動和kill後第二次啟動時據不丟失資料

  (6Spark Streaming下如何正確操作MySQL(如何正確使用連線)

  (7MySQL建表時 注意索引問題

4章 建立maven專案

4.1education-online父工程下建立子專案

4.2建立相應的包

4.3配置pom.xml檔案

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>education-online</artifactId>
        <groupId>com.atguigu</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>com_atguigu_sparkstreaming</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
                        <scope>provided</scope>
            <version>${spark.version}</version>
            <!-- provider如果存在,那麼執行時該Jar包不存在,也不會打包到最終的釋出版本中,只是編譯器有效 -->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
                        <scope>provided</scope>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
                        <scope>provided</scope>
            <version>${spark.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/bk/ring-gzip -->

        <!--MySql連線池-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.16</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.29</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

        <dependency>
            <groupId>org.lionsoul</groupId>
            <artifactId>ip2region</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>org.xerial.snappy</groupId>
            <scope>provided</scope>
            <artifactId>snappy-java</artifactId>
            <version>1.1.2.6</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.1</version>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

5章 需求實現

5.1建立MySql配置檔案

  在resources原始碼包下建立comerce.peoperties檔案

jdbc.url=jdbc:mysql://hadoop102:3306/course_learn?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false
jdbc.user=root
jdbc.password=123456

5.2建立讀取配置檔案的工具類

  在com.atguigu.qzpoint.util建立ConfigurationManager

package com.atguigu.qzpoint.util;

import java.io.InputStream;
import java.util.Properties;

/**
 *
 * 讀取配置檔案工具類
 */
public class ConfigurationManager {

  private static Properties prop = new Properties();

  static {
    try {
      InputStream inputStream = ConfigurationManager.class.getClassLoader()
          .getResourceAsStream("comerce.properties");
      prop.load(inputStream);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  //獲取配置項
  public static String getProperty(String key) {
    return prop.getProperty(key);
  }

  //獲取布林型別的配置項
  public static boolean getBoolean(String key) {
    String value = prop.getProperty(key);
    try {
      return Boolean.valueOf(value);
    } catch (Exception e) {
      e.printStackTrace();
    }
    return false;
  }

}

5.3建立Json解析工具類

在com.atguigu.qz.point.util建立ParseJsonData類

package com.atguigu.qzpoint.util;

import com.alibaba.fastjson.JSONObject;


public class ParseJsonData {

    public static JSONObject getJsonData(String data) {
        try {
            return JSONObject.parseObject(data);
        } catch (Exception e) {
            return null;
        }
    }
}

5.4建立Druid連線池

  Druid連線池專案地址:https://github.com/alibaba/druid

  Druid連線池中文文件:https://github.com/alibaba/druid/wiki/%E9%A6%96%E9%A1%B5

  Druid連線池配置屬性列表:https://github.com/alibaba/druid/wiki/DruidDataSource%E9%85%8D%E7%BD%AE%E5%B1%9E%E6%80%A7%E5%88%97%E8%A1%A8

  在com.atgugiu.qzpoint.util建立DataSourceUtil類

package com.atguigu.qzpoint.util;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.io.Serializable;
import java.sql.*;
import java.util.Properties;

/**
 * 德魯伊連線池
 */
public class DataSourceUtil implements Serializable {
    public static DataSource dataSource = null;

    static {
        try {
            Properties props = new Properties();
            props.setProperty("url", ConfigurationManager.getProperty("jdbc.url"));
            props.setProperty("username", ConfigurationManager.getProperty("jdbc.user"));
            props.setProperty("password", ConfigurationManager.getProperty("jdbc.password"));
            props.setProperty("initialSize", "5"); //初始化大小
            props.setProperty("maxActive", "10"); //最大連線
            props.setProperty("minIdle", "5");  //最小連線
            props.setProperty("maxWait", "60000"); //等待時長
            props.setProperty("timeBetweenEvictionRunsMillis", "2000");//配置多久進行一次檢測,檢測需要關閉的連線 單位毫秒
            props.setProperty("minEvictableIdleTimeMillis", "600000");//配置連線在連線池中最小生存時間 單位毫秒
            props.setProperty("maxEvictableIdleTimeMillis", "900000"); //配置連線在連線池中最大生存時間 單位毫秒
            props.setProperty("validationQuery", "select 1");
            props.setProperty("testWhileIdle", "true");
            props.setProperty("testOnBorrow", "false");
            props.setProperty("testOnReturn", "false");
            props.setProperty("keepAlive", "true");
            props.setProperty("phyMaxUseCount", "100000");
//            props.setProperty("driverClassName", "com.mysql.jdbc.Driver");
            dataSource = DruidDataSourceFactory.createDataSource(props);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //提供獲取連線的方法
    public static Connection getConnection() throws SQLException {
        return dataSource.getConnection();
    }

    // 提供關閉資源的方法【connection是歸還到連線池】
    // 提供關閉資源的方法 【方法過載】3 dql
    public static void closeResource(ResultSet resultSet, PreparedStatement preparedStatement,
                                     Connection connection) {
        // 關閉結果集
        // ctrl+alt+m 將java語句抽取成方法
        closeResultSet(resultSet);
        // 關閉語句執行者
        closePrepareStatement(preparedStatement);
        // 關閉連線
        closeConnection(connection);
    }

    private static void closeConnection(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    private static void closePrepareStatement(PreparedStatement preparedStatement) {
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }


    private static void closeResultSet(ResultSet resultSet) {
        if (resultSet != null) {
            try {
                resultSet.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

5.5建立操作MySQL的代理類

  在com.atguigu.qzpoint.util建立SqlProxy

package com.atguigu.qzpoint.util

import java.sql.{Connection, PreparedStatement, ResultSet}

trait QueryCallback {
  def process(rs: ResultSet)
}

class SqlProxy {
  private var rs: ResultSet = _
  private var psmt: PreparedStatement = _

  /**
    * 執行修改語句
    *
    * @param conn
    * @param sql
    * @param params
    * @return
    */
  def executeUpdate(conn: Connection, sql: String, params: Array[Any]): Int = {
    var rtn = 0
    try {
      psmt = conn.prepareStatement(sql)
      if (params != null && params.length > 0) {
        for (i <- 0 until params.length) {
          psmt.setObject(i + 1, params(i))
        }
      }
      rtn = psmt.executeUpdate()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    rtn
  }

  /**
    * 執行查詢語句
    * 執行查詢語句
    *
    * @param conn
    * @param sql
    * @param params
    * @return
    */
  def executeQuery(conn: Connection, sql: String, params: Array[Any], queryCallback: QueryCallback) = {
    rs = null
    try {
      psmt = conn.prepareStatement(sql)
      if (params != null && params.length > 0) {
        for (i <- 0 until params.length) {
          psmt.setObject(i + 1, params(i))
        }
      }
      rs = psmt.executeQuery()
      queryCallback.process(rs)
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  def shutdown(conn: Connection): Unit = DataSourceUtil.closeResource(rs, psmt, conn)
}

5.6實時統計註冊人數程式碼實現

package com.atguigu.qzpoint.streaming

import java.lang
import java.sql.ResultSet
import java.util.Random

import com.atguigu.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object RegisterStreaming {
  private val groupid = "register_group_test"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
      .set("spark.streaming.kafka.maxRatePerPartition", "50")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("register_topic")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    ssc.checkpoint("hdfs://hadoop102:9000/user/atguigu/sparkstreaming/checkpoint")
    //查詢mysql中是否有偏移量
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close() //關閉遊標
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }
    //設定kafka消費資料的引數  判斷本地是否有偏移量  有則根據偏移量繼續消費 無則重新消費
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }
    val resultDStream = stream.filter(item => item.value().split("\t").length == 3).
      mapPartitions(partitions => {
        partitions.map(item => {
          val line = item.value()
          val arr = line.split("\t")
          val app_name = arr(1) match {
            case "1" => "PC"
            case "2" => "APP"
            case _ => "Other"
          }
          (app_name, 1)
        })
      })
    resultDStream.cache()
    //    resultDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(6)).print()
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum //本批次求和
      val previousCount = state.getOrElse(0) //歷史資料
      Some(currentCount + previousCount)
    }
    resultDStream.updateStateByKey(updateFunc).print()


//        val dsStream = stream.filter(item => item.value().split("\t").length == 3)
//          .mapPartitions(partitions =>
//            partitions.map(item => {
//              val rand = new Random()
//              val line = item.value()
//              val arr = line.split("\t")
//              val app_id = arr(1)
//              (rand.nextInt(3) + "_" + app_id, 1)
//            }))
//        dsStream.print()
//        val a = dsStream.reduceByKey(_ + _)
//        a.print()
//        a.map(item => {
//          val appid = item._1.split("_")(1)
//          (appid, item._2)
//        }).reduceByKey(_ + _).print()

    //處理完 業務邏輯後 手動提交offset維護到本地 mysql中
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

5.7實時統計學員做題正確率與知識點掌握度

package com.atguigu.qzpoint.streaming

import java.lang
import java.sql.{Connection, ResultSet}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import com.atguigu.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
  * 知識點掌握度實時統計
  */
object QzPointStreaming {

  private val groupid = "qz_point_group"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "50")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("qz_log")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    //查詢mysql中是否存在偏移量
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close() //關閉遊標
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }
    //設定kafka消費資料的引數  判斷本地是否有偏移量  有則根據偏移量繼續消費 無則重新消費
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }
    //過濾不正常資料 獲取資料
    val dsStream = stream.filter(item => item.value().split("\t").length == 6).
      mapPartitions(partition => partition.map(item => {
        val line = item.value()
        val arr = line.split("\t")
        val uid = arr(0) //使用者id
        val courseid = arr(1) //課程id
        val pointid = arr(2) //知識點id
        val questionid = arr(3) //題目id
        val istrue = arr(4) //是否正確
        val createtime = arr(5) //建立時間
        (uid, courseid, pointid, questionid, istrue, createtime)
      }))
    dsStream.foreachRDD(rdd => {
      //獲取相同使用者 同一課程 同一知識點的資料
      val groupRdd = rdd.groupBy(item => item._1 + "-" + item._2 + "-" + item._3)
      groupRdd.foreachPartition(partition => {
        //在分割槽下獲取jdbc連線
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partition.foreach { case (key, iters) =>
            qzQuestionUpdate(key, iters, sqlProxy, client) //對題庫進行更新操作
          }
        } catch {
          case e: Exception => e.printStackTrace()
        }
        finally {
          sqlProxy.shutdown(client)
        }
      }
      )
    })
    //處理完 業務邏輯後 手動提交offset維護到本地 mysql中
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * 對題目表進行更新操作
    *
    * @param key
    * @param iters
    * @param sqlProxy
    * @param client
    * @return
    */
  def qzQuestionUpdate(key: String, iters: Iterable[(String, String, String, String, String, String)], sqlProxy: SqlProxy, client: Connection) = {
    val keys = key.split("-")
    val userid = keys(0).toInt
    val courseid = keys(1).toInt
    val pointid = keys(2).toInt
    val array = iters.toArray
    val questionids = array.map(_._4).distinct //對當前批次的資料下questionid 去重
    //查詢歷史資料下的 questionid
    var questionids_history: Array[String] = Array()
    sqlProxy.executeQuery(client, "select questionids from qz_point_history where userid=? and courseid=? and pointid=?",
      Array(userid, courseid, pointid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            questionids_history = rs.getString(1).split(",")
          }
          rs.close() //關閉遊標
        }
      })
    //獲取到歷史資料後再與當前資料進行拼接 去重
    val resultQuestionid = questionids.union(questionids_history).distinct
    val countSize = resultQuestionid.length
    val resultQuestionid_str = resultQuestionid.mkString(",")
    val qz_count = questionids.length //去重後的題個數
    var qz_sum = array.length //獲取當前批次題總數
    var qz_istrue = array.filter(_._5.equals("1")).size //獲取當前批次做正確的題個數
    val createtime = array.map(_._6).min //獲取最早的建立時間 作為表中建立時間
    //更新qz_point_set 記錄表 此表用於存當前使用者做過的questionid表
    val updatetime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now())
    sqlProxy.executeUpdate(client, "insert into qz_point_history(userid,courseid,pointid,questionids,createtime,updatetime) values(?,?,?,?,?,?) " +
      " on duplicate key update questionids=?,updatetime=?", Array(userid, courseid, pointid, resultQuestionid_str, createtime, createtime, resultQuestionid_str, updatetime))

    var qzSum_history = 0
    var istrue_history = 0
    sqlProxy.executeQuery(client, "select qz_sum,qz_istrue from qz_point_detail where userid=? and courseid=? and pointid=?",
      Array(userid, courseid, pointid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            qzSum_history += rs.getInt(1)
            istrue_history += rs.getInt(2)
          }
          rs.close()
        }
      })
    qz_sum += qzSum_history
    qz_istrue += istrue_history
    val correct_rate = qz_istrue.toDouble / qz_sum.toDouble //計算正確率
    //計算完成率
    //假設每個知識點下一共有30道題  先計算題的做題情況 再計知識點掌握度
    val qz_detail_rate = countSize.toDouble / 30 //算出做題情況乘以 正確率 得出完成率 假如30道題都做了那麼正確率等於 知識點掌握度
    val mastery_rate = qz_detail_rate * correct_rate
    sqlProxy.executeUpdate(client, "insert into qz_point_detail(userid,courseid,pointid,qz_sum,qz_count,qz_istrue,correct_rate,mastery_rate,createtime,updatetime)" +
      " values(?,?,?,?,?,?,?,?,?,?) on duplicate key update qz_sum=?,qz_count=?,qz_istrue=?,correct_rate=?,mastery_rate=?,updatetime=?",
      Array(userid, courseid, pointid, qz_sum, countSize, qz_istrue, correct_rate, mastery_rate, createtime, updatetime, qz_sum, countSize, qz_istrue, correct_rate, mastery_rate, updatetime))

  }
}

5.8實時統計商品頁到訂單頁,訂單頁到支付頁轉換率

package com.atguigu.qzpoint.streaming

import java.lang
import java.sql.{Connection, ResultSet}
import java.text.NumberFormat

import com.atguigu.qzpoint.util.{DataSourceUtil, ParseJsonData, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkFiles}
import org.lionsoul.ip2region.{DbConfig, DbSearcher}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
  * 頁面轉換率實時統計
  */
object PageStreaming {
  private val groupid = "vip_count_groupid"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "30")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
  
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("page_topic")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    //查詢mysql中是否存在偏移量
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select *from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close()
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }

    //設定kafka消費資料的引數 判斷本地是否有偏移量  有則根據偏移量繼續消費 無則重新消費
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }
    //解析json資料
    val dsStream = stream.map(item => item.value()).mapPartitions(partition => {
      partition.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val uid = if (jsonObject.containsKey("uid")) jsonObject.getString("uid") else ""
        val app_id = if (jsonObject.containsKey("app_id")) jsonObject.getString("app_id") else ""
        val device_id = if (jsonObject.containsKey("device_id")) jsonObject.getString("device_id") else ""
        val ip = if (jsonObject.containsKey("ip")) jsonObject.getString("ip") else ""
        val last_page_id = if (jsonObject.containsKey("last_page_id")) jsonObject.getString("last_page_id") else ""
        val pageid = if (jsonObject.containsKey("page_id")) jsonObject.getString("page_id") else ""
        val next_page_id = if (jsonObject.containsKey("next_page_id")) jsonObject.getString("next_page_id") else ""
        (uid, app_id, device_id, ip, last_page_id, pageid, next_page_id)
      })
    }).filter(item => {
      !item._5.equals("") && !item._6.equals("") && !item._7.equals("")
    })
    dsStream.cache()
    val pageValueDStream = dsStream.map(item => (item._5 + "_" + item._6 + "_" + item._7, 1))
    val resultDStream = pageValueDStream.reduceByKey(_ + _)
    resultDStream.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        //在分割槽下獲取jdbc連線
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partition.foreach(item => {
            calcPageJumpCount(sqlProxy, item, client) //計算頁面跳轉個數
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })
    })

    ssc.sparkContext.addFile(this.getClass.getResource("/ip2region.db").getPath) //廣播檔案
    val ipDStream = dsStream.mapPartitions(patitions => {
      val dbFile = SparkFiles.get("ip2region.db")
      val ipsearch = new DbSearcher(new DbConfig(), dbFile)
      patitions.map { item =>
        val ip = item._4
        val province = ipsearch.memorySearch(ip).getRegion().split("\\|")(2) //獲取ip詳情   中國|0|上海|上海市|有線通
        (province, 1l) //根據省份 統計點選個數
      }
    }).reduceByKey(_ + _)


    ipDStream.foreachRDD(rdd => {
      //查詢mysql歷史資料 轉成rdd
      val ipSqlProxy = new SqlProxy()
      val ipClient = DataSourceUtil.getConnection
      try {
        val history_data = new ArrayBuffer[(String, Long)]()
        ipSqlProxy.executeQuery(ipClient, "select province,num from tmp_city_num_detail", null, new QueryCallback {
          override def process(rs: ResultSet): Unit = {
            while (rs.next()) {
              val tuple = (rs.getString(1), rs.getLong(2))
              history_data += tuple
            }
          }
        })
        val history_rdd = ssc.sparkContext.makeRDD(history_data)
        val resultRdd = history_rdd.fullOuterJoin(rdd).map(item => {
          val province = item._1
          val nums = item._2._1.getOrElse(0l) + item._2._2.getOrElse(0l)
          (province, nums)
        })
        resultRdd.foreachPartition(partitions => {
          val sqlProxy = new SqlProxy()
          val client = DataSourceUtil.getConnection
          try {
            partitions.foreach(item => {
              val province = item._1
              val num = item._2
              //修改mysql資料 並重組返回最新結果資料
              sqlProxy.executeUpdate(client, "insert into tmp_city_num_detail(province,num)values(?,?) on duplicate key update num=?",
                Array(province, num, num))
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            sqlProxy.shutdown(client)
          }
        })
        val top3Rdd = resultRdd.sortBy[Long](_._2, false).take(3)
        sqlProxy.executeUpdate(ipClient, "truncate table top_city_num", null)
        top3Rdd.foreach(item => {
          sqlProxy.executeUpdate(ipClient, "insert into top_city_num (province,num) values(?,?)", Array(item._1, item._2))
        })
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(ipClient)
      }
    })

    //計算轉換率
    //處理完 業務邏輯後 手動提交offset維護到本地 mysql中
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        calcJumRate(sqlProxy, client) //計算轉換率
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * 計算頁面跳轉個數
    *
    * @param sqlProxy
    * @param item
    * @param client
    */
  def calcPageJumpCount(sqlProxy: SqlProxy, item: (String, Int), client: Connection): Unit = {
    val keys = item._1.split("_")
    var num: Long = item._2
    val page_id = keys(1).toInt //獲取當前page_id
    val last_page_id = keys(0).toInt //獲取上一page_id
    val next_page_id = keys(2).toInt //獲取下頁面page_id
    //查詢當前page_id的歷史num個數
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(page_id), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          num += rs.getLong(1)
        }
        rs.close()
      }

      //對num 進行修改 並且判斷當前page_id是否為首頁
      if (page_id == 1) {
        sqlProxy.executeUpdate(client, "insert into page_jump_rate(last_page_id,page_id,next_page_id,num,jump_rate)" +
          "values(?,?,?,?,?) on duplicate key update num=num+?", Array(last_page_id, page_id, next_page_id, num, "100%", num))
      } else {
        sqlProxy.executeUpdate(client, "insert into page_jump_rate(last_page_id,page_id,next_page_id,num)" +
          "values(?,?,?,?) on duplicate key update num=num+?", Array(last_page_id, page_id, next_page_id, num, num))
      }
    })
  }

  /**
    * 計算轉換率
    */
  def calcJumRate(sqlProxy: SqlProxy, client: Connection): Unit = {
    var page1_num = 0l
    var page2_num = 0l
    var page3_num = 0l
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(1), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          page1_num = rs.getLong(1)
        }
      }
    })
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(2), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          page2_num = rs.getLong(1)
        }
      }
    })
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(3), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          page3_num = rs.getLong(1)
        }
      }
    })
    val nf = NumberFormat.getPercentInstance
    val page1ToPage2Rate = if (page1_num == 0) "0%" else nf.format(page2_num.toDouble / page1_num.toDouble)
    val page2ToPage3Rate = if (page2_num == 0) "0%" else nf.format(page3_num.toDouble / page2_num.toDouble)
    sqlProxy.executeUpdate(client, "update page_jump_rate set jump_rate=? where page_id=?", Array(page1ToPage2Rate, 2))
    sqlProxy.executeUpdate(client, "update page_jump_rate set jump_rate=? where page_id=?", Array(page2ToPage3Rate, 3))
  }

}

5.9實時統計學員播放視訊各時長

package com.atguigu.qzpoint.streaming

import java.lang
import java.sql.{Connection, ResultSet}

import com.atguigu.qzpoint.bean.LearnModel
import com.atguigu.qzpoint.util.{DataSourceUtil, ParseJsonData, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object CourseLearnStreaming {
  private val groupid = "course_learn_test1"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
      .set("spark.streaming.kafka.maxRatePerPartition", "30")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")

    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("course_learn")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    //查詢mysql是否存在偏移量
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select *from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close()
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }
    //設定kafka消費資料的引數 判斷本地是否有偏移量  有則根據偏移量繼續消費 無則重新消費
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }

    //解析json資料
    val dsStream = stream.mapPartitions(partitions => {
      partitions.map(item => {
        val json = item.value()
        val jsonObject = ParseJsonData.getJsonData(json)
        val userId = jsonObject.getIntValue("uid")
        val cwareid = jsonObject.getIntValue("cwareid")
        val videoId = jsonObject.getIntValue("videoid")
        val chapterId = jsonObject.getIntValue("chapterid")
        val edutypeId = jsonObject.getIntValue("edutypeid")
        val subjectId = jsonObject.getIntValue("subjectid")
        val sourceType = jsonObject.getString("sourceType")
        val speed = jsonObject.getIntValue("speed")
        val ts = jsonObject.getLong("ts")
        val te = jsonObject.getLong("te")
        val ps = jsonObject.getIntValue("ps")
        val pe = jsonObject.getIntValue("pe")
        LearnModel(userId, cwareid, videoId, chapterId, edutypeId, subjectId, sourceType, speed, ts, te, ps, pe)
      })
    })

    dsStream.foreachRDD(rdd => {
      rdd.cache()
      //統計播放視訊 有效時長 完成時長 總時長
      rdd.groupBy(item => item.userId + "_" + item.cwareId + "_" + item.videoId).foreachPartition(partitoins => {
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partitoins.foreach { case (key, iters) =>
            calcVideoTime(key, iters, sqlProxy, client) //計算視訊時長
          }
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })
      //統計章節下視訊播放總時長
      rdd.mapPartitions(partitions => {
        partitions.map(item => {
          val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
          val key = item.chapterId
          (key, totaltime)
        })
      }).reduceByKey(_ + _)
        .foreachPartition(partitoins => {
          val sqlProxy = new SqlProxy()
          val client = DataSourceUtil.getConnection
          try {
            partitoins.foreach(item => {
              sqlProxy.executeUpdate(client, "insert into chapter_learn_detail(chapterid,totaltime) values(?,?) on duplicate key" +
                " update totaltime=totaltime+?", Array(item._1, item._2, item._2))
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            sqlProxy.shutdown(client)
          }
        })

      //統計課件下的總播放時長
      rdd.mapPartitions(partitions => {
        partitions.map(item => {
          val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
          val key = item.cwareId
          (key, totaltime)
        })
      }).reduceByKey(_ + _).foreachPartition(partitions => {
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partitions.foreach(item => {
            sqlProxy.executeUpdate(client, "insert into cwareid_learn_detail(cwareid,totaltime) values(?,?) on duplicate key " +
              "update totaltime=totaltime+?", Array(item._1, item._2, item._2))
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })

      //統計輔導下的總播放時長
      rdd.mapPartitions(partitions => {
        partitions.map(item => {
          val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
          val key = item.edutypeId
          (key, totaltime)
        })
      }).reduceByKey(_ + _).foreachPartition(partitions => {
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partitions.foreach(item => {
            sqlProxy.executeUpdate(client, "insert into edutype_learn_detail(edutypeid,totaltime) values(?,?) on duplicate key " +
              "update totaltime=totaltime+?", Array(item._1, item._2, item._2))
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })

      //統計同一資源平臺下的總播放時長
      rdd.mapPartitions(partitions => {
        partitions.map(item => {
          val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
          val key = item.sourceType
          (key, totaltime)
        })
      }).reduceByKey(_ + _).foreachPartition(partitions => {
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partitions.foreach(item => {
            sqlProxy.executeUpdate(client, "insert into sourcetype_learn_detail (sourcetype_learn,totaltime) values(?,?) on duplicate key " +
              "update totaltime=totaltime+?", Array(item._1, item._2, item._2))
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })
      // 統計同一科目下的播放總時長
      rdd.mapPartitions(partitions => {
        partitions.map(item => {
          val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
          val key = item.subjectId
          (key, totaltime)
        })
      }).reduceByKey(_ + _).foreachPartition(partitons => {
        val sqlProxy = new SqlProxy()
        val clinet = DataSourceUtil.getConnection
        try {
          partitons.foreach(item => {
            sqlProxy.executeUpdate(clinet, "insert into subject_learn_detail(subjectid,totaltime) values(?,?) on duplicate key " +
              "update totaltime=totaltime+?", Array(item._1, item._2, item._2))
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(clinet)
        }
      })

    })
    //計算轉換率
    //處理完 業務邏輯後 手動提交offset維護到本地 mysql中
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * 計算視訊 有效時長  完成時長 總時長
    *
    * @param key
    * @param iters
    * @param sqlProxy
    * @param client
    */
  def calcVideoTime(key: String, iters: Iterable[LearnModel], sqlProxy: SqlProxy, client: Connection) = {
    val keys = key.split("_")
    val userId = keys(0).toInt
    val cwareId = keys(1).toInt
    val videoId = keys(2).toInt
    //查詢歷史資料
    var interval_history = ""
    sqlProxy.executeQuery(client, "select play_interval from video_interval where userid=? and cwareid=? and videoid=?",
      Array(userId, cwareId, videoId), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            interval_history = rs.getString(1)
          }
          rs.close()
        }
      })
    var effective_duration_sum = 0l //有效總時長
    var complete_duration_sum = 0l //完成總時長
    var cumulative_duration_sum = 0l //播放總時長
    val learnList = iters.toList.sortBy(item => item.ps) //轉成list 並根據開始區間升序排序
    learnList.foreach(item => {
      if ("".equals(interval_history)) {
        //沒有歷史區間
        val play_interval = item.ps + "-" + item.pe //有效區間
        val effective_duration = Math.ceil((item.te - item.ts) / 1000) //有效時長
        val complete_duration = item.pe - item.ps //完成時長
        effective_duration_sum += effective_duration.toLong
        cumulative_duration_sum += effective_duration.toLong
        complete_duration_sum += complete_duration
        interval_history = play_interval
      } else {
        //有歷史區間進行對比
        val interval_arry = interval_history.split(",").sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt))
        val tuple = getEffectiveInterval(interval_arry, item.ps, item.pe)
        val complete_duration = tuple._1 //獲取實際有效完成時長
        val effective_duration = Math.ceil((item.te - item.ts) / 1000) / (item.pe - item.ps) * complete_duration //計算有效時長
        val cumulative_duration = Math.ceil((item.te - item.ts) / 1000) //累計時長
        interval_history = tuple._2
        effective_duration_sum += effective_duration.toLong
        complete_duration_sum += complete_duration
        cumulative_duration_sum += cumulative_duration.toLong
      }
      sqlProxy.executeUpdate(client, "insert into video_interval(userid,cwareid,videoid,play_interval) values(?,?,?,?) " +
        "on duplicate key update play_interval=?", Array(userId, cwareId, videoId, interval_history, interval_history))
      sqlProxy.executeUpdate(client, "insert into video_learn_detail(userid,cwareid,videoid,totaltime,effecttime,completetime) " +
        "values(?,?,?,?,?,?) on duplicate key update totaltime=totaltime+?,effecttime=effecttime+?,completetime=completetime+?",
        Array(userId, cwareId, videoId, cumulative_duration_sum, effective_duration_sum, complete_duration_sum, cumulative_duration_sum,
          effective_duration_sum, complete_duration_sum))
    })
  }

  /**
    * 計算有效區間
    *
    * @param array
    * @param start
    * @param end
    * @return
    */
  def getEffectiveInterval(array: Array[String], start: Int, end: Int) = {
    var effective_duration = end - start
    var bl = false //是否對有效時間進行修改
    import scala.util.control.Breaks._
    breakable {
      for (i <- 0 until array.length) {
        //迴圈各區間段
        var historyStart = 0 //獲取其中一段的開始播放區間
        var historyEnd = 0 //獲取其中一段結束播放區間
        val item = array(i)
        try {
          historyStart = item.split("-")(0).toInt
          historyEnd = item.split("-")(1).toInt
        } catch {
          case e: Exception => throw new Exception("error array:" + array.mkString(","))
        }
        if (start >= historyStart && historyEnd >= end) {
          //已有資料佔用全部播放時長 此次播放無效
          effective_duration = 0
          bl = true
          break()
        } else if (start <= historyStart && end > historyStart && end < historyEnd) {
          //和已有資料左側存在交集 扣除部分有效時間(以老資料為主進行對照)
          effective_duration -= end - historyStart
          array(i) = start + "-" + historyEnd
          bl = true
        } else if (start > historyStart && start < historyEnd && end >= historyEnd) {
          //和已有資料右側存在交集 扣除部分有效時間
          effective_duration -= historyEnd - start
          array(i) = historyStart + "-" + end
          bl = true
        } else if (start < historyStart && end > historyEnd) {
          //現資料 大於舊資料 扣除舊資料所有有效時間
          effective_duration -= historyEnd - historyStart
          array(i) = start + "-" + end
          bl = true
        }
      }
    }
    val result = bl match {
      case false => {
        //沒有修改原array 沒有交集 進行新增
        val distinctArray2 = ArrayBuffer[String]()
        distinctArray2.appendAll(array)
        distinctArray2.append(start + "-" + end)
        val distinctArray = distinctArray2.distinct.sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt))
        val tmpArray = ArrayBuffer[String]()
        tmpArray.append(distinctArray(0))
        for (i <- 1 until distinctArray.length) {
          val item = distinctArray(i).split("-")
          val tmpItem = tmpArray(tmpArray.length - 1).split("-")
          val itemStart = item(0)
          val itemEnd = item(1)
          val tmpItemStart = tmpItem(0)
          val tmpItemEnd = tmpItem(1)
          if (tmpItemStart.toInt < itemStart.toInt && tmpItemEnd.toInt < itemStart.toInt) {
            //沒有交集
            tmpArray.append(itemStart + "-" + itemEnd)
          } else {
            //有交集
            val resultStart = tmpItemStart
            val resultEnd = if (tmpItemEnd.toInt > itemEnd.toInt) tmpItemEnd else itemEnd
            tmpArray(tmpArray.length - 1) = resultStart + "-" + resultEnd
          }
        }
        val play_interval = tmpArray.sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt)).mkString(",")
        play_interval
      }
      case true => {
        //修改了原array 進行區間重組
        val distinctArray = array.distinct.sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt))
        val tmpArray = ArrayBuffer[String]()
        tmpArray.append(distinctArray(0))
        for (i <- 1 until distinctArray.length) {
          val item = distinctArray(i).split("-")
          val tmpItem = tmpArray(tmpArray.length - 1).split("-")
          val itemStart = item(0)
          val itemEnd = item(1)
          val tmpItemStart = tmpItem(0)
          val tmpItemEnd = tmpItem(1)
          if (tmpItemStart.toInt < itemStart.toInt && tmpItemEnd.toInt < itemStart.toInt) {
            //沒有交集
            tmpArray.append(itemStart + "-" + itemEnd)
          } else {
            //有交集
            val resultStart = tmpItemStart
            val resultEnd = if (tmpItemEnd.toInt > itemEnd.toInt) tmpItemEnd else itemEnd
            tmpArray(tmpArray.length - 1) = resultStart + "-" + resultEnd
          }
        }
        val play_interval = tmpArray.sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt)).mkString(",")
        play_interval
      }
    }
    (effective_duration, result)
  }
}

6章 總結與調優

6.1保證Spark Streaming第一次啟動不丟資料

  在kafka的引數auto.offset.rest設定為earlist保證Spark Streaming第一次啟動從kafka最早偏移量開始拉取資料

6.2 Spark Streaming手動維護偏移量

  在Spark Streaming下有三種消費模式的定義 最多一次、至少一次、恰好一次

  那麼最好是無限接近恰好一次。要實現恰好一次,偏移量必須手動維護,因為自動提交會在Spark Streaming剛執行時就立馬提交offset,如果這個時候Spark Streaming消費資訊失敗了,那麼offset也就錯誤提交了。所以必須保證:

  1)手動維護偏移量

  2)處理完業務資料後再提交offset,手動維護偏移量 需設定kafka引數enable.auto.commit改為false

  手動維護提交offset有兩種選擇:

  1)處理完業務資料後手動提交到Kafka

  2)處理完業務資料後手動提交到本地庫,MySqlHBase

6.2.1手動提交到Kafka

  先來看如何提交到kafka 官網所示:

  stream.foreachRdd後根據每個rdd先轉換成HashOffsetRanges物件通過.offsetRanges方法獲取到偏移量物件,再通過commitAsync方法將偏移量提交。

6.2.2維護到本地MySQL

  如專案所示:Driver端需先去判斷MySQL庫中是否存在偏移量,如果存在偏移量則從MySQL中獲取到當前topic對應的最新offset大小,如果MySQL不存在則從kafka中獲取

  消費到資料後,進行業務處理處理完後需將offset最新值儲存到MySql

  那麼如果有面試官提問如何保證資料恰好一次性消費回答到這兩點一般就可以了,手動維護偏移量和先處理完業務資料再提交offset。但是處理業務資料和提交offset並非同一事務,在極端情況下如提交offset時斷網斷電還是會導致offset沒有提交併且業務資料已處理完的情況。

  那麼保證事務就需要將並行度調成1或者將資料collectdriver端,再進行資料業務處理和提交offset,但這樣還會導致並行度變成1很可能導致處理速度跟不上,所以大資料情況下一般不考慮事務。

6.3 updateStateByKey運算元與checkpoint

  updateStateBykey運算元根據官網描述,是返回一個新的“狀態”的DStream的運算元,其通過在鍵的先前狀態和鍵的新值上應用給定函式更新每個鍵的狀態。

  具體寫法:

  根據歷史狀態值,和當前批次的資料狀態值的累加操作,得出一個最新的結果。如專案中程式碼:

  那麼使用updateStateByBykey運算元,必須使用Spark Streamingcheckpoint來維護歷史狀態資料

  Spark on Yarn模式是分散式處理資料的,那麼為了讓所有executor都能訪問到state歷史狀態資料,必須將state狀態資料維護在HDFS上,如專案上所指定目錄:

  那麼看下Hdfs上路徑下的檔案

  存在小檔案且小檔案個數不可控,所以在真實企業生產環境上並不會使用checkpoint操作,也不會使用基於checkpoint的運算元如updateStateBykey運算元

  那麼如何代替updateStateBykey這種基於歷史資料狀態的操作的運算元呢:

  在進行相應操作時,可以去庫中查詢出歷史資料,再與當前資料進行操作得出最新結果集,將結果集再重新整理到本地庫中。

6.4計算Spark Streaming一秒鐘拉取多少條資料

  在企業中往往會根據業務的實時性來定製一秒鐘消費資料量的條數,來達到實時性,那麼通過什麼引數來設定Spark Streamingkafka的拉取的條數呢。

  根據官網描述,可以設定spark.streaming.kafka.maxRatePerPartition引數來設定Spark Streamingkafka分割槽每秒拉取的條數

  那麼在專案中如 實時統計學員做題正確率與知識點掌握度需求中,需要每秒100處理速度,針對此需求topicqz_log 分割槽為10,那麼通過此引數設定10即可,每個分割槽沒秒10條資料。一秒處理100條資料,當前批次為3秒一次,一批處理300條資料.

6.5 Spark Streaming背壓機制

  根據官網描述 Spark Streaming背壓機制 使Spark Streaming能夠根據當前的批處理排程延遲和處理時間來動態控制接收速率,以便系統只接收系統可以處理的速度。背壓機制的上限速率由spark.streaming.kafka.maxRatePerPartition控制,所以生產環境中往往會兩個引數一起使用。

6.6一個stage的耗時由什麼決定

  由上圖可以看出一個stage由最慢的task耗時決定。

6.7 Spark Streaming優雅關閉

  提交Spark Streaming任務到yarn後,當需要停止程式時使用yarn application -kill application_id命令來關閉Spark Streaming ,那麼操作此命令時需要保證資料不丟失,需要設定spark.streaming.stopGracefullOnShutdown引數為ture

  當設定此引數後,Spark Streaming程式在接收到kill命令時,不會立馬結束程式,Spark會在JVM關閉時正常關閉Spark Streaming,而不是是立馬關閉,即保證當前資料處理完後再關閉。

6.8 Spark Streaming預設分割槽數

  Spark Streaming預設並行度與所對應kafka topic建立時的分割槽數所對應,比如專案中topic的分割槽都是10Spark Streaming的預設分割槽就為10且在真實開發環境中Spark Streaming一般不會去使用repartition增大分割槽操作,因為會進行shuffle耗時。

7章 打包、spark-submit命令

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g  --class com.atguigu.qzpoint.streaming.CourseLearnStreaming com_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g  --class com.atguigu.qzpoint.streaming.PageStreaming com_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g  --class com.atguigu.qzpoint.streaming.QzPointStreaming com_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g  --class com.atguigu.qzpoint.streaming.RegisterStreaming com_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar

8章 常見問題

8.1 jar包衝突問題

  根據官網描述spark-streaming-kafka-0-10_2.11.jar包中包含kafka-clients客戶端jar包不需要再次新增kafka客戶端jar包,如果再次新增kafka客戶端jar包可能會引起版本衝突

8.2無法序列化問題和正確操作資料庫連線

  對於開發者人員剛開始接手Spark Streaming時往往會錯誤的使用資料庫連線,如上述官網描述對於connection獲取,程式碼寫在了foreachRDDrdd.foreach上,那麼這樣操作實際是在driver端建立到connection,然後rdd.foreacher操作為分散式節點操作,connection.send方法實際發生在了各個executor節點上,這個時候就涉及到了driver端物件到executor端的一個網路傳輸問題,這個時候spark會發生錯誤,會報一個org.apache.spark.SparkException:Task not serializable這樣一個任務無法序列化的錯,在Spark中遇到此錯誤一般都是錯誤的將driver端物件在executor端使用造的。

  那麼建立connection操作必須在executor端執行

  如官網描述,在rdd.foreach裡建立connection,這樣雖然不會發生錯誤,但是這樣迴圈的粒度是針對每條資料,每迴圈一條資料都會建立一個連線,這樣會造成資源浪費。

  1和圖2都是錯誤展示

  最後,正確的使用資料庫連線,迴圈粒度是分割槽,在每個分割槽下建立一個數據庫連線,迴圈分割槽下的資料每條資料使用當前分割槽下的資料庫連線,當使用完畢後歸還的連線池中。所以在Spark Streaming開發中需養成良好習慣:

dstream.foreachRdd{rdd=>{
     rdd.foreachPartition{partitions=>{
        //迴圈分割槽
//建立connection  
        partitions.foreach(record=>{
          //業務處理  使用當前connection
            }
}  
        //歸還連線
}
}

  迴圈粒度 foreachRdd => foreachPartition => foreach

8.3 Spark Streaming操作資料庫時執行緒安全問題

  在Spark Streaming中,採用查詢本地庫的歷史資料和當前批次資料的計算來代替需要基於HDFS的運算元updateStateByKey,那麼在查詢和重新刷入本地庫的時候處理不當會造成執行緒安全問題,資料不準的問題。

  那麼在查詢本地庫時需要進行一次預聚合操作,將相同key的資料落到一個分割槽,保證同一個key的資料指揮操作資料庫一次,預聚合操作有reduceByKeygroupByKeygroupby等運算元。

  如專案所寫:

  題庫更新操作時需要查詢MySQL本地庫的歷史資料,在查詢本地庫前先進行了groupBy操作,將相同符合條件的業務資料聚合到了一個分割槽,保證相同使用者下同一課程同一知識點的資料在當前批次下只會去查詢一次MySQL資料庫並且一次覆蓋。

8.4資料傾斜問題

  資料傾斜為在shuffle過程中,必須將各個節點上相同的key的資料拉取到某節點的一個task來進行,此時如果某個key對應的資料量特別大的話,就會發生資料傾,某個task耗時非常大,那麼一個stage的耗時由最慢的task決定,從而導致整個Spark Streaming任務執行非常緩慢。

  以reduceByKey為例:

  這張圖就是發生了資料傾斜,那麼解決方案最有效的為兩階段聚合,先打散key聚合一次,再還原key聚合一次。

  具體程式碼展示:

  對DStream 進行map操作對原始key前加上隨機值,map完後進行第一次reduceByKey操作,此結果為打散key後的reduceByKey結果,再次進行map操作根據分隔符,去掉隨機數保留原有keymap後再進行reduceByKey,保證相同key的資料準確累加。

8.5 Spark Streaming消費多topic

  在真實環境中往往會有許多業務場景非常類似,比如打標籤、監控某指標,可能程式碼邏輯都一樣只有某個取值不一樣,這個時候一個Spark Streaming就可以監控多個topic,然後根據topic的名稱來進行不同的業務處理,就不需要開發多個Spark Streaming程式了。

  檢視kafkaUtils.createDirectStream方法

  可以發現topic引數可以是個多個值,也就是createDirectStream方法支援多個topic

  通過kafkaUtils.createDirectStream方法獲取到DStream,這個DStream流的型別為InputDStream[ConsumerRecord[String,String]],那麼在可以通過呼叫map方法,ConsumerRecordtopic方法來獲取對應的topic名稱

  獲取到topic名稱後value資料後,就可以在後續操作里根據判斷topic名稱來處理不同的業務。

8.6記憶體洩露(附加)

  記憶體洩露是指程式中已動態分配的堆記憶體由於某種原因程式未釋放或無法釋放,造成系統記憶體的浪費,導致程式執行速度減慢,甚至系統崩潰等嚴重後果。

  在Spark Streaming中往往會因為開發者程式碼未正確編寫導致無法回收或釋放物件,造成Spark Streaming記憶體洩露越跑越慢甚至崩潰的結果。那麼排查記憶體洩露需要一些第三方的工具

8.6.1 IBM HeapAnalyzer

  官網地址:https://www.ibm.com/developerworks/community/groups/service/html/communityview?communityUuid=4544bafe-c7a2-455f-9d43-eb866ea60091

  點選下載 記憶體洩露分析工具,下載下來是一個jar

  那麼需要編寫bat批處理來執行,建立run.bat

  編輯

title ibm-heap-analyzer

path=%PATH%;%C:\JAVA\jdk1.8.0_51\bin

E:

cd E:\IBM heapAnalyzer\IBM_DUMP_wjfx

java.exe -Xms1048M -Xmx4096M -jar ha456.jar

  路徑需要改成自己當前路徑,點選run.bat執行

  執行成功

8.6.2 模擬記憶體洩露場景

  記憶體洩露的原因往往是因為物件無法釋放或被回收造成,那麼在本專案中就模擬此場景。

  如上圖所示,在計算學員知識點正確率與掌握度程式碼中,在最後提交offset提交偏移量後,迴圈往map裡新增LearnMode物件,使每處理一批資料就往map裡新增100000LearnMode物件,使堆記憶體撐滿。

8.6.3 查詢driver程序

  在叢集上提交spark streaming任務

ps -ef |grep com.atguigu.qzpoint.streaming.QzPointStreaming

  通過此命令查詢到driver程序號

  程序號為6860

  通過Spark Ui發現該Spark Straming task任務發生長時間卡住現象,GC出現異常。疑似發生記憶體洩露

8.6.4 JMAP命令

  使用jmap -heap pid命令檢視6860程序,記憶體使用情況。

jmap -heap 6860

  發現新生代和老年代記憶體佔滿,有物件無法被銷燬或回收。再通過jmap -histo pid命令檢視物件的記憶體情況。

jmap -histo 6860 > a.log
jmap -dump:live,format=b,file=dump.log 6860 

  live:將java程序裡存活的物件拿出來。

  format=b:二進位制的檔案。

  file=dump.log:指定下載下來的檔名。

  將dump從叢集下載下來,開啟IBM HeapAnalyzer進行分析

  從餅狀圖可以看出,堆記憶體中存在大量HashEntry類,點選Analysis 分析,檢視各個物件記憶體的洩露大小和總大小。

  選中最大的分析物件雙擊,或者右鍵點選 Find object in a tree view檢視樹狀圖。

  可以看出HashEntry的父類為HashMap,並且點選HashEntry檢視內部,

  連結串列裡的next物件為LearnModel

  可以定位 Spark Streaming在操作map 新增LearnModel時發生了記憶體洩露