大資料專案實戰之新聞話題的實時統計分析
摘要: 本文講解一個完整的企業級大資料專案實戰,實時|離線統計分析使用者的搜尋話題,並用酷炫的前端介面展示出來。這些指標對網站的精準營銷、運營都有極大幫助。
前言:本文是一個完整的大資料專案實戰,實時|離線統計分析使用者的搜尋話題,並用酷炫的前端介面展示出來。這些指標對網站的精準營銷、運營都有極大幫助。架構大致是按照企業標準來的,從日誌的採集、轉化處理、實時計算、JAVA後臺開發、WEB前端展示,一條完整流程線下來,甚至每個節點都用的高可用架構,都考慮了故障轉移和容錯性。所用到的框架包括:Hadoop(HDFS+MapReduce+Yarn)+Flume+KafKa+Hbase+Hive+Spark(SQL、Structured Streaming )+Hue+Mysql+SpringMVC+Mybatis+Websocket+AugularJs+Echarts。所涉及到的語言包括:JAVA、Scala、Shell。
由於本文並非零基礎教學,所以只講架構和流程,基礎性知識自行查缺補漏。Github已經上傳完整專案程式碼
最終效果圖如下:
專案架構圖如下:
環境準備
**
模擬網站實時產生日誌資訊
**
-
獲取資料來源,本文是利用搜狗的資料:搜狗實驗室
-
編寫java類模擬實時採集網站日誌。主要利用Java中的輸入輸出流。寫好後打成jar包傳到伺服器上
public class ReadWebLog {
private static String readFileName; private static String writeFileName; public static void main(String args[]) { readFileName = args[0]; writeFileName = args[1]; readFile(readFileName); } public static void readFile(String fileName) { try { FileInputStream fis = new FileInputStream(fileName); InputStreamReader isr = new InputStreamReader(fis, "GBK"); //以上兩步已經可以從檔案中讀取到一個字元了,但每次只讀取一個字元不能滿足大資料的需求。故需使用BufferedReader,它具有緩衝的作用,可以一次讀取多個字元 BufferedReader br = new BufferedReader(isr); int count = 0; while (br.readLine() != null) { String line = br.readLine(); count++; // 顯示行號 Thread.sleep(300); String str = new String(line.getBytes("UTF8"), "GBK"); System.out.println("row:" + count + ">>>>>>>>" + line); writeFile(writeFileName, line); } isr.close(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void writeFile(String fileName, String conent) { try { FileOutputStream fos = new FileOutputStream(fileName, true); OutputStreamWriter osw = new OutputStreamWriter(fos); BufferedWriter bw = new BufferedWriter(osw); bw.write("\n"); bw.write(conent); bw.close(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }
}
- 編寫採集日誌的shell指令碼
vim weblog.sh
#/bin/bash
echo "start log"
java -jar /home/weblog.jar /usr/local/weblog.log /home/weblogs.log
- 執行效果圖
Flume Agent2採集日誌資訊
主要通過設定Source、Channel、Sink來完成日誌採集。
配置flume配置檔案 vim agent2.conf
a2.sources = r2 a2.channels = c2 a2.sinks = k2 a2.sources.r2.type = exec #來源於weblogs.log檔案 a2.sources.r2.command = tail -F /home/weblogs.log a2.sources.r2.channels = c2 a2.channels.c2.type = memory a2.channels.c2.capacity = 10000 a2.channels.c2.transactionCapacity = 100 a2.channels.c2.keep-alive = 10 a2.sinks.k2.type = avro a2.sinks.k2.channel = c2 落地點是master機器的5555埠(主機名和埠號都必須與master機器的flume配置保持一致) a2.sinks.k2.hostname = master a2.sinks.k2.port = 5555
- 編寫shell指令碼,方便執行。
vim flume.sh
#/bin/bash
echo “flume agent2 start”
bin/flume-ng agent --conf conf --name a2 --conf-file conf/agent2.conf -Dflume.root.logger=INFO,console
- 執行的時候直接
./flume.sh
即可
Flume Agent3採集日誌資訊
各方面配置都和Agent2完全一樣、省略。
Flume Agent1整合日誌資訊
- vim agent1.conf
#Flume Agent1實時整合日誌資訊
a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaS hbaseS
flume + hbase
a1.sources.r1.type = avro
a1.sources.r1.channels = kafkaC hbaseC
a1.sources.r1.bind = master
a1.sources.r1.port = 5555
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
a1.sinks.hbaseS.type = asynchbase
a1.sinks.hbaseS.table = weblogs
a1.sinks.hbaseS.columnFamily = info
a1.sinks.hbaseS.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.hbaseS.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
a1.sinks.hbaseS.channel = hbaseC
flume + kafka
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000
a1.sinks.kafkaS.channel = kafkaC
a1.sinks.kafkaS.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaS.topic = weblogs
a1.sinks.kafkaS.brokerList = master:9092,slave1:9092,slave2:9092
a1.sinks.kafkaS.zookeeperConnect = master:2181,slave1:2181,slave2:2181
a1.sinks.kafkaS.requiredAcks = 1
a1.sinks.kafkaS.batchSize = 20
a1.sinks.kafkaS.serializer.class = kafka.serializer.StringEncoder
-
vim flume.sh
#/bin/bash
echo “flume agent1 start”
bin/flume-ng agent --conf conf --name a1 --conf-file conf/agent1.conf -Dflume.root.logger=INFO,console
具體講解如下:
Flume與Hbase的整合
- 通過檢視官方文件可知,Flume與Hbase的整合主要需要如下引數,表名、列簇名、以及Java類SimpleAsyncHbaseEventSerializer。
- 改寫SimpleAsyncHbaseEventSerializer
下載Flume原始碼,需要改寫如下兩個Java類.
- 打成jar包,上傳到linux伺服器中替換原有flume目錄的該jar包
-
Flume配置檔案配置Sink為Hbase
a1.sinks.hbaseS.type = asynchbase
a1.sinks.hbaseS.table = weblogs
a1.sinks.hbaseS.columnFamily = info
a1.sinks.hbaseS.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.hbaseS.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
a1.sinks.hbaseS.channel = hbaseC
Flume與Kafka的整合
- Flume配置檔案:主要配置topic、brokerlist:
a1.sinks.kafkaS.channel = kafkaC
a1.sinks.kafkaS.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaS.topic = weblogs
a1.sinks.kafkaS.brokerList = master:9092,slave1:9092,slave2:9092
a1.sinks.kafkaS.zookeeperConnect = master:2181,slave1:2181,slave2:2181
a1.sinks.kafkaS.requiredAcks = 1
a1.sinks.kafkaS.batchSize = 20
a1.sinks.kafkaS.serializer.class = kafka.serializer.StringEncoder
-
編寫kafka消費端指令碼,消費從flume傳過來的資訊。
vim flume.sh
#/bin/bash
echo “flume agent1 start”
bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic weblogs --from-beginning
執行效果圖
Kafka與Spark整合完成資料實時處理
這裡我選擇的是2.2版本中的StructuredStreaming,因為它相比SparkStreaming而言有很多優勢,它的出現重點就是解決端到端的精確一次語義,保證資料的不丟失不重複,這對於流式計算極為重要。StructuredStreaming的輸入源為kafka,spark對來自kafka的資料進行計算,主要就是累加話題量和訪問量。具體程式碼參考github。
val spark = SparkSession.builder()
.master("local[2]")
.appName("streaming").getOrCreate()
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "master:9092")
.option("subscribe", "weblogs")
.load()
import spark.implicits._
val lines = df.selectExpr("CAST(value AS STRING)").as[String]
val weblog = lines.map(_.split(",")).map(x => Weblog(x(0), x(1), x(2), x(3), x(4), x(5)))
val titleCount = weblog.groupBy("searchname").count().toDF("titleName", "webcount")
Spark與Mysql整合
這裡選擇Mysql是因為,我們的需求只是報表展示,需要在前臺展示的欄位並不多,關係型資料庫完全能夠支撐。在Hbase裡有幾百萬條資料(一個瀏覽話題可能有十幾萬人搜尋過,也就是說一個話題就有十幾萬條資料,這麼大量資料當然要存在Hbase中),而經過spark的計算,這十幾萬條資料在mysql中就變成了一條資料(XXX話題,XXX瀏覽量)。
如果業務需求變了,我需要實時查詢使用者各種資訊(資料量很大,欄位很多),那麼當然就是實時的直接從Hbase裡查,而不會在Mysql中。
所以企業中要根據不同的業務需求,充分考慮資料量等問題,進行架構的選擇。
val url = "jdbc:mysql://master:3306/weblog?useSSL=false"
val username = "root"
val password = "123456"
val writer = new JdbcSink(url, username, password)
val weblogcount = titleCount.writeStream
.foreach(writer)
.outputMode("update")
.start()
weblogcount.awaitTermination()
離線分析:HIVE整合HBASE。
我們知道Hive是一個數據倉庫,主要就是轉為MapReduce完成對大量資料的離線分析和決策。之前我們已經用Flume整合Hbase,使得Hbase能源源不斷的插入資料。那麼我們直接將HIVE整合HBase,這樣只要Hbase有資料了,那Hive表也就有資料了。怎麼整合呢?很簡單,用【外部表】就搞定了。
CREATE EXTERNAL TABLE `weblogs`(
`id` string COMMENT 'from deserializer',
`datatime` string COMMENT 'from deserializer',
`userid` string COMMENT 'from deserializer',
`searchname` string COMMENT 'from deserializer',
`retorder` string COMMENT 'from deserializer',
`cliorder` string COMMENT 'from deserializer',
`cliurl` string COMMENT 'from deserializer')
ROW FORMAT SERDE
'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
'hbase.columns.mapping'=':key,info:datatime,info:userid,info:searchname,info:retorder,info:cliorder,info:cliurl',
'serialization.format'='1')
TBLPROPERTIES (
'COLUMN_STATS_ACCURATE'='false',
'hbase.table.name'='weblogs',
'numFiles'='0',
'numRows'='-1',
'rawDataSize'='-1',
'totalSize'='0',
'transient_lastDdlTime'='1518778031')
驗證一下HBASE和HIVE是不是同步的:
好了現在我們可以在Hive中盡情的離線分析和決策了~~~
SpringMVC+Mybatis完成對mysql資料的查詢
個人覺得傳統JDBC實在是太笨重,還是最喜歡Spring整合Mybatis對資料庫進行操作。這裡主要完成的操作就是對mysql的資料進行查詢。詳情請參考github,地址文章開頭已給出。
WebSocket實現全雙工通訊
既然要實現客戶端實時接收伺服器端的訊息,而伺服器端又實時接收客戶端的訊息,必不可少的就是WebSocket了,WebSocket實現了瀏覽器與伺服器全雙工通訊(full-duple),能更好的節省伺服器資源和頻寬並達到實時通訊。WebSocket用HTTP握手之後,伺服器和瀏覽器就使用這條HTTP連結下的TCP連線來直接傳輸資料,拋棄了複雜的HTTP頭部和格式。一旦WebSocket通訊連線建立成功,就可以在全雙工模式下在客戶端和伺服器之間來回傳送WebSocket訊息。即在同一時間、任何方向,都可以全雙工傳送訊息。WebSocket 核心就是OnMessage、OnOpen、OnClose,本專案使用的是和Spring整合的方式,因此需要有configurator = SpringConfigurator.class。
@ServerEndpoint(value = "/websocket", configurator = SpringConfigurator.class)
public class WebSocket {
@Autowired
private WebLogService webLogService;
@OnMessage
public void onMessage(String message, Session session) throws IOException, InterruptedException {
String[] titleNames = new String[10];
Long[] titleCounts = new Long[10];
Long[] titleSum = new Long[1];
while (true) {
Map<String, Object> map = new HashMap<String, Object>();
List<WebLogBO> list = webLogService.webcount();
System.out.print(list);
for (int i = 0; i < list.size(); i++) {
titleNames[i] = list.get(i).getTitleName();
titleCounts[i] = list.get(i).getWebcount();
}
titleSum[0] = webLogService.websum();
map.put("titleName", titleNames);
map.put("titleCount", titleCounts);
map.put("titleSum", titleSum);
System.out.print(map);
session.getBasicRemote().sendText(JSON.toJSONString(map));
Thread.sleep(1000);
map.clear();
}
}
@OnOpen
public void onOpen() {
System.out.println("Client connected");
}
@OnClose
public void onClose() {
System.out.println("Connection closed");
}
}
Echarts完成前端介面展示
大家可以看到開頭給出的專案效果圖還是蠻漂亮的,其實非常簡單,就是用的Echarts這個框架。直接給它傳值就ok了,其他前端那些事它都給你搞定了。詳情請參考github,地址文章開頭已給出。
function webcount(json) {
var option = {
title: {
text: '搜狗新聞熱點實時統計',
subtext: '作者:劉彥伶'
},
tooltip: {
trigger: 'axis',
axisPointer: {
type: 'shadow'
}
},
legend: {
data: ['瀏覽量']
},
grid: {
left: '3%',
right: '4%',
bottom: '3%',
containLabel: true
},
xAxis: {
type: 'value',
boundaryGap: [0, 0.01]
},
yAxis: {
type: 'category',
data: json.titleName
},
series: [
{
name: '瀏覽量',
type: 'bar',
data: json.titleCount
},
]
};
countchart.setOption(option);
}
本文講解的比較粗糙,有很多細節的東西,畢竟一整個專案不可能用一篇文章說清楚。。。所以實踐的東西需要讀者自己去領悟,但是架構、環境搭建、方法、流程還是很有參考價值的!