【NIFI】 Apache NiFI 之 ExecuteScript處理(二) 【NIFI】 Apache NiFI 之 ExecuteScript處理(一)
本例介紹NiFI ExecuteScript處理器的使用,使用的指令碼引擎ECMScript
接上一篇【NIFI】 Apache NiFI 之 ExecuteScript處理(一)
ExecuteScript使用
1、動態屬性
其中一個功能是動態屬性的概念,也稱為使用者定義屬性。這些是處理器的屬性,使用者可以為其設定屬性名稱和值。並非所有處理器都支援/使用動態屬性,但ExecuteScript會將動態屬性作為變數傳遞,這些變數引用與屬性值對應的PropertyValue物件。這裡有兩件重要的事情需要注意:
-
- 由於屬性名稱按原樣繫結到變數名稱,因此必須為指定的程式語言支援動態屬性的命名約定。
- 使用PropertyValue物件(而不是值的String表示形式)以允許指令碼在將屬性值評估為String之前對屬性的值執行各種操作。如果已知屬性包含文字值,則可以對變數呼叫getValue()方法以獲取其String表示形式。相反,如果值可能包含表示式語言,或者您希望將值轉換為除String之外的值(例如對布林物件的值為'true'),則還有這些操作的方法。這些示例在下面的配方中說明,假設我們有兩個屬性'myProperty1'和'myProperty2'定義如下:
- 由於屬性名稱按原樣繫結到變數名稱,因此必須為指定的程式語言支援動態屬性的命名約定。
目的:使用者輸入了動態屬性以供指令碼使用(例如,配置引數)。
方法:使用變數的PropertyValue物件中的getValue()方法。此方法返回動態屬性值的String表示形式。請注意,如果值中存在表示式語言,則getValue()將不對其進行求值
Examples
Javascript
1 var myValue1 = myProperty1.getValue() 2 var myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
2、新增模組
ExecuteScript的另一個功能是能夠向類路徑新增外部“模組”,這允許您利用各種第三方庫,指令碼等。但是每個指令碼引擎都以不同方式處理模組的概念,因此我將討論它們分別。一般來說,有兩種型別的模組,Java庫(JAR)和指令碼(用與ExecuteScript中相同的語言編寫
配置如下:
目的:指令碼執行時,使用Java庫
方法:配置ExecuteScript,新增Module Directory目錄(裡面都是jar包),指令碼中獲取jar包中的類,使用jar包中的類方法
Examples
Javascript
1 var ObjectMapper = Java.type("com.fasterxml.jackson.databind.ObjectMapper"); 2 var Map = Java.type("java.util.Map"); 3 4 5 var objectMapper = new ObjectMapper(); 6 7 8 // java物件轉 json字串 9 var str = objectMapper.writeValueAsString(obj); 10 11 // json字串 轉 java物件 12 var map = objectMapper.readValue(str, Map.class);
3、狀態儲存
NiFi(我相信0.5.0)為處理器和其他NiFi元件提供了持久儲存一些資訊的能力,以便在元件周圍實現某些狀態功能,例如,QueryDatabaseTable處理器跟蹤它在指定列中看到的最大值,這樣下次執行時,它只會獲取其值大於目前已見過的行(即儲存在州經理)。
NiFi元件可以選擇將其狀態儲存在叢集級別或本地級別。請注意,在獨立的NiFi例項中,“群集範圍”與“本地範圍”相同。範圍的選擇通常是關於在流中,每個節點上的相同處理器是否可以共享狀態資料。如果群集中的例項不需要共享狀態,則使用本地範圍。在Java中,這些選項作為名為Scope的列舉提供,因此當我引用Scope.CLUSTER和Scope.LOCAL時,我分別表示叢集和本地作用域。
要在ExecuteScript中使用狀態管理功能(下面是特定於語言的示例),您可以通過呼叫ProcessContext的getStateManager()方法獲得對StateManager的引用(回想一下,每個引擎都獲得一個名為“context”的變數,其中包含ProcessContext例項)。然後,您可以在StateManager物件上呼叫以下方法:
void setState(Map <String,String> state,Scope scope) - 更新元件在給定範圍內的狀態值,並將其設定為給定值。請注意,該值是一個Map; “元件狀態”的概念是每個構成較低級別狀態的所有鍵/值對的對映。Map會立即更新以提供原子性。
StateMap getState(作用域範圍) - 返回給定範圍內元件的當前狀態。此方法永遠不會返回null; 相反,它是一個StateMap物件,如果尚未設定狀態,StateMap的版本將為-1,值的對映將為空。通常會建立一個新的Map <String,String>來儲存更新的值,然後呼叫setState()或replace()。
boolean replace(StateMap oldValue,Map <String,String> newValue,Scope scope) - 當且僅當值與給定的oldValue相同時,才更新元件狀態(在給定範圍內)的值到新值。如果狀態已更新為新值,則返回true; 否則,如果狀態的值不等於oldValue,則返回false。
void clear(範圍範圍) - 清除給定範圍內元件狀態的所有鍵和值。
目的1:指令碼需要從狀態管理器獲取當前鍵/值對以供指令碼使用(例如,更新)
方法1:使用ProcessContext中的getStateManager()方法,然後使用StateManager中的getStateMap(),然後使用toMap()轉換為鍵/值對的Map <String,String>。請注意,StateMap還有一個簡單檢索值的get(key)方法,但這種方法並不常用,因為Map通常會更新,一旦完成,就必須為StateManager設定。
Examples
Javascript
1 var Scope = Java.type('org.apache.nifi.components.state.Scope'); 2 var oldMap = context.stateManager.getState(Scope.LOCAL).toMap();
目的2:指令碼希望使用新的鍵/值對對映更新狀態對映。
方法2:要獲取當前StateMap物件,請再次使用ProcessContext中的getStateManager()方法,然後使用StateManager中的getStateMap()。這些示例假設一個新的Map,但使用上面的配方(使用toMap()方法),您可以使用現有值建立一個新的Map,然後只更新所需的條目。請注意,如果沒有當前對映(即StateMap.getVersion()返回-1),則replace()將不起作用,因此示例將相應地檢查並呼叫setState()或replace()。從新的ExecuteScript例項執行時,StateMap版本將為-1,因此在單次執行後,如果右鍵單擊ExecuteScript處理器並選擇View State,您應該看到如下內容:
Examples
Javascript
1 var Scope = Java.type('org.apache.nifi.components.state.Scope'); 2 var stateManager = context.stateManager; 3 var stateMap = stateManager.getState(Scope.CLUSTER); 4 var newMap = {'myKey1': 'myValue1'}; 5 if (stateMap.version == -1) { 6 stateManager.setState(newMap, Scope.CLUSTER); 7 } else { 8 stateManager.replace(stateMap, newMap, Scope.CLUSTER); 9 }
ExecuteScript-Demo
Demo流程:先存入快取myKey1 =》 第二次取出myKey1的值 =》 myKey1的值修改 =》 再次存入快取中
ExecuteScript內容如下:
1 var Scope = Java.type('org.apache.nifi.components.state.Scope'); 2 var stateManager = context.stateManager; 3 var stateMap = stateManager.getState(Scope.CLUSTER); 4 5 if (stateMap.version == -1) { 6 var newMap = {'myKey1': "1"}; 7 stateManager.setState(newMap, Scope.CLUSTER); 8 } else { 9 10 11 var myValue1 = stateMap.toMap().get("myKey1"); 12 myValue1 = myValue1*1 + 1; 13 var newMap = {'myKey1': myValue1 + ""}; 14 15 // 替換 16 stateManager.replace(stateMap, newMap, Scope.CLUSTER); 17 }