1. 程式人生 > >Apache NiFi Processor實戰

Apache NiFi Processor實戰

高度 hadoop生態圈 功能 簡介 rec 個人理解 記錄 src 哪些

1 前言

Apache NiFi是什麽?NiFi官網給出如下解釋:“一個易用、強大、可靠的數據處理與分發系統”。通俗的來說,即Apache NiFi 是一個易於使用、功能強大而且可靠的數據處理和分發系統,其為數據流設計,它支持高度可配置的指示圖的數據路由、轉換和系統中介邏輯。
為了對NiFi能夠表述的更為清楚,下面通過NiFi的架構來做簡要介紹,如下圖所示。
技術分享
根據官網對各個組件的說明,做摘要翻譯:
? WebServer:其目的在於提供基於HTTP的命令和控制API。
? Flow Controller:這是操作的核心,以Processor為處理單元,提供了用於運行的擴展線程,並管理擴展接收資源時的調度。
? Extensions:在其他文檔中描述了各種類型的NiFi擴展,Extensions的關鍵在於擴展在JVM中操作和執行。
? FlowFile Repository:FlowFile庫的作用是NiFi跟蹤記錄當前在流中處於活動狀態的給定流文件的狀態,其實現是可插拔的,默認的方法是位於指定磁盤分區上的一個持久的寫前日誌。
? Content Repository:Content庫的作用是給定流文件的實際內容字節所在的位置,其實現也是可插拔的。默認的方法是一種相對簡單的機制,即在文件系統中存儲數據塊。
? Provenance Repository:Provenance庫是所有源數據存儲的地方,支持可插拔。默認實現是使用一個或多個物理磁盤卷,在每個位置事件數據都是索引和可搜索的。

2 NiFi Processer介紹

上一節說了那麽多,主要通過NiFi的架構圖介紹了NiFi的基本概念,由概念可知Flow Controller是NiFi的核心,那麽Flow Controller具體是什麽?Flow Controller扮演者文件交流的處理器角色,維持著多個處理器的連接並管理各個Processer,Processer則是實際處理單元。

那麽,讓我們通過NiFi的UI看下NiFi的Processor包含哪些?
技術分享
通過上圖可知,Processor包含各種類型的組件,如amazon、attributes、hadoop等,可通過前綴進行輕易辨識,如Get、Fetch開頭代表獲取,如getFile、getFTP、FetchHDFS,execute代表執行,如ExecuteSQL、ExecuteProcess、ExecuteFlumeSink等均可較容易知其簡單用途。

3 NiFi Processer實戰

說了那麽多,介紹了NiFi的架構和Processor,那麽說好的實戰呢?那麽,本文就以筆者的一個實際需求為例,進行Processor的實戰。需求如下:選取一款數據處理調度工具,對服務器腳本實現定制調度執行。其中服務器的腳本涉及到對環境變量、oracle數據庫、Hadoop生態圈組件的調度。當對服務器腳本調度執行完成後返回腳本運行狀態,並提供失敗重運行接口。
為了實現需求,曾調度過各種調度工具,如Apache Oozie、Azkaban、Pentaho等,最終比較了各種利弊嘗試選用Apache NiFi作為嘗試,通過查閱NiFi Processor API,能更好的支持遠程操作的Processor為ExecuteProcess。下面將對需求進行實戰講解。

3.1 Processor的添加與配置

1. 點擊“Add Processor”,選擇ExecuteProcess後點擊Add按鈕完成添加,如下圖。
技術分享


2. 右擊ExecuteProcess後選擇Configure Processor,對Properties選項卡進行配置,其中每一個配置選項均提供了相關的說明,如下圖。
技術分享

如上圖所示,這裏有必要對各選項進行相關說明。
? Command(執行命令): sh。
? Command Arguments(執行命令參數):-c;ssh user@ip sh js/job/job_hourly.sh `date
? Batch Duration(執行間隔時間):不設置。//我們需求是通過定時調度,而並非按間隔時間執行。
? Redirect Error Stream(重定位流):不設置。
? Argument Delimiter(執行命令參數分隔符):; //以;對參數進行分割。

3.2 Processor調度

NiFi支持三種調度策略,包括Time Driven(時間驅動)、CRON Driven(CRON驅動)和Event Driven(事件驅動,非可選),根據我們實際需求選擇CRON Driven,個人理解CRON即是Crontab的應用,CRON的各參數含義分別代表:秒、分、時、日、月、周、年,需要配合*、?和L共同執行(*代表字段的值都有效;?代表對於指定的字段不指定值;L代表長整形)。如:“0 0 13 * * ?”代表想要在每天下午1點進行調度執行。因此根據我們的需求進行參數的調度配置。如下圖所示。
技術分享

3.3 運行狀態監控

NiFi通過Rest API供開發者調度,這裏我們用Processor API對運行狀態進行監控(狀態參數獲取、Processor的啟動與停止)。
1) 運行狀態監控參數獲取:
命令如下:curl ‘http://IP/nifi-api/processors/processorsID ‘得到如下結果,可通過json解析器解析並獲取狀態。
技術分享
2) Processor的啟動與停止:
NiFi的Processor啟動停止通過其Put方法實現,Put最有效的作用是改變其運行狀態,NiFi的Process總共有三種狀態,即Running、Stopped和Disabled。
那麽我們將開始和停止兩個命令Rest API的放在腳本中執行即可。
? 啟動命令(使用Rest API的Put方法):
curl -i -X PUT -H ‘Content-Type:application/json’ -d ‘
{
“revision”: {
“clientId”: “586ec1d7-015d-1000-6459-28251212434e”,
“version”:17},
“component”: {
“id”: “39e0dafc-015d-1000-918d-bee89ae2226e”,
“state”: “RUNNING”
}
}’ http://IP/nifi-api/processors/processorsID
? 停止命令(使用Rest API的Put方法):
curl -i -X PUT -H ‘Content-Type:application/json’ -d ‘
{
“revision”: {
“clientId”: “586ec1d7-015d-1000-6459-28251212434e”,
“version”:17},
“component”: {
“id”: “39e0dafc-015d-1000-918d-bee89ae2226e”,
“state”: “STOPPED”
}
}’ http://IP/nifi-api/processors/processorsID

4 小結與後記

本文首先對Apache NiFi進行簡介,後以筆者的實際需求為例,對NiFi核心組件Processor的實戰說明。由於NiFi仍然屬於Apache推出時間不長的一個頂級項目,雖功能十分強大,但可查閱資源仍然有限,本文更多的是一個拋磚的過程,其真正強大的功能還在數據處理上,歡迎感興趣的各位進行互相探討。

原文鏈接:http://www.cobub.com/actual-combat-of-apache-nifi-processor/

Apache NiFi Processor實戰