使用Apache NiFi和Apache Kafka進行實時庫存處理
使用Apache NiFi和Apache Kafka實現從REST到Hive的流式使用案例
第1部分
使用Apache Kafka 2.0和Apache NiFi 1.8,有許多新功能和新功能即將推出。是時候對它們進行測試了。
因此,為了規劃我們將要做的事情,我有一個高階架構圖。我們將提取許多來源,包括REST提要,社交資源,訊息,影象,文件和關係資料。
我們將使用NiFi進行攝取,然後對其進行過濾,處理並將其細分為Kafka主題。Kafka資料將採用Apache Avro格式,並在Hortonworks Schema Registry中指定模式。Spark和NiFi將進行額外的事件處理以及機器學習和深度學習。
我們還將通過Kafka和NiFi向用戶推送清理和彙總的資料。我們將推送到Dockerized應用程式,訊息監聽器,Web客戶端,Slack通道和電子郵件列表。
為了在我們的企業中發揮作用,我們將通過Apache Ranger,Apache Atlas和Apache NiFi進行完全授權,身份驗證,審計,資料加密和資料沿襲。NiFi Registry和GitHub將用於原始碼控制。
我們將通過Apache Ambari提供管理功能。
示例伺服器佈局:
NiFi流量
IEX提供實時免費庫存資料,無需許可證金鑰。資料流速度非常快,幸好Apache NiFi和Kafka沒有問題。
從主題中獲取不同的記錄並在單獨的目錄和表中儲存到HDFS。
讓我們將一個大的REST檔案拆分成感興趣的單個記錄。我們的REST源包含引號,圖表和新聞陣列。
讓我們把一些訊息推到Slack
我們可以輕鬆地使用Apache NiFi中的多個主題。
由於我們有模式,因此在運動時查詢資料很容易
我們為每個Kafka主題建立模式。
我們可以監控在Ambari中通過Kafka傳遞的所有這些訊息(以及在Hortonworks SMM中更詳細的資訊)。
我讀入資料然後可以將它推送到Kafka 1.0和2.0經紀人。
一旦傳送資料,NiFi就會讓我們知道。
使用的專案
Apache Kafka
Apache NiFi
阿帕奇德魯伊
Kafka上的Apache Hive
Apache Hive on Druid
JDBC上的Apache Hive
Apache Zeppelin
NLP - Apache OpenNLP和Stanford CoreNLP
Horotnworks Schema Registry
NiFi登錄檔
Apache Ambari
日誌搜尋
Hortonworks SMM
Hortonworks資料平面服務(DPS)
來源
休息
水槽
Apache Hadoop HDFS
Apache Kafka
Apache Hive
鬆弛
S3
阿帕奇德魯伊
Apache HBase
話題
iextradingnews
iextradingquote
iextradingchart
個股
網路
HDFS目錄
hdfs dfs -mkdir -p / iextradingnews
hdfs dfs -mkdir -p / iextradingquote
hdfs dfs -mkdir -p / iextradingchart
hdfs dfs -mkdir -p / stocks
hdfs dfs -mkdir -p / cyber
hdfs dfs -chmod -R 777 /
PutHDFS
/${kafka.topic}
/iextradingchart/859496561256574.orc
/iextradingnews/855935960267509.orc
/iextradingquote/859143934804532.orc
蜂巢表
CREATE EXTERNAL TABLE如果不是 EXISTS iextradingchart(`date` STRING,開啟DOUBLE,高DOUBLE,低DOUBLE,關閉DOUBLE,音量INT,unadjustedVolume INT,更改DOUBLE,changePercent DOUBLE,vwap DOUBLE,標籤STRING,changeOverTime INT)
STORED AS ORC
LOCATION '/ iextradingchart' ;
CREATE EXTERNAL TABLE如果不是 EXISTS iextradingquote(符號STRING,companyName STRING,primaryExchange STRING,扇區STRING,calculationPrice STRING,開啟DOUBLE,openTime BIGINT,關閉DOUBLE,closeTime BIGINT,高DOUBLE,低DOUBLE,latestPrice DOUBLE,latestSource STRING,latestTime STRING, latestUpdate BIGINT,latestVolume INT,iexRealtimePrice DOUBLE,iexRealtimeSize INT,iexLastUpdated BIGINT,delayedPrice DOUBLE,delayedPriceTime BIGINT,extendedPrice DOUBLE,extendedChange DOUBLE,extendedChangePercent DOUBLE,extendedPriceTime BIGINT,previousClose DOUBLE,change DOUBLE,changePercent DOUBLE,iexMarketPercent DOUBLE,iexVolume INT,avgTotalVolume INT,iexBidPrice INT,iexBidSize INT,iexAskPrice INT,iexAskSize INT,marketCap INT,peRatio 雙倍,第52周高雙,第52周低DOUBLE,ytdChange DOUBLE)
STORED AS ORC
LOCATION '/ iextradingquote' ;
建立外部表如果不存在 iextradingnews(`datetime` STRING,標題STRING,源STRING,url STRING,彙總STRING,相關STRING,影象STRING)
STORED AS ORC
LOCATION '/ iextradingnews' ;
架構
{ “type”:“record”,“name”:“iextradingchart”,“fields”:[{ “name”:“date”, “type”:[ “string”, “null” ]},{ “name”:“open”, “type”:[ “double”, “null” ]},{ “name”:“high”, “type”:[ “double”, “null” ]},{ “name”:“low”, “type”:[ “double”, “null” ]},{ “name”:“close”, “type”:[ “double”, “null” ]},{ “name”:“volume”, “type”:[ “int”, “null” ]},{ “name”:“unadjustedVolume”, “type”:[ “int”, “null” ]},{ “name”:“更改“ , ”型別“:[ ”double“, ”null“ ]},{ “name”:“changePercent”, “type”:[ “double”, “null” ]},{ “name”:“vwap”, “type”:[ “double”, “null” ]},{ “name “:”label“, ”type“:[ ”string“, ”null“ ]},{ ”name“:”changeOverTime“, ”type“:[ ”int“, “null” ]}]} { “type”:“record”,“name”:“iextradingquote”,“fields”:[{ “name”:“symbol”, “type”:[ “string”, “null” ], “doc”:“從'\”HDP推斷的型別\“'” } ,{ “name”:“companyName”, “type”:[ “string”, “null” ], “doc”:“從'\”推斷的型別Hortonworks Inc. \“'” },{ “name”:“primaryExchange”, “type”:[ “string”, “null” ], “doc”:“從'\”推斷的型別“納斯達克全球選擇\”'“ },{ ”名稱“:”扇區“, ”型別“:[ ”字串“, ”空“ ], ”doc“:”型別從'\'Technology \“'” },{ “name”:“calculationPrice”, “type”:[ “string”, “null” ], “doc”:“從'\”中推斷型別關閉\“ ““ },{ ”name“:”open“, ”type“:[ ”double“, “null” ], “doc”:“從'16。''推斷型別' }},{ ”name“:”openTime“, ”type“:[ ”long“, ”null“ ], ”doc“:”從'1542033000568'“ },{ ”name“:”close“, ”type“:[ ”double“, ”null“ ], ”doc“:”從'15 .76'推斷的型別' }}, { “name”:“closeTime”, “type”:[ “long”, “null” ], “doc”:“從'1542056400520'推斷的型別' }},{ ”name“:”high“, ”type“:[ ”double“, ”null“ ], ”doc“:”從'16 .37'“ },{ ”name“:”low“, ”type“:[ ”double“, ”null“ ], ”doc“:”從'15。''推斷的型別' }},{ “name“ :”latestPrice“, ”type“:[ ”double“, “null” ], “doc”:“從'15。76'推斷的型別” },{ “name”:“latestSource”, “type”:[ “string”, “null” ], “doc”:“從'\“關閉\”'“ },{ ”name“:”latestTime“, ”type“:[ ”string“, ”null“ ], ”doc“:”從'\“推斷的型別”2018年11月12日“'“ },{ ”name“:”latestUpdate“, ”type“:[ ”long“, “null” ], “doc”:“從'1542056400520'推斷的型別' }},{ ”name“:”latestVolume“, ”type“:[ ”int“, ”null“ ], ”doc“:”型別推斷來自'4012339'“ },{ ”name“:”iexRealtimePrice“, ”type“:[ ”double“, ”null“ ], ”doc“:”型別從'15 .74'推斷“ },{ “name”:“iexRealtimeSize”, “type”:[ “int”, “null” ], “doc”:“從'43'推斷的型別” },{ “name”:“iexLastUpdated”, “type”:[ “long”, “null” ], “doc”:“型別推斷自'1542056397411'” },{ “name”:“delayedPrice”, “type”:[ “double”, “null” ], “doc”:“型別推斷自'15.76'“ },{ ”name“:”delayedPriceTime“, ”type“:[ “long”, “null” ], “doc”:“從'1542056400520'推斷型別' }},{ ”name“:”extendedPrice“, ”type“:[ ”double“, ”null“ ], ”doc“ “:”型別從'15 .85'“ } 推斷,{ ”name“:”extendedChange“, ”type“:[ ”double“, ”null“ ], ”doc“:”型別從'0.09'“ }推斷,{ ”name“:”extendedChangePercent“, “type”:[ “double”, “null” ], “doc”:“從'0.00571'推斷的型別' }},{ ”name“:”extendedPriceTime“, ”type“:[ ”long“, ”null“ ] , “doc”:“型別推斷自'1542059622726'” },{ “name”:“previousClose”, “type”:[ “double”, “null” ], “doc“ :”從'16 .24'推斷的型別“ }},{ ”name“:”更改“, “type”:[ “double”, “null” ], “doc”:“從'-0.48'”推斷型別 },{ “name”:“changePercent”, “type”:[ “double”, “null” “ ], ”doc“:”型別從'-0.02956'“ } 推斷,{ ”name“:”iexMarketPercent“, ”type“:[ ”double“, ”null“ ], “doc”:“型別從'0.03258'推斷' }},{ ”name“:“iexVolume”, “type”:[ “int”, “null” ], “doc”:“從'130722推斷的型別'” },{ “name”:“avgTotalVolume”, “type”:[ “int”, “null” ], “doc”:“從'2042809'推斷的型別'” },{ “name”:“iexBidPrice”, “type”:[ “int”, “null” ], “doc”:“從'0'推斷的型別” },{ “name”:“iexBidSize”, “type”:[ “int”, “null” ], “doc”:“從'0'推斷的型別” },{ “name”:“iexAskPrice”, “type”:[ “int”, “null” ], “doc”:“從'0'推斷的型別” },{ “name”:“iexAskSize”, “type”:[ “int”, “null” ], “doc”:“從'0'推斷的型別” },{ “name”:“marketCap”, “type”:[ “int”, “null” ], “doc”:“從'1317308142'推斷的型別'” },{ “name”:“peRatio”, “type”:[ “double”, “null” ], “doc”:“型別從'-7.43'” } 推斷,{ “name”:“week52High”, “type”:[ “double”, “null” ], “doc”:“型別從'26 .22'推斷 }”,{ “name”:“week52Low”, “type”:[ “double”, “null” ], “doc”:“從'15。''推斷型別' }},{ ”name“:”ytdChange“, ”type“:[ ”double“, ”null“ ], “doc”:“型別從'-0.25696247383444343'” }}}} {} type“:”record“,”name“:”iextradingchart“,”fields“:[{ “name”:“date”,“type”:[ “string”,“null” ]},{ “name”:“open”,“type”:[ “double”,“null” ]},{ “name”:“high”,“type”:[ “double”,“null” “ ]},{ ”name“:”low“,”type“:[ ”double“,”null“ ]},{ ”name“:”close“,”type“:[ ”double“,“null” ]},{ “name”:“volume”,“type”:[ “int”,“null” ]},{ “name”:“unadjustedVolume”,“type”:[ “int”,“null” ]},{ “name”:“change”,“type”:[ “double”,“null” ]},{ “name”:“changePercent”,“type”:[ “double”,“null” ]},{ “name”:“vwap”,“type”:[ “double”,“null” ]},{ “name”:“label”,“type”:[ “string”,“null” ]},{ “name”:“changeOverTime”,“type”:[ “int”,“null” ]}]}
給Slack的訊息
檔案:$ {'filename'}
抵消:$ {'kafka.offset'}
分割槽:$ {'kafka.partition'}
主題:$ {'kafka.topic'}
UUID:$ {'uuid'}
記錄數:$ {'record.count'}
檔案大小:$ {fileSize:divide(1024)} K.
請參閱jsonpath.com
拆分
$。*報價
$。*。圖
$。*新聞
陣列為單
$ *。
GETHTTP
網址
https://api.iextrading.com/1.0/stock/market/batch?symbols=hdp&types=quote,news,chart&range=1y&last=25000
檔名
marketbatch.hdp.${'hdp':append(${now():format('yyyymmddHHMMSS'):append(${md5}):append('.json')})}
IEX免費提供的資料。檢視IEX的使用條款。
IEX實時價格
查詢
SELECT * FROM FLOWFILE
WHERE latestPrice> week52Low
SELECT * FROM FLOWFILE
WHERE latestPrice <= week52Low
示例輸出
File:855957937589894
抵消:22460
分割槽:0
主題:iextradingquote
UUID:b2a8e797-2249-4689-9a78-4339ddb5ecb4
記錄數:
檔案大小:3K
使用Hive和Spark SQL在Apache Zeppelin中進行資料視覺化
在HDFS中建立Apache ORC檔案之上的表很容易。
將某些訊息推送到Slack