1. 程式人生 > 實用技巧 >基於Spark2.x新聞網大資料實時分析視覺化系統專案實戰

基於Spark2.x新聞網大資料實時分析視覺化系統專案實戰

本次專案是基於企業大資料經典案例專案(大資料日誌分析),全方位、全流程講解 大資料專案的業務分析、技術選型、架構設計、叢集規劃、安裝部署、整合繼承與開發和web視覺化互動設計。

專案程式碼託管於github,大家可以自行下載

一、業務需求分析

  1. 捕獲使用者瀏覽日誌資訊
  2. 實時分析前20名流量最高的新聞話題
  3. 實時統計當前線上已曝光的新聞話題
  4. 統計哪個時段使用者瀏覽量最高

二、系統架構圖設計

三、系統資料流程設計

四、叢集資源規劃設計

五、步驟詳解

考慮到實際情況,本人叢集配置共三個節點(node5、node6、node7)。

1. Zookeeper分散式叢集部署

參考部落格

2.Hadoop2.X HA架構與部署

參考部落格

3.HBase分散式叢集部署與設計

參考部落格

4.Kafka分散式叢集部署

參考部落格

5.Flume部署及資料採集準備

參考部落格,node6與node7中flume資料採集到node5中,而且node6和node7的flume配置檔案大致相同,node7中將a2改為a3,如下
a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /opt/data/weblog-flume.log
a2.sources.r1.channels = c1

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 1000
a2.channels.c1.keep-alive = 5

a2.sinks.k1.type = avro
a2.sinks.k1.channel = c1
a2.sinks.k1.hostname = node5
a2.sinks.k1.port = 5555

6.Flume+HBase+Kafka整合與開發

1.下載Flume原始碼並匯入Idea開發工具 1)將apache-flume-1.7.0-src.tar.gz原始碼下載到本地解壓 2)通過idea匯入flume原始碼 開啟idea開發工具,選擇File——》Open 然後找到flume原始碼解壓檔案,選中flume-ng-hbase-sink,點選ok載入相應模組的原始碼。 2.官方flume與hbase整合的引數介紹 3. 下載日誌資料並分析 到搜狗實驗室下載使用者查詢日誌 1)介紹

搜尋引擎查詢日誌庫設計為包括約1個月(2008年6月)Sogou搜尋引擎部分網頁查詢需求及使用者點選情況的網頁查詢日誌資料集合。為進行中文搜尋引擎使用者行為分析的研究者提供基準研究語料。

2)格式說明

資料格式為:訪問時間\t使用者ID\t[查詢詞]\t該URL在返回結果中的排名\t使用者點選的順序號\t使用者點選的URL

其中,使用者ID是根據使用者使用瀏覽器訪問搜尋引擎時的Cookie資訊自動賦值,即同一次使用瀏覽器輸入的不同查詢對應同一個使用者ID

4.node5聚合節點與HBase和Kafka的整合配置

node5通過flume接收node6與node7中flume傳來的資料,並將其分別傳送至hbase與kafka中,配置內容如下

a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaSink hbaseSink

a1.sources.r1.type = avro       
a1.sources.r1.channels = hbaseC kafkaC
a1.sources.r1.bind = node5
a1.sources.r1.port = 5555 
a1.sources.r1.threads = 5 

#****************************flume + hbase****************************** 
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
a1.channels.hbaseC.keep-alive = 20

a1.sinks.hbaseSink.type = asynchbase
a1.sinks.hbaseSink.table = weblogs
a1.sinks.hbaseSink.columnFamily = info
a1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
a1.sinks.hbaseSink.channel = hbaseC
a1.sinks.hbaseSink.serializer.payloadColumn = datetime,userid,searchname,retorder,cliorder,cliurl

#****************************flume + kafka******************************
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000
a1.channels.kafkaC.keep-alive = 20

a1.sinks.kafkaSink.channel = kafkaC
a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaSink.brokerList = node5:9092,node6:9092,node7:9092
a1.sinks.kafkaSink.topic = weblogs
a1.sinks.kafkaSink.zookeeperConnect = node5:2181,node6:2181,node7:2181
a1.sinks.kafkaSink.requiredAcks = 1
a1.sinks.kafkaSink.batchSize = 1
a1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder

5.對日誌資料進行格式處理

1)將檔案中的tab更換成逗號

cat weblog.log|tr "\t" "," > weblog.log

2)將檔案中的空格更換成逗號

cat weblog2.log|tr " " "," > weblog.log

6.自定義SinkHBase程式設計與開發

