Flume+Kafka+Storm+Redis構建大資料實時處理系統
資料處理方法分為離線處理和線上處理,今天寫到的就是基於Storm的線上處理。在下面給出的完整案例中,我們將會完成下面的幾項工作:
- 如何一步步構建我們的實時處理系統(Flume+Kafka+Storm+Redis)
- 實時處理網站的使用者訪問日誌,並統計出該網站的PV、UV
- 將實時分析出的PV、UV動態地展示在我們的前面頁面上
如果你對上面提及的大資料元件已經有所認識,或者對如何構建大資料實時處理系統感興趣,那麼就可以盡情閱讀下面的內容了。
需要注意的是,核心在於如何構建實時處理系統,而這裡給出的案例是實時統計某個網站的PV、UV,在實際中,基於每個人的工作環境不同,業務不同,因此業務系統的複雜度也不盡相同,相對來說,這裡統計PV、UV的業務是比較簡單的,但也足夠讓我們對大資料實時處理系統有一個基本的、清晰的瞭解與認識,是的,它不再那麼神祕了。
實時處理系統架構
我們的實時處理系統整體架構如下:
即從上面的架構中我們可以看出,其由下面的幾部分構成:
- Flume叢集
- Kafka叢集
- Storm叢集
從構建實時處理系統的角度出發,我們需要做的是讓資料在各個不同的集群系統之間打通(從上面的圖示中也能很好地說明這一點),即需要做各個系統之前的整合,包括Flume與Kafka的整合,Kafka與Storm的整合。當然,各個環境是否使用叢集,依個人的實際需要而定,在我們的環境中,Flume、Kafka、Storm都使用叢集。
Flume+Kafka整合
即從上面的架構中我們可以看出,其由下面的幾部分構成:
- Flume叢集
- Kafka叢集
- Storm叢集
從構建實時處理系統的角度出發,我們需要做的是讓資料在各個不同的集群系統之間打通(從上面的圖示中也能很好地說明這一點),即需要做各個系統之前的整合,包括Flume與Kafka的整合,Kafka與Storm的整合。當然,各個環境是否使用叢集,依個人的實際需要而定,在我們的環境中,Flume、Kafka、Storm都使用叢集。
Flume+Kafka整合
1整合思路
對於Flume而言,關鍵在於如何採集資料,並且將其傳送到Kafka上,並且由於我們這裡了使用Flume叢集的方式,Flume叢集的配置也是十分關鍵的。而對於Kafka,關鍵就是如何接收來自Flume的資料。從整體上講,邏輯應該是比較簡單的,在Kafka中建立一個用於我們實時處理系統的topic,然後Flume將其採集到的資料傳送到該topic上即可。
2整合過程
整合過程:Flume叢集配置與Kafka Topic建立。
Flume叢集配置
在我們的場景中,兩個Flume Agent分別部署在兩臺Web伺服器上,用來採集Web伺服器上的日誌資料,然後其資料的下沉方式都為傳送到另外一個Flume Agent上,所以這裡我們需要配置三個Flume Agent。
- Flume Agent01
該Flume Agent部署在一臺Web伺服器上,用來採集產生的Web日誌,然後傳送到Flume Consolidation Agent上,建立一個新的配置檔案flume-sink-avro.conf,其配置內容如下:
#########################################################
##
##主要作用是監聽檔案中的新增資料,採集到資料之後,輸出到avro
## 注意:Flume agent的執行,主要就是配置source channel sink
## 下面的a1就是agent的代號,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#對於source的配置描述 監聽檔案中的新增資料 exec
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/uplooking/data/data-clean/data-access.log
#對於sink的配置描述 使用avro日誌做資料的消費
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = uplooking03
a1.sinks.k1.port = 44444
#對於channel的配置描述 使用檔案做資料的臨時快取 這種的安全性要高
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data
#通過channel c1將source r1和sink k1關聯起來
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
配置完成後, 啟動Flume Agent,即可對日誌檔案進行監聽:
$ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-sink-avro.conf >/dev/ 2>&1 &
- Flume Agent02
- Flume Consolidation Agent
該Flume Agent用於接收其它兩個Agent傳送過來的資料,然後將其傳送到Kafka上,建立一個新的配置檔案flume-source_avro-sink_kafka.conf,配置內容如下:
##主要作用是監聽目錄中的新增檔案,採集到資料之後,輸出到kafka
#對於source的配置描述 監聽avro
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
#對於sink的配置描述 使用kafka做資料的消費
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = f-k-s
a1.sinks.k1.brokerList = uplooking01:9092,uplooking02:9092,uplooking03:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
#對於channel的配置描述 使用記憶體緩衝區域做資料的臨時快取
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
配置完成後, 啟動Flume Agent,即可對avro的資料進行監聽:
$ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-source_avro-sink_kafka.conf >/dev/ 2>&1 &
Kafka配置
在我們的Kafka中,先建立一個topic,用於後面接收Flume採集過來的資料:
kafka-topics.sh --create --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3
3整合驗證
啟動Kafka的消費指令碼:
$ kafka-console-consumer.sh --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
如果在Web伺服器上有新增的日誌資料,就會被我們的Flume程式監聽到,並且最終會傳輸到到Kafka的f-k-stopic中,這裡作為驗證,我們上面啟動的是一個Kafka終端消費的指令碼,這時會在終端中看到資料的輸出:
這樣的話,我們的整合就沒有問題,當然Kafka中的資料應該是由我們的Storm來進行消費的,這裡只是作為整合的一個測試,下面就會來做Kafka+Storm的整合。
Kafka+Storm整合
Kafka和Storm的整合其實在Storm的官網上也有非常詳細清晰的文件:
http://storm.apache.org/releases/1.0.6/storm-kafka.html
想對其有更多瞭解的同學可以參考一下。
1整合思路
在這次的大資料實時處理系統的構建中,Kafka相當於是作為訊息佇列(或者說是訊息中介軟體)的角色,其產生的訊息需要有消費者去消費,所以Kafka與Storm的整合,關鍵在於我們的Storm如何去消費Kafka訊息topic中的訊息(Kafka訊息topic中的訊息正是由Flume採集而來,現在我們需要在Storm中對其進行消費)。
在Storm中,topology是非常關鍵的概念。
對比MapReduce,在MapReduce中,我們提交的作業稱為一個Job,在一個Job中,又包含若干個Mapper和Reducer,正是在Mapper和Reducer中有我們對資料的處理邏輯:
在Storm中,我們提交的一個作業稱為topology,其又包含了spout和bolt,在Storm中,對資料的處理邏輯正是在spout和bolt中體現:
即在spout中,正是我們資料的來源,又因為其實時的特性,所以可以把它比作一個“水龍頭”,表示其源源不斷地產生資料:
所以,問題的關鍵是spout如何去獲取來自Kafka的資料?
好在,Storm-Kafka的整合庫中提供了這樣的API來供我們進行操作。
2整合過程
整合過程應用了KafkaSpout。在程式碼的邏輯中只需要建立一個由Storm-KafkaAPI提供的KafkaSpout物件即可:
SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);return new KafkaSpout(spoutConf);
下面給出完整的整合程式碼:
←
package cn.xpleaf.bigdata.storm.statics;
import kafka.api.OffsetRequest;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
/**
* Kafka和storm的整合,用於統計實時流量對應的pv和uv
*/public class KafkaStormTopology {
// static class MyKafkaBolt extends BaseRichBolt {
static class MyKafkaBolt extends BaseBasicBolt {
/**
* kafkaSpout傳送的欄位名為bytes
*/
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
byte binary = input.getBinary(0); // 跨jvm傳輸資料,接收到的是位元組資料
// byte bytes = input.getBinaryByField("bytes"); // 這種方式也行
String line = new String(binary);
System.out.println(line);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder;
/**
* 設定spout和bolt的dag(有向無環圖)
*/
KafkaSpout kafkaSpout = createKafkaSpout;
builder.setSpout("id_kafka_spout", kafkaSpout);
builder.setBolt("id_kafka_bolt", new MyKafkaBolt)
.shuffleGrouping("id_kafka_spout"); // 通過不同的資料流轉方式,來指定資料的上游元件
// 使用builder構建topology
StormTopology topology = builder.createTopology;
String topologyName = KafkaStormTopology.class.getSimpleName; // 拓撲的名稱
Config config = new Config; // Config物件繼承自HashMap,但本身封裝了一些基本的配置
// 啟動topology,本地啟動使用LocalCluster,叢集啟動使用StormSubmitter
if (args == || args.length < 1) { // 沒有引數時使用本地模式,有引數時使用叢集模式
LocalCluster localCluster = new LocalCluster; // 本地開發模式,建立的物件為LocalCluster
localCluster.submitTopology(topologyName, config, topology);
} else {
StormSubmitter.submitTopology(topologyName, config, topology);
* BrokerHosts hosts kafka叢集列表
* String topic 要消費的topic主題
* String zkRoot kafka在zk中的目錄(會在該節點目錄下記錄讀取kafka訊息的偏移量)
* String id 當前操作的標識id
*/
private static KafkaSpout createKafkaSpout {
String brokerZkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
BrokerHosts hosts = new ZkHosts(brokerZkStr); // 通過zookeeper中的/brokers即可找到kafka的地址
String topic = "f-k-s";
String zkRoot = "/" + topic;
String id = "consumer-id";
SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
// 本地環境設定之後,也可以在zk中建立/f-k-s節點,在叢集環境中,不用配置也可以在zk中建立/f-k-s節點
//spoutConf.zkServers = Arrays.asList(new String[]{"uplooking01", "uplooking02", "uplooking03"});
//spoutConf.zkPort = 2181;
spoutConf.startOffsetTime = OffsetRequest.LatestTime; // 設定之後,剛啟動時就不會把之前的消費也進行讀取,會從最新的偏移量開始讀取
return new KafkaSpout(spoutConf);
(上下滑動檢視完整程式碼)
其實程式碼的邏輯非常簡單,我們只建立了 一個由Storm-Kafka提供的KafkaSpout物件和一個包含我們處理邏輯的MyKafkaBolt物件,MyKafkaBolt的邏輯也很簡單,就是把Kafka的訊息列印到控制檯上。
需要注意的是,後面我們分析網站PV、UV的工作,正是在上面這部分簡單的程式碼中完成的,所以其是非常重要的基礎。
3整合驗證
上面的整合程式碼,可以在本地環境中執行,也可以將其打包成jar包上傳到我們的Storm叢集中並提交業務來執行。如果Web伺服器能夠產生日誌,並且前面Flume+Kafka的整合也沒有問題的話,將會有下面的效果。
如果是在本地環境中執行上面的程式碼,那麼可以在控制檯中看到日誌資料的輸出:
如果是在Storm叢集中提交的作業執行,那麼也可以在Storm的日誌中看到Web伺服器產生的日誌資料:
這樣的話就完成了Kafka+Storm的整合。
Storm+Redis整合
1整合思路
其實所謂Storm和Redis的整合,指的是在我們的實時處理系統中的資料的落地方式,即在Storm中包含了我們處理資料的邏輯,而資料處理完畢後,產生的資料處理結果該儲存到什麼地方呢?顯然就有很多種方式了,關係型資料庫、NoSQL、HDFS、HBase等,這應該取決於具體的業務和資料量,在這裡,我們使用Redis來進行最後分析資料的儲存。
所以實際上做這一步的整合,其實就是開始寫我們的業務處理程式碼了,因為通過前面Flume-Kafka-Storm的整合,已經打通了整個資料的流通路徑,接下來關鍵要做的是,在Storm中,如何處理我們的資料並儲存到Redis中。
而在Storm中,spout已經不需要我們來寫了(由Storm-Kafka的API提供了KafkaSpout物件),所以問題就變成,如何根據業務編寫分析處理資料的bolt。
2整合過程
整合過程:編寫Storm業務處理Bolt。
日誌分析
我們實時獲取的日誌格式如下:
其中需要說明的是第二個欄位和第三個欄位,因為它對我們統計PV和UV非常有幫助,它們分別是ip欄位和mid欄位,說明如下:
- ip:使用者的IP地址
- mid:唯一的id,此id第一次會種在瀏覽器的cookie裡。如果存在則不再種。作為瀏覽器唯一標示。移動端或者pad直接取機器碼。
因此,根據IP地址,我們可以通過查詢得到其所在的省份,並且建立一個屬於該省份的變數,用於記錄pv數,每來一條屬於該省份的日誌記錄,則該省份的PV就加1,以此來完成pv的統計。
而對於mid,我們則可以建立屬於該省的一個set集合,每來一條屬於該省份的日誌記錄,則可以將該mid新增到set集合中,因為set集合存放的是不重複的資料,這樣就可以幫我們自動過濾掉重複的mid,根據set集合的大小,就可以統計出UV。
在我們storm的業務處理程式碼中,我們需要編寫兩個bolt:
- 第一個bolt用來對資料進行預處理,也就是提取我們需要的ip和mid,並且根據IP查詢得到省份資訊;
- 第二個bolt用來統計PV、UV,並定時將PV、UV資料寫入到Redis中。
當然上面只是說明了整體的思路,實際上還有很多需要注意的細節問題和技巧問題,這都在我們的程式碼中進行體現,我在後面寫的程式碼中都加了非常詳細的註釋進行說明。
編寫第一個Bolt:ConvertIPBolt
根據上面的分析,編寫用於資料預處理的bolt,程式碼如下:
package cn.xpleaf.bigdata.storm.statistic;
import cn.xpleaf.bigdata.storm.utils.JedisUtil;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import redis.clients.jedis.Jedis;
/**
* 日誌資料預處理Bolt,實現功能:
* 1.提取實現業務需求所需要的資訊:ip地址、客戶端唯一標識mid
* 2.查詢IP地址所屬地,併發送到下一個Bolt
*/public class ConvertIPBolt extends BaseBasicBolt {
byte binary = input.getBinary(0);
String line = new String(binary);
String fields = line.split(" ");
if(fields == || fields.length < 10) {
return;
}
// 獲取ip和mid
String ip = fields[1];
String mid = fields[2];
// 根據ip獲取其所屬地(省份)
String province = ;
if (ip != ) {
Jedis jedis = JedisUtil.getJedis;
province = jedis.hget("ip_info_en", ip);
// 需要釋放jedis的資源,否則會報can not get resource from the pool
JedisUtil.returnJedis(jedis);
}
// 傳送資料到下一個bolt,只發送實現業務功能需要的province和mid
collector.emit(new Values(province, mid));
* 定義了傳送到下一個bolt的資料包含兩個域:province和mid
declarer.declare(new Fields("province", "mid"));
編寫第二個Bolt:StatisticBolt
這個bolt包含我們統計網站PV、UV的程式碼邏輯,因此非常重要,其程式碼如下:
import org.apache.storm.Constants;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* 日誌資料統計Bolt,實現功能:
* 1.統計各省份的PV、UV
* 2.以天為單位,將省份對應的PV、UV資訊寫入Redis
*/public class StatisticBolt extends BaseBasicBolt {
Map<String, Integer> pvMap = new HashMap<>;
Map<String, HashSet<String>> midsMap = ;
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
if (!input.getSourceComponent.equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID)) { // 如果收到非系統級別的tuple,統計資訊到區域性變數mids
String province = input.getStringByField("province");
String mid = input.getStringByField("mid");
pvMap.put(province, pvMap.get(province) + 1); // pv+1
if(mid != ) {
midsMap.get(province).add(mid); // 將mid新增到該省份所對應的set中
}
} else { // 如果收到系統級別的tuple,則將資料更新到Redis中,釋放JVM堆記憶體空間
/*
* 以 廣東 為例,其在Redis中儲存的資料格式如下:
* guangdong_pv(Redis資料結構為hash)
* --20180415
* --pv數
* --20180416
* --pv數
* guangdong_mids_20180415(Redis資料結構為set)
* --mid
* ......
String dateStr = sdf.format(new Date);
// 更新pvMap資料到Redis中
String pvKey = ;
for(String province : pvMap.keySet) {
int currentPv = pvMap.get(province);
if(currentPv > 0) { // 當前map中的pv大於0才更新,否則沒有意義
pvKey = province + "_pv";
String oldPvStr = jedis.hget(pvKey, dateStr);
if(oldPvStr == ) {
oldPvStr = "0";
}
Long oldPv = Long.valueOf(oldPvStr);
jedis.hset(pvKey, dateStr, oldPv + currentPv + "");
pvMap.replace(province, 0); // 將該省的pv重新設定為0
// 更新midsMap到Redis中
String midsKey = ;
HashSet<String> midsSet = ;
for(String province: midsMap.keySet) {
midsSet = midsMap.get(province);
if(midsSet.size > 0) { // 當前省份的set的大小大於0才更新到,否則沒有意義
midsKey = province + "_mids_" + dateStr;
jedis.sadd(midsKey, midsSet.toArray(new String[midsSet.size()]));
midsSet.clear;
// 釋放jedis資源
JedisUtil.returnJedis(jedis);
System.out.println(System.currentTimeMillis + "------->寫入資料到Redis");
* 設定定時任務,只對當前bolt有效,系統會定時向StatisticBolt傳送一個系統級別的tuple
public Map<String, Object> getComponentConfiguration {
Map<String, Object> config = new HashMap<>;
config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
return config;
* 初始化各個省份的pv和mids資訊(用來臨時儲存統計pv和uv需要的資料)
*/
public StatisticBolt {
pvMap = new HashMap<>;
midsMap = new HashMap<String, HashSet<String>>;
String provinceArray = {"shanxi", "jilin", "hunan", "hainan", "xinjiang", "hubei", "zhejiang", "tianjin", "shanghai",
"anhui", "guizhou", "fujian", "jiangsu", "heilongjiang", "aomen", "beijing", "shaanxi", "chongqing",
"jiangxi", "guangxi", "gansu", "guangdong", "yunnan", "sicuan", "qinghai", "xianggang", "taiwan",
"neimenggu", "henan", "shandong", "shanghai", "hebei", "liaoning", "xizang"};
for(String province : provinceArray) {
pvMap.put(province, 0);
midsMap.put(province, new HashSet);
(上下滑動可檢視完整程式碼)
編寫Topology
我們需要編寫一個topology用來組織前面編寫的Bolt,程式碼如下:
* 構建topology
*/public class StatisticTopology {
builder.setBolt("id_convertIp_bolt", new ConvertIPBolt).shuffleGrouping("id_kafka_spout"); // 通過不同的資料流轉方式,來指定資料的上游元件
builder.setBolt("id_statistic_bolt", new StatisticBolt).shuffleGrouping("id_convertIp_bolt"); // 通過不同的資料流轉方式,來指定資料的上游元件
將上面的程式打包成jar包,並上傳到我們的叢集提交業務後,如果前面的整合沒有問題,並且Web服務也有Web日誌產生,那麼一段時間後,我們就可以在Redis資料庫中看到資料的最終處理結果,即各個省份的UV和PV資訊:
需要說明的是mid資訊是一個set集合,只要求出該set集合的大小,也就可以求出UV值。
至此,準確來說,我們的統計PV、UV的大資料實時處理系統是構建完成了,處理的資料結果的用途根據不同的業務需求而不同,但是對於網站的PV、UV資料來說,是非常適合用作視覺化處理的,即用網頁動態將資料展示出來,我們下一步正是要構建一個簡單的Web應用將PV、UV資料動態展示出來。
資料視覺化處理
資料視覺化處理目前我們需要完成兩部分的工作:
- 開發一個Web專案,能夠查詢Redis中的資料,同時提供訪問的頁面
- 自行開發或找一個符合我們需求的前端UI,將Web專案中查詢到的資料展示出來
對於Web專案的開發,因個人的技術棧能力而異,選擇的語言和技術也有所不同,只要能夠達到我們最終資料視覺化的目的,其實都行的。這個專案中我們要展示的是PV和UV資料,難度不大,因此可以選擇Java Web,如Servlet、SpringMVC等,或者Python Web,如Flask、Django等,Flask我個人非常喜歡,因為開發非常快,但因為前面一直用的是Java,因此這裡我還是選擇使用SpringMVC來完成。
至於UI這一塊,我前端能力一般,普通的開發沒有問題,但是要做出像上面這種地圖型別的UI介面來展示資料的話,確實有點無能為力。好在現在第三方的UI框架比較多,對於圖表類展示的,比如就有highcharts和echarts,其中echarts是百度開源的,有豐富的中文文件,非常容易上手,所以這裡我選擇使用echarts來作為UI,並且其剛好就有能夠滿足我們需求的地圖類的UI元件。
因為難度不大,具體的開發流程的這裡就不提及了,有興趣的同學可以直接參考後面我提供的原始碼,這裡我們就直接來看一下效果好了。
因為實際上在本次專案案例中,這一塊的程式碼也是非常少的,使用SpringMVC開發的話,只要把JavaEE三層構架搭起來了,把依賴引入好了,後面的開發確實不難的;而如果有同學會Flask或者Django的話,其專案本身的構建和程式碼上也都會更容易。
啟動我們的Web專案後,輸入地址就可以訪問到資料的展示介面了:
可以看到,echarts的這個UI還是比較好看的,而且也真的能夠滿足我們的需求。每個省份上的兩個不同顏色的點表示目前我們需要展示的資料有兩種,分別為PV和UV,在左上角也有體現,而顏色的深淺就可以體現PV或者UV的數量大小關係了。
在這個介面上,點選左上角的UV,表示不檢視UV的資料,這樣我們就會只看到PV的情況:
當然,也可以只檢視UV的情況:
當滑鼠停留在某個省份上時,就可以檢視這個省份具體的PV或UV值,比如下面我們把滑鼠停留在“廣東”上時,就可以看到其此時的PV值為170,檢視其它省份的也是如此:
那麼資料是可以查看了,又怎麼體現動態呢?
對於頁面資料的動態重新整理有兩種方案,一種是定時重新整理頁面,另外一種則是定時向後端非同步請求資料。
目前我採用的是第一種,頁面定時重新整理,有興趣的同學也可以嘗試使用第二種方法,只需要在後端開發相關的返回JSON資料的API即可。
總結
那麼至此,從整個大資料實時處理系統的構建到最後的資料視覺化處理工作,我們都已經完成了,可以看到整個過程下來涉及到的知識層面還是比較多的,不過我個人覺得,只要把核心的原理牢牢掌握了,對於大部分情況而言,環境的搭建以及基於業務的開發都能夠很好地解決。