1. 程式人生 > >【NIFI】 Apache NiFI 之 ExecuteScript處理(二) 【NIFI】 Apache NiFI 之 ExecuteScript處理(一)

【NIFI】 Apache NiFI 之 ExecuteScript處理(二) 【NIFI】 Apache NiFI 之 ExecuteScript處理(一)

  本例介紹NiFI ExecuteScript處理器的使用,使用的指令碼引擎ECMScript

  接上一篇【NIFI】 Apache NiFI 之 ExecuteScript處理(一)

ExecuteScript使用

  1、動態屬性

    其中一個功能是動態屬性的概念,也稱為使用者定義屬性。這些是處理器的屬性,使用者可以為其設定屬性名稱和值。並非所有處理器都支援/使用動態屬性,但ExecuteScript會將動態屬性作為變數傳遞,這些變數引用與屬性值對應的PropertyValue物件。這裡有兩件重要的事情需要注意:

    • 由於屬性名稱按原樣繫結到變數名稱,因此必須為指定的程式語言支援動態屬性的命名約定。
      例如,Groovy不支援句點(。)作為有效的變數字元,因此動態屬性(如“my.value”)將導致處理器失敗。在這種情況下,有效的替代方案是“myValue”。
    • 使用PropertyValue物件(而不是值的String表示形式)以允許指令碼在將屬性值評估為String之前對屬性的值執行各種操作。如果已知屬性包含文字值,則可以對變數呼叫getValue()方法以獲取其String表示形式。相反,如果值可能包含表示式語言,或者您希望將值轉換為除String之外的值(例如對布林物件的值為'true'),則還有這些操作的方法。這些示例在下面的配方中說明,假設我們有兩個屬性'myProperty1'和'myProperty2'定義如下:

    

    目的:使用者輸入了動態屬性以供指令碼使用(例如,配置引數)。

    方法:使用變數的PropertyValue物件中的getValue()方法。此方法返回動態屬性值的String表示形式。請注意,如果值中存在表示式語言,則getValue()將不對其進行求值

    Examples

      Javascript