1)模仿SimpleAsyncHbaseEventSerializer自定義KfkAsyncHbaseEventSerializer實現類,修改一下程式碼即可。

  1. @Override
  2. public List<PutRequest> getActions() {
  3. List<PutRequest> actions = new ArrayList<PutRequest>();
  4. if (payloadColumn != null) {
  5. byte[] rowKey;
  6. try {
  7. /*---------------------------程式碼修改開始---------------------------------*/
  8. // 解析列欄位
  9. String[] columns = new String(this.payloadColumn).split(",");
  10. // 解析flume採集過來的每行的值
  11. String[] values = new String(this.payload).split(",");
  12. for(int i=0;i < columns.length;i++){
  13. byte[] colColumn = columns[i].getBytes();
  14. byte[] colValue = values[i].getBytes(Charsets.UTF_8);
  15. // 資料校驗:欄位和值是否對應
  16. if(colColumn.length != colValue.length) break;
  17. // 時間
  18. String datetime = values[0].toString();
  19. // 使用者id
  20. String userid = values[1].toString();
  21. // 根據業務自定義Rowkey
  22. rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid,datetime);
  23. // 插入資料
  24. PutRequest putRequest = new PutRequest(table, rowKey, cf,
  25. colColumn, colValue);
  26. actions.add(putRequest);
  27. /*---------------------------程式碼修改結束---------------------------------*/
  28. }
  29. } catch (Exception e) {
  30. throw new FlumeException("Could not get row key!", e);
  31. }
  32. }
  33. return actions;
  34. }

2)在SimpleRowKeyGenerator類中,根據具體業務自定義Rowkey生成方法

  1. public static byte[] getKfkRowKey(String userid, String datetime) throws UnsupportedEncodingException {
  2. return (userid + "-" + datetime + "-" + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
  3. }

7.自定義編譯程式打jar包

1)在idea工具中,選擇File——》ProjectStructrue

2)左側選中Artifacts,然後點選右側的+號,最後選擇JAR——》From modules with dependencies

3)然後直接點選ok

4)刪除其他依賴包,只把flume-ng-hbase-sink打成jar包就可以了。

5)然後依次點選apply,ok

6)點選build進行編譯,會自動打成jar包

7)到專案的apache-flume-1.7.0-src\flume-ng-sinks\flume-ng-hbase-sink\classes\artifacts\flume_ng_hbase_sink_jar目錄下找到剛剛打的jar包

8)將打包名字替換為flume自帶的包名flume-ng-hbase-sink-1.7.0.jar ,然後上傳至flume/lib目錄下,覆蓋原有的jar包即可。

7.資料採集/儲存/分發完整流程測試

1.在idea開發工具中構建weblogs專案,編寫資料生成模擬程式。

  1. package main.java;
  2. import java.io.*;
  3. public class ReadWrite {
  4. static String readFileName;
  5. static String writeFileName;
  6. public static void main(String args[]){
  7. readFileName = args[0];
  8. writeFileName = args[1];
  9. try {
  10. // readInput();
  11. readFileByLines(readFileName);
  12. }catch(Exception e){
  13. }
  14. }
  15. public static void readFileByLines(String fileName) {
  16. FileInputStream fis = null;
  17. InputStreamReader isr = null;
  18. BufferedReader br = null;
  19. String tempString = null;
  20. try {
  21. System.out.println("以行為單位讀取檔案內容,一次讀一整行:");
  22. fis = new FileInputStream(fileName);// FileInputStream
  23. // 從檔案系統中的某個檔案中獲取位元組
  24. isr = new InputStreamReader(fis,"GBK");
  25. br = new BufferedReader(isr);
  26. int count=0;
  27. while ((tempString = br.readLine()) != null) {
  28. count++;
  29. // 顯示行號
  30. Thread.sleep(300);
  31. String str = new String(tempString.getBytes("UTF8"),"GBK");
  32. System.out.println("row:"+count+">>>>>>>>"+tempString);
  33. method1(writeFileName,tempString);
  34. //appendMethodA(writeFileName,tempString);
  35. }
  36. isr.close();
  37. } catch (IOException e) {
  38. e.printStackTrace();
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. } finally {
  42. if (isr != null) {
  43. try {
  44. isr.close();
  45. } catch (IOException e1) {
  46. }
  47. }
  48. }
  49. }
  50. public static void method1(String file, String conent) {
  51. BufferedWriter out = null;
  52. try {
  53. out = new BufferedWriter(new OutputStreamWriter(
  54. new FileOutputStream(file, true)));
  55. out.write("\n");
  56. out.write(conent);
  57. } catch (Exception e) {
  58. e.printStackTrace();
  59. } finally {
  60. try {
  61. out.close();
  62. } catch (IOException e) {
  63. e.printStackTrace();
  64. }
  65. }
  66. }
  67. }

2.參照前面idea工具專案打包方式,將該專案打成weblogs.jar包,然後上傳至bigdata-pro01.kfk.com節點的/opt/jars目錄下(目錄需要提前建立)

3.將weblogs.jar分發到另外兩個節點(node6和node7)

1)在另外兩個節點上分別建立/opt/jars目錄

mkdir /opt/jars

2)將weblogs.jar分發到另外兩個節點

scp weblogs.jar node6:/opt/jars/
scp weblogs.jar node7:/opt/jars/

4.編寫執行模擬程式的shell指令碼

在node6和node7節點的/opt/datas目錄下,建立weblog-shell.sh指令碼。內容為

#/bin/bash
echo "start log......"
#第一個引數是原日誌檔案,第二個引數是日誌生成輸出檔案
java -jar /opt/jars/weblogs.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log

修改weblog-shell.sh可執行許可權

chmod 777 weblog-shell.sh

5.編寫啟動flume服務程式的shell指令碼

在各節點的flume安裝目錄下編寫flume啟動指令碼flume-kfk-start.sh。

下面是node5中的配置寫法,node6與node7中將a1分別改為a2和a3。

#/bin/bash
echo "flume-1 start ......"
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console

6.編寫Kafka Consumer執行指令碼kfk-test-consumer.sh。

#/bin/bash
echo "kfk-kafka-consumer.sh start......"
bin/kafka-console-consumer.sh --zookeeper node5:2181,node6:2181,node7:2181 --from-beginning --topic weblogs

7.將kfk-test-consumer.sh指令碼分發另外兩個節點

scp kfk-test-consumer.sh node6:/opt/modules/kakfa_2.11-0.8.2.1/
scp kfk-test-consumer.sh node7:/opt/modules/kakfa_2.11-0.8.2.1/

8.啟動模擬程式並測試

在node6節點啟動日誌產生指令碼,模擬產生日誌是否正常。

/opt/datas/weblog-shell.sh

9.啟動資料採集所有服務

1)啟動Zookeeper服務

2)啟動hdfs服務

3)啟動HBase服務

建立hbase業務表

create 'weblogs','info'

4)啟動Kafka服務,並建立業務資料topic

bin/kafka-server-start.sh config/server.properties &

bin/kafka-topics.sh --create --zookeeper node5:2181,node6:2181,node7:2181 --topic weblogs --partitions 1 --replication-factor 3

10.完成資料採集全流程測試

1)在node5節點上啟動flume聚合指令碼,將採集的資料分發到Kafka叢集和hbase叢集。

./flume-kfk-start.sh

2)在node6和node7節點上完成資料採集。

(1)使用shell指令碼模擬日誌產生

cd /opt/datas/
./weblog-shell.sh

(2)啟動flume採集日誌資料傳送給聚合節點

./flume-kfk-start.sh

3)啟動Kafka Consumer檢視flume日誌採集情況

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic weblogs --from-beginning

4)檢視hbase資料寫入情況

./hbase-shell
count 'weblogs'

8.MySQL安裝

參考部落格

9.Hive與HBase整合進行資料分析

1. Hive安裝(本地mysql模式),參考部落格 2.Hive與HBase整合 1)在hive-site.xml檔案中配置Zookeeper,hive通過這個引數去連線HBase叢集。
<property>
    <name>hbase.zookeeper.quorum</name>   
<value>node5,node6,node7</value>
</property>
2)將hbase的9個包拷貝到hive/lib目錄下。如果是CDH版本,已經整合好不需要導包。
export HBASE_HOME=/opt/modules/hbase-0.98.6-cdh5.3.0
export HIVE_HOME=/opt/modules/hive-0.13.1/lib
ln -s $HBASE_HOME/lib/hbase-server-0.98.6-cdh5.3.0.jar $HIVE_HOME/lib/hbase-server-0.98.6-cdh5.3.0.jar
ln -s $HBASE_HOME/lib/hbase-client-0.98.6-cdh5.3.0.jar $HIVE_HOME/lib/hbase-client-0.98.6-cdh5.3.0.jar
ln -s $HBASE_HOME/lib/hbase-protocol-0.98.6-cdh5.3.0.jar $HIVE_HOME/lib/hbase-protocol-0.98.6-cdh5.3.0.jar 
ln -s $HBASE_HOME/lib/hbase-it-0.98.6-cdh5.3.0.jar $HIVE_HOME/lib/hbase-it-0.98.6-cdh5.3.0.jar 
ln -s $HBASE_HOME/lib/htrace-core-2.04.jar$HIVE_HOME/lib/htrace-core-2.04.jar
ln -s $HBASE_HOME/lib/hbase-hadoop2-compact-0.98.6-cdh5.3.0.jar $HIVE_HOME/lib/hbase-hadoop2-compact-0.98.6-cdh5.3.0.jar 
ln -s $HBASE_HOME/lib/hbase-hadoop-compact-0.98.6-cdh5.3.0.jar $HIVE_HOME/lib/hbase-hadoop-compact-0.98.6-cdh5.3.0.jar 
ln -s $HBASE_HOME/lib/high-scale-lib-1.1.1.jar $HIVE_HOME/lib/high-scale-lib-1.1.1.jar 
ln -s $HBASE_HOME/lib/hbase-common-0.98.6-cdh5.3.0.jar $HIVE_HOME/lib/hbase-common-0.98.6-cdh5.3.0.jar 
3)在hive中建立與hbase整合的外部表
  1. CREATE EXTERNAL TABLE weblogs(
  2. id string,
  3. datetime string,
  4. userid string,
  5. searchname string,
  6. retorder string,
  7. cliorder string,
  8. cliurl string
  9. )
  10. STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
  11. WITH SERDEPROPERTIES("hbase.columns.mapping"=
  12. ":key,info:datetime,info:userid,info:searchname,info:retorder,info:cliorder,info:cliurl")
  13. TBLPROPERTIES("hbase.table.name"="weblogs");
  14. #檢視hbase資料記錄
  15. select count(*) from weblogs;
4)hive中beeline和hiveserver2的使用
# 啟動hiveserver2
bin/hiveserver2
# 啟動beeline
bin/beeline
# 連線hive2服務
!connect jdbc:hive2//node7.kfk.com:10000
# 查看錶
show tables;
# 檢視前10條資料
select * from weblogs limit 10;

10.Cloudera HUE大資料視覺化分析

參考部落格

11.Spark2.X環境準備、編譯部署及執行

參考部落格,搭建spark叢集參考部落格

12.Spark SQL快速離線資料分析

1.Spark SQL 與Hive整合(spark-shell),參考部落格
2.Spark SQL 與Hive整合(spark-sql),參考部落格
3.Spark SQL之ThriftServer和beeline使用,參考部落格 4. Spark SQL與MySQL整合,參考部落格 5.Spark SQL與HBase整合,參考部落格

13.Structured Streaming業務資料實時分析

1. Structured Streaming與kafka整合

1)Structured Streaming是Spark2.2.0新推出的,要求kafka的版本0.10.0及以上。整合時需將如下的包拷貝到Spark的jar包目錄下。

kafka_2.11-0.10.1.0.jar
kafka-clients-0.10.1.0.jar
spark-sql-kafka-0-10_2.11-2.2.0.jar
spark-streaming-kafka-0-10_2.11-2.1.0.jar

2)與kafka整合程式碼

val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node5:9092")
      .option("subscribe", "weblogs")
      .load()

import spark.implicits._
val lines = df.selectExpr("CAST(value AS STRING)").as[String]

2.Structured Streaming與MySQL整合

1)mysql建立相應的資料庫和資料表,用於接收資料

  1. create database test;
  2. use test;
  3. CREATE TABLE `webCount` (
  4. `titleName` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  5. `count` int(11) DEFAULT NULL
  6. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2)與mysql整合程式碼

val url ="jdbc:mysql://node5:3306/test"
    val username="root"
    val password="1234"

    val writer = new JDBCSink(url,username,password)
    val query = titleCount.writeStream
      .foreach(writer)      
      .outputMode("update")
      .trigger(ProcessingTime("5 seconds"))
      .start()

3.Structured Streaming向mysql資料庫寫入中文亂碼解決

修改資料庫檔案my.cnf(linux下)

[client]
socket=/var/lib/mysql/mysql.sock    //新增
default-character-set=utf8          //新增
[mysqld]
character-set-server=utf8           //新增
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid

14.大資料Web視覺化分析系統開發

1.基於業務需求的WEB系統設計(具體參照程式碼)

2.基於Echart框架的頁面展示層開發

1)echart、JQuery下載

2)頁面效果圖選取及程式碼實現

3.工程編譯並打包釋出

參照之前將的idea打包方式,將spark web專案打包釋出。

4.啟動各個服務

1)啟動zookeeper: zkServer.sh start

2)啟動hadoop: start-all.sh

3)啟動hbase: start-hbase

4)啟動mysql: service mysqld start

5)node6(node7)啟動flume:flume-kfk-start.sh,將資料傳送到node5中

6)node5啟動flume:flume-kfk-start.sh,將資料分別傳到hbase和kafka中

7)啟動kafka-0.10(最好三臺都啟動,不然易出錯):

bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 & 

8)啟動node6(node7)中的指令碼:weblog-shell.sh

9)啟動 StructuredStreamingKafka來從kafka中取得資料,處理後存到mysql中

10)啟動web專案(sparkStu),該專案會從mysql資料庫中讀取資料展示到頁面

5. 最終專案執行效果