1. 程式人生 > >使用Apache NiFi和Apache Kafka進行實時庫存處理

使用Apache NiFi和Apache Kafka進行實時庫存處理

使用Apache NiFi和Apache Kafka實現從REST到Hive的流式使用案例

第1部分

使用Apache Kafka 2.0和Apache NiFi 1.8,有許多新功能和新功能即將推出。是時候對它們進行測試了。

因此,為了規劃我們將要做的事情,我有一個高階架構圖。我們將提取許多來源,包括REST提要,社交資源,訊息,影象,文件和關係資料。

我們將使用NiFi進行攝取,然後對其進行過濾,處理並將其細分為Kafka主題。Kafka資料將採用Apache Avro格式,並在Hortonworks Schema Registry中指定模式。Spark和NiFi將進行額外的事件處理以及機器學習和深度學習。

這將儲存在Druid中,用於實時分析和摘要。Hive,HDFS和S3將儲存資料以進行永久儲存。我們將使用Superset和Spark SQL + Zeppelin執行儀表板。

我們還將通過Kafka和NiFi向用戶推送清理和彙總的資料。我們將推送到Dockerized應用程式,訊息監聽器,Web客戶端,Slack通道和電子郵件列表。

為了在我們的企業中發揮作用,我們將通過Apache Ranger,Apache Atlas和Apache NiFi進行完全授權,身份驗證,審計,資料加密和資料沿襲。NiFi Registry和GitHub將用於原始碼控制。

我們將通過Apache Ambari提供管理功能。

示例伺服器佈局:

NiFi流量

IEX提供實時免費庫存資料,無需許可證金鑰。資料流速度非常快,幸好Apache NiFi和Kafka沒有問題。

從主題中獲取不同的記錄並在單獨的目錄和表中儲存到HDFS。


讓我們將一個大的REST檔案拆分成感興趣的單個記錄。我們的REST源包含引號,圖表和新聞陣列。

讓我們把一些訊息推到Slack

我們可以輕鬆地使用Apache NiFi中的多個主題。

由於我們有模式,因此在運動時查詢資料很容易

我們為每個Kafka主題建立模式。

我們可以監控在Ambari中通過Kafka傳遞的所有這些訊息(以及在Hortonworks SMM中更詳細的資訊)。

我讀入資料然後可以將它推送到Kafka 1.0和2.0經紀人。

一旦傳送資料,NiFi就會讓我們知道。

使用的專案

  • Apache Kafka

  • Apache NiFi

  • 阿帕奇德魯伊

  • Kafka上的Apache Hive

  • Apache Hive on Druid

  • JDBC上的Apache Hive

  • Apache Zeppelin

  • NLP - Apache OpenNLP和Stanford CoreNLP

  • Horotnworks Schema Registry

  • NiFi登錄檔

  • Apache Ambari

  • 日誌搜尋

  • Hortonworks SMM

  • Hortonworks資料平面服務(DPS)

來源

休息

水槽

  • Apache Hadoop HDFS

  • Apache Kafka

  • Apache Hive

  • 鬆弛

  • S3

  • 阿帕奇德魯伊

  • Apache HBase

話題

  • iextradingnews

  • iextradingquote

  • iextradingchart

  • 個股

  • 網路

HDFS目錄

hdfs dfs -mkdir  -p / iextradingnews

hdfs dfs -mkdir  -p / iextradingquote

hdfs dfs -mkdir  -p / iextradingchart

hdfs dfs -mkdir  -p / stocks

hdfs dfs -mkdir  -p / cyber

hdfs dfs -chmod  -R  777 /

PutHDFS

  • /${kafka.topic}

  • /iextradingchart/859496561256574.orc

  • /iextradingnews/855935960267509.orc

  • /iextradingquote/859143934804532.orc

蜂巢表

CREATE EXTERNAL TABLE如果不是 EXISTS iextradingchart(`date` STRING,開啟DOUBLE,高DOUBLE,低DOUBLE,關閉DOUBLE,音量INT,unadjustedVolume INT,更改DOUBLE,changePercent DOUBLE,vwap DOUBLE,標籤STRING,changeOverTime INT)
STORED AS ORC
LOCATION '/ iextradingchart' ;

CREATE EXTERNAL TABLE如果不是 EXISTS iextradingquote(符號STRING,companyName STRING,primaryExchange STRING,扇區STRING,calculationPrice STRING,開啟DOUBLE,openTime BIGINT,關閉DOUBLE,closeTime BIGINT,高DOUBLE,低DOUBLE,latestPrice DOUBLE,latestSource STRING,latestTime STRING, latestUpdate BIGINT,latestVolume INT,iexRealtimePrice DOUBLE,iexRealtimeSize INT,iexLastUpdated BIGINT,delayedPrice DOUBLE,delayedPriceTime BIGINT,extendedPrice DOUBLE,extendedChange DOUBLE,extendedChangePercent DOUBLE,extendedPriceTime BIGINT,previousClose DOUBLE,change DOUBLE,changePercent DOUBLE,iexMarketPercent DOUBLE,iexVolume INT,avgTotalVolume INT,iexBidPrice INT,iexBidSize INT,iexAskPrice INT,iexAskSize INT,marketCap INT,peRatio 雙倍,第52周高雙,第52周低DOUBLE,ytdChange DOUBLE)
STORED AS ORC
LOCATION '/ iextradingquote' ;

建立外部表如果不存在 iextradingnews(`datetime` STRING,標題STRING,源STRING,url STRING,彙總STRING,相關STRING,影象STRING)
STORED AS ORC
LOCATION '/ iextradingnews' ;

架構

{ “type”:“record”,“name”:“iextradingchart”,“fields”:[{   “name”:“date”,   “type”:[   “string”,   “null”   ]},{   “name”:“open”,   “type”:[   “double”,   “null”   ]},{   “name”:“high”,   “type”:[   “double”,  “null”   ]},{   “name”:“low”,   “type”:[   “double”,   “null”   ]},{   “name”:“close”,   “type”:[   “double”,   “null”   ]},{   “name”:“volume”,   “type”:[   “int”,   “null”   ]},{   “name”:“unadjustedVolume”,   “type”:[   “int”,   “null”   ]},{   “name”:“更改“ ,   ”型別“:[   ”double“,   ”null“   ]},{  “name”:“changePercent”,   “type”:[   “double”,   “null”   ]},{   “name”:“vwap”,   “type”:[   “double”,   “null”   ]},{   “name “:”label“,   ”type“:[   ”string“,   ”null“   ]},{   ”name“:”changeOverTime“,   ”type“:[   ”int“,  “null”   ]}]} { “type”:“record”,“name”:“iextradingquote”,“fields”:[{   “name”:“symbol”,   “type”:[   “string”,   “null”   ],   “doc”:“從'\”HDP推斷的型別\“'”   } ,{   “name”:“companyName”,   “type”:[   “string”,   “null”   ],   “doc”:“從'\”推斷的型別Hortonworks Inc. \“'”   },{   “name”:“primaryExchange”,   “type”:[   “string”,   “null”  ],   “doc”:“從'\”推斷的型別“納斯達克全球選擇\”'“   },{   ”名稱“:”扇區“,   ”型別“:[   ”字串“,   ”空“   ],   ”doc“:”型別從'\'Technology \“'”   },{   “name”:“calculationPrice”,   “type”:[   “string”,   “null”   ],   “doc”:“從'\”中推斷型別關閉\“ ““   },{   ”name“:”open“,   ”type“:[   ”double“,  “null”   ],   “doc”:“從'16。''推斷型別'   }},{   ”name“:”openTime“,   ”type“:[   ”long“,   ”null“   ],   ”doc“:”從'1542033000568'“   },{   ”name“:”close“,   ”type“:[   ”double“,   ”null“   ],   ”doc“:”從'15 .76'推斷的型別'   }},  { “name”:“closeTime”,   “type”:[   “long”,  “null”   ],   “doc”:“從'1542056400520'推斷的型別'   }},{   ”name“:”high“,   ”type“:[   ”double“,   ”null“   ],   ”doc“:”從'16 .37'“   },{   ”name“:”low“,   ”type“:[   ”double“,   ”null“   ],   ”doc“:”從'15。''推斷的型別'   }},{   “name“ :”latestPrice“,   ”type“:[   ”double“,  “null”   ],   “doc”:“從'15。76'推斷的型別”   },{   “name”:“latestSource”,   “type”:[   “string”,   “null”   ],   “doc”:“從'\“關閉\”'“   },{   ”name“:”latestTime“,   ”type“:[   ”string“,   ”null“   ],   ”doc“:”從'\“推斷的型別”2018年11月12日“'“   },{   ”name“:”latestUpdate“,   ”type“:[   ”long“,   “null”   ],   “doc”:“從'1542056400520'推斷的型別'   }},{   ”name“:”latestVolume“,   ”type“:[   ”int“,   ”null“   ],   ”doc“:”型別推斷來自'4012339'“   },{   ”name“:”iexRealtimePrice“,   ”type“:[   ”double“,   ”null“   ],   ”doc“:”型別從'15 .74'推斷“  },{   “name”:“iexRealtimeSize”,   “type”:[  “int”,   “null”   ],   “doc”:“從'43'推斷的型別”   },{   “name”:“iexLastUpdated”,   “type”:[   “long”,   “null”   ],   “doc”:“型別推斷自'1542056397411'”   },{   “name”:“delayedPrice”,   “type”:[   “double”,   “null”   ],   “doc”:“型別推斷自'15.76'“   },{   ”name“:”delayedPriceTime“,   ”type“:[   “long”,   “null”   ],   “doc”:“從'1542056400520'推斷型別'   }},{   ”name“:”extendedPrice“,   ”type“:[   ”double“,   ”null“   ],   ”doc“ “:”型別從'15 .85'“   } 推斷,{   ”name“:”extendedChange“,   ”type“:[   ”double“,   ”null“   ],   ”doc“:”型別從'0.09'“   }推斷,{   ”name“:”extendedChangePercent“,  “type”:[   “double”,   “null”   ],   “doc”:“從'0.00571'推斷的型別'   }},{   ”name“:”extendedPriceTime“,   ”type“:[   ”long“,   ”null“   ] ,   “doc”:“型別推斷自'1542059622726'”   },{   “name”:“previousClose”,   “type”:[   “double”,   “null”   ],   “doc“ :”從'16 .24'推斷的型別“   }},{   ”name“:”更改“,   “type”:[   “double”,   “null”   ],   “doc”:“從'-0.48'”推斷型別   },{   “name”:“changePercent”,   “type”:[   “double”,   “null” “   ],   ”doc“:”型別從'-0.02956'“   } 推斷,{   ”name“:”iexMarketPercent“,   ”type“:[   ”double“,   ”null“   ],  “doc”:“型別從'0.03258'推斷'   }},{   ”name“:“iexVolume”,   “type”:[   “int”,   “null”   ],   “doc”:“從'130722推斷的型別'”   },{   “name”:“avgTotalVolume”,   “type”:[   “int”,   “null”   ],   “doc”:“從'2042809'推斷的型別'”   },{   “name”:“iexBidPrice”,   “type”:[   “int”,   “null”  ],   “doc”:“從'0'推斷的型別”   },{   “name”:“iexBidSize”,   “type”:[   “int”,   “null”   ],   “doc”:“從'0'推斷的型別”   },{   “name”:“iexAskPrice”,   “type”:[   “int”,   “null”   ],   “doc”:“從'0'推斷的型別”   },{   “name”:“iexAskSize”,   “type”:[   “int”,   “null”   ],  “doc”:“從'0'推斷的型別”   },{   “name”:“marketCap”,   “type”:[   “int”,   “null”   ],   “doc”:“從'1317308142'推斷的型別'”   },{   “name”:“peRatio”,   “type”:[   “double”,   “null”   ],   “doc”:“型別從'-7.43'”   } 推斷,{   “name”:“week52High”,   “type”:[   “double”,   “null”   ],   “doc”:“型別從'26 .22'推斷   }”,{   “name”:“week52Low”,   “type”:[   “double”,   “null”   ],   “doc”:“從'15。''推斷型別'   }},{   ”name“:”ytdChange“,   ”type“:[   ”double“,   ”null“   ],   “doc”:“型別從'-0.25696247383444343'”   }}}} {} type“:”record“,”name“:”iextradingchart“,”fields“:[{ “name”:“date”,“type”:[ “string”,“null” ]},{ “name”:“open”,“type”:[ “double”,“null” ]},{ “name”:“high”,“type”:[ “double”,“null” “ ]},{ ”name“:”low“,”type“:[ ”double“,”null“ ]},{ ”name“:”close“,”type“:[ ”double“,“null” ]},{ “name”:“volume”,“type”:[ “int”,“null” ]},{ “name”:“unadjustedVolume”,“type”:[ “int”,“null” ]},{ “name”:“change”,“type”:[ “double”,“null” ]},{ “name”:“changePercent”,“type”:[ “double”,“null” ]},{ “name”:“vwap”,“type”:[ “double”,“null” ]},{ “name”:“label”,“type”:[ “string”,“null” ]},{ “name”:“changeOverTime”,“type”:[ “int”,“null” ]}]}

給Slack的訊息

檔案:$ {'filename'}

抵消:$ {'kafka.offset'}

分割槽:$ {'kafka.partition'}

主題:$ {'kafka.topic'}

UUID:$ {'uuid'}

記錄數:$ {'record.count'}

檔案大小:$ {fileSize:divide(1024)} K.

請參閱jsonpath.com

拆分

$。*報價

$。*。圖

$。*新聞

陣列為單

$ *。

GETHTTP

網址

https://api.iextrading.com/1.0/stock/market/batch?symbols=hdp&types=quote,news,chart&range=1y&last=25000

檔名

 marketbatch.hdp.${'hdp':append(${now():format('yyyymmddHHMMSS'):append(${md5}):append('.json')})} IEX免費提供的資料。檢視IEX的使用條款。

IEX實時價格 

查詢

SELECT * FROM FLOWFILE

WHERE latestPrice> week52Low

SELECT * FROM FLOWFILE

WHERE latestPrice <= week52Low

示例輸出

File:855957937589894

抵消:22460

分割槽:0

主題:iextradingquote

UUID:b2a8e797-2249-4689-9a78-4339ddb5ecb4

記錄數:

檔案大小:3K

使用Hive和Spark SQL在Apache Zeppelin中進行資料視覺化

在HDFS中建立Apache ORC檔案之上的表很容易。

將某些訊息推送到Slack