1 var myValue1 = myProperty1.getValue()
2 var myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()

 

   2、新增模組

    ExecuteScript的另一個功能是能夠向類路徑新增外部“模組”,這允許您利用各種第三方庫,指令碼等。但是每個指令碼引擎都以不同方式處理模組的概念,因此我將討論它們分別。一般來說,有兩種型別的模組,Java庫(JAR)和指令碼(用與ExecuteScript中相同的語言編寫

    配置如下:
      

    目的:指令碼執行時,使用Java庫

    方法:配置ExecuteScript,新增Module Directory目錄(裡面都是jar包),指令碼中獲取jar包中的類,使用jar包中的類方法

    Examples

      Javascript

 1 var ObjectMapper = Java.type("com.fasterxml.jackson.databind.ObjectMapper");
 2 var Map = Java.type("java.util.Map");
 3 
 4 
 5 var objectMapper = new ObjectMapper();
 6 
 7 
 8 // java物件轉 json字串
 9 var str = objectMapper.writeValueAsString(obj);
10 
11 // json字串 轉 java物件
12 var map = objectMapper.readValue(str, Map.class);  

  3、狀態儲存

    NiFi(我相信0.5.0)為處理器和其他NiFi元件提供了持久儲存一些資訊的能力,以便在元件周圍實現某些狀態功能,例如,QueryDatabaseTable處理器跟蹤它在指定列中看到的最大值,這樣下次執行時,它只會獲取其值大於目前已見過的行(即儲存在州經理)。

    NiFi元件可以選擇將其狀態儲存在叢集級別或本地級別。請注意,在獨立的NiFi例項中,“群集範圍”與“本地範圍”相同。範圍的選擇通常是關於在流中,每個節點上的相同處理器是否可以共享狀態資料。如果群集中的例項不需要共享狀態,則使用本地範圍。在Java中,這些選項作為名為Scope的列舉提供,因此當我引用Scope.CLUSTER和Scope.LOCAL時,我分別表示叢集和本地作用域。

    要在ExecuteScript中使用狀態管理功能(下面是特定於語言的示例),您可以通過呼叫ProcessContext的getStateManager()方法獲得對StateManager的引用(回想一下,每個引擎都獲得一個名為“context”的變數,其中包含ProcessContext例項)。然後,您可以在StateManager物件上呼叫以下方法:

    void setState(Map <String,String> state,Scope scope - 更新元件在給定範圍內的狀態值,並將其設定為給定值。請注意,該值是一個Map; “元件狀態”的概念是每個構成較低級別狀態的所有鍵/值對的對映。Map會立即更新以提供原子性。

    StateMap getState(作用域範圍 - 返回給定範圍內元件的當前狀態。此方法永遠不會返回null; 相反,它是一個StateMap物件,如果尚未設定狀態,StateMap的版本將為-1,值的對映將為空。通常會建立一個新的Map <String,String>來儲存更新的值,然後呼叫setState()或replace()。

    boolean replace(StateMap oldValue,Map <String,String> newValue,Scope scope - 當且僅當值與給定的oldValue相同時,才更新元件狀態(在給定範圍內)的值到新值。如果狀態已更新為新值,則返回true; 否則,如果狀態的值不等於oldValue,則返回false。

    void clear(範圍範圍 - 清除給定範圍內元件狀態的所有鍵和值。

    目的1:指令碼需要從狀態管理器獲取當前鍵/值對以供指令碼使用(例如,更新)

    方法1:使用ProcessContext中的getStateManager()方法,然後使用StateManager中的getStateMap(),然後使用toMap()轉換為鍵/值對的Map <String,String>。請注意,StateMap還有一個簡單檢索值的get(key)方法,但這種方法並不常用,因為Map通常會更新,一旦完成,就必須為StateManager設定。

    Examples

      Javascript

1 var Scope = Java.type('org.apache.nifi.components.state.Scope');
2 var oldMap = context.stateManager.getState(Scope.LOCAL).toMap();

 

    目的2:指令碼希望使用新的鍵/值對對映更新狀態對映。

    方法2:要獲取當前StateMap物件,請再次使用ProcessContext中的getStateManager()方法,然後使用StateManager中的getStateMap()。這些示例假設一個新的Map,但使用上面的配方(使用toMap()方法),您可以使用現有值建立一個新的Map,然後只更新所需的條目。請注意,如果沒有當前對映(即StateMap.getVersion()返回-1),則replace()將不起作用,因此示例將相應地檢查並呼叫setState()或replace()。從新的ExecuteScript例項執行時,StateMap版本將為-1,因此在單次執行後,如果右鍵單擊ExecuteScript處理器並選擇View State,您應該看到如下內容:

      

    Examples

      Javascript

1 var Scope = Java.type('org.apache.nifi.components.state.Scope');
2 var stateManager = context.stateManager;
3 var stateMap = stateManager.getState(Scope.CLUSTER);
4 var newMap = {'myKey1': 'myValue1'};
5 if (stateMap.version == -1) {
6   stateManager.setState(newMap, Scope.CLUSTER);
7 } else {
8   stateManager.replace(stateMap, newMap, Scope.CLUSTER);
9 }

 

ExecuteScript-Demo

  Demo流程:先存入快取myKey1 =》 第二次取出myKey1的值 =》 myKey1的值修改 =》 再次存入快取中

  ExecuteScript內容如下:

 1 var Scope = Java.type('org.apache.nifi.components.state.Scope');
 2 var stateManager = context.stateManager;
 3 var stateMap = stateManager.getState(Scope.CLUSTER);
 4 
 5 if (stateMap.version == -1) {
 6   var newMap = {'myKey1': "1"};
 7   stateManager.setState(newMap, Scope.CLUSTER);
 8 } else {
 9     
10 
11    var myValue1 = stateMap.toMap().get("myKey1");
12    myValue1 = myValue1*1 + 1;
13    var newMap = {'myKey1': myValue1 + ""};
14   
15   // 替換
16   stateManager.replace(stateMap, newMap, Scope.CLUSTER);
17 }