Kettle plugin 外掛開發
Kettle本身提供了很多元件,多個元件一起構成一個transformation(轉換),多個轉換一起構成一個job(任務)。kettle的元件已經非常豐富,在元件不滿足需求時可以在kettle上面開發自己的元件,kettle支援的元件開發如下:
Kettle中的外掛包含兩部分:
一是系統本身就已經實現的功能點,在原始碼目錄src中說明,如kettle-steps.xml;
二是系統之外開發的外掛,在plugins目錄對應外掛目錄下的plugins.xml說明,如plugins/steps/S3CsvInput/plugins.xml。
1.1系統整合外掛定義( step )
內容 |
位置 |
外掛說明資訊 |
engine/src/kettle-steps.xml,所有外掛集中說明 |
外掛原始碼 |
src-engine與src-ui下,org.pentaho.di.steps.外掛名 |
外掛圖片 |
外掛說明xml中說明 |
外掛介面文字說明 |
org.pentaho.di.steps.外掛名.messages |
外掛說明資訊中包括描述資訊、類名(包括package,反射用)、父級目錄(Spoon左側欄目錄)、提示資訊和圖片資訊。Kettle使用國家化方式程式設計,所以軟體中的所有文字描述均由messages_**.properties提供。
系統整合外掛配置說明kettle-steps.xml結構:
1.2擴充套件外掛定義(step)
所有新開發的擴充套件外掛,均放在同一的目錄下進行管理,外掛管理模組會自動去該目錄下進行搜尋查詢。外掛目錄結構如下所示(以S3CsvInput步驟為例):
擴充套件外掛定義
內容 |
位置 |
外掛說明資訊 |
plugins/steps/外掛名稱/plugin.xml |
外掛原始碼 |
plugins/steps/外掛名稱/*.jar |
外掛依賴包 |
plugins/steps/外掛名稱/ |
擴充套件外掛與系統整合外掛的說明內容相似,擴充套件外掛增加ID屬性和依賴屬性,同時他的目錄結構、描述資訊和提示資訊均能進行國際化配置。
1.3擴充套件外掛配置說明plugin.xml
ID:在kettle外掛中必須全域性唯一,因為被kettle序列化了,所以不要隨便改變
Iconfile: kettle中外掛顯示的圖片,必須是png圖片
Description:外掛描敘,顯示在樹形選單裡面。
Tooltip:樹形選單中,滑鼠滑過的時候顯示的提示資訊
Category:外掛顯示的父目錄
Classname:元資料類
Library:指明瞭外掛需要載入所依賴的jar包
2.1 Kettle轉換步驟外掛至少需要實現四個介面
org.pentaho.di.trans.step.StepMetaInterface:元資料的處理,載入xml,校驗,主要是對一個步驟的定義的基本資料。
org.pentaho.di.trans.step. StepDataInterface:資料處理涉及的具體資料,以及對資料的狀態的設定和回收。
org.pentaho.di.trans.step. StepInterface:負責資料處理,轉換和流轉。這裡面主要由processRow()方法來處理。
org.pentaho.di.trans.step. StepDialogInterface:提供GUI/dialog,編輯步驟的元資料。
對於以上四個介面的實現,都有相應的基類,具體的步驟只需要繼承基類和實現相應的介面即可。
Step擴充套件介面:
Java 介面 |
基類 |
主要功能 |
StepMetaInterface |
BaseStepMeta |
儲存step設定資訊 驗證step設定資訊 序列化step設定資訊 提供獲取step類的方法 |
StepDialogInterface |
BaseStepDialog |
step屬性資訊配置視窗 |
StepInterface |
BaseStep |
處理rows |
StepDataInterface |
BaseStepData |
為資料處理提高資料儲存 |
2.2 Kettle轉換步驟外掛各個類命名推薦規則
stepInterface的實現類以外掛的功能相關命名:*.java
stepDataInterface的實現類:*Data.java
stepMetaInterface的實現類:*Meta.java
StepDialogInterface的實現類:*Dialog.java
一、元資料類:
下面是元資料的幾個關鍵的方法,注意元資料類裡面用私有成員變數outputField儲存了下一個步驟的輸出欄位。
// keeptrack of the step settings
public String getOutputField()
public void setOutputField(…)
public void setDefault()
// xml型別元資料的寫入與載入
public String getXML()
public void loadXML(…)
// 資源庫型別元資料的寫入與載入
public void readRep(…)
public void saveRep(…)
//提供有關步驟如何影響處理行欄位結構的資訊
public void getFields(…)
//為步驟進行擴充套件有效性檢查
public void check(…)
// 為step,data
和dialog
類提供例項
public StepInterface getStep(…)
public StepDataInterface getStepData()
public StepDialogInterface getDialog(…)
TemplateStepMeta元資料類其實還有很多方面,不過大多被他的父類BaseStepMeta給預設實現了,這些預設的實現足以使我們的元資料類工作良好。
二、對話方塊類:
TemeplateStepDialog為步驟實現了對話方塊的設定,kettle的使用者介面部件是使用的eclipse的swt框架。在開發過程中,一個對話方塊物件擁有一個元資料物件,它記錄了應該從哪裡讀取配置?應該把設定好的配置儲存在哪裡?
三、步驟類:
步驟類是實際的處理和轉換工作的地方。因為大部分樣本程式碼已經由父類BaseStep提供了,外掛開發者只需要關注下面幾個特定的方法就行了。
//初始化和關閉
public boolean init(…)
public void dispose(..)
// 處理行
public void run()
public boolean processRow(..) //步驟主要資料處理方法
Init()方法在轉換執行前被kettle呼叫,轉換必須在所有步驟初始化成功時才真正執行。dispose()方法是在步驟執行完之後執行(非轉換執行完),它完成資源的關閉,像檔案控制代碼、快取等等。
run()方法在實際處理記錄集的時候呼叫,其實就是個迴圈,由上游獲取的每條記錄交由processRow()方法處理,當此步驟沒有資料處理或轉換被停止時退出迴圈。
processRow()方法在處理單條記錄的時候被呼叫。這個方法通常通過呼叫getRow()來獲取需要處理的單條記錄。這個方法如果有需要將會被阻塞,例如當此步驟希望放慢腳步處理資料時。processRow()隨後的流程將執行轉換工作並呼叫putRow()方法將處理過的記錄放到它的下游步驟。
四、資料類:
大多數步驟都需要臨時的緩衝或者臨時的儲存。資料類就是這些資料合適的存放位置。每一個執行執行緒擁有的一個數據類的例項,所以它們能在獨立的空間裡面執行。TemplateStepData繼承自BaseStepData,作為一個經驗法則,不要將non-constant欄位放置BaseStepData類裡面,如果一定要,請將它最好放置在派生類TemplateStepData裡面.
2.4 外掛配置
在plugins\steps下新建資料夾TemplateStepPlugin,將編寫好的外掛原始碼編譯打包成一個jar包,連同外掛顯示圖片和寫好的plugin.xml一起放入TemplateStepPlugin。
3. 外掛註冊與查詢
3.1外掛的註冊
Spoon在啟動的時候會對所有外掛進行註冊,並儲存在PluginRegistry類裡面。平臺通過查詢PluginRegistry登錄檔獲取外掛資訊。Kettle安裝外掛需要進行重啟,解除安裝外掛也只需簡單的刪除plugins目錄結構下對應的檔案即可。
外掛註冊時序圖:
PluginRegistry首選註冊本系統的外掛型別處理類,原始碼中註冊了10種型別:PluginRegistry.addPluginType(RowDistributionPluginType.getInstance());
PluginRegistry.addPluginType(StepPluginType.getInstance());
PluginRegistry.addPluginType(PartitionerPluginType.getInstance());
PluginRegistry.addPluginType(JobEntryPluginType.getInstance());
PluginRegistry.addPluginType(LogTablePluginType.getInstance());
PluginRegistry.addPluginType(RepositoryPluginType.getInstance());
PluginRegistry.addPluginType(LifecyclePluginType.getInstance());
PluginRegistry.addPluginType(KettleLifecyclePluginType.getInstance());
PluginRegistry.addPluginType(ImportRulePluginType.getInstance());
PluginRegistry.addPluginType(CartePluginType.getInstance());
此處以StepPluginType為例。註冊型別處理類後,PluginRegistry按照不同的型別進行外掛搜尋(模板模式),基類BasePluginType提供了本地搜尋、jar搜尋、xml資訊搜尋3種鉤子。根據搜尋結果,按照不同的外掛型別儲存在PluginRegistry中。
3.2 外掛查詢
PluginRegistry提供了外掛查詢功能,準確的來說是外掛資訊的查詢功能。以steps在左側功能欄裡面的顯示為例,進行外掛查詢的說明。PluginRegistry提供了getPlugins獲取指定外掛型別列表、getPlugin獲取指定成名外掛、getCateories獲取目錄結構、getClass獲取指定外掛類等方法。
左側顯示由Spoon.refreshCoreObjects()函式實現,如果選擇時trans相關的內容,將顯示所有的step外掛。流程圖如下所示:
實現程式碼:
if (showTrans) {
selectionLabel.setText(BaseMessages.getString(PKG, "Spoon.Steps"));
PluginRegistry registry = PluginRegistry.getInstance();
final List<PluginInterface> basesteps = registry.getPlugins(StepPluginType.class); //獲取外掛資訊
final List<String> basecat = registry.getCategories(StepPluginType.class); //獲取目錄資訊
if( stepFilter == null )
{
stepFilter = new StepFilterConfigure();
}
//items filter...
for (int i = 0; i <basecat.size(); i++) { //依次新增獲取到的目錄
StringtmpCatName = basecat.get(i).toLowerCase();
if(stepFilter.getTransCategoriesFilteredList().contains(tmpCatName))
{
//filter categories
continue;
}
TreeItem item = new TreeItem(coreObjectsTree, SWT.NONE);
item.setText(basecat.get(i));
item.setImage(GUIResource.getInstance().getImageArrow());
//replace icon
Image newIcon = null;
String iconName = "cate_" + basecat.get(i)+ ".png";
String path = "ui/images/";
String iconPath = path + iconName;
newIcon = GUIResource.getInstance().getImageByName(iconPath);
if( newIcon != null )
{
item.setImage(newIcon);
}
for (int j = 0; j <basesteps.size(); j++) { //依次新增獲取到的該目錄下的外掛
if(basesteps.get(j).getCategory().equalsIgnoreCase(basecat.get(i)) && !stepFilter.getTransStepFilteredList().contains(basesteps.get(j).getName())){
final Image stepimg = GUIResource.getInstance().getImagesStepsSmall()
.get(basesteps.get(j).getIds()[0]);
String pluginName =basesteps.get(j).getName();
String pluginDescription =basesteps.get(j).getDescription();
if (!filterMatch(pluginName) &&!filterMatch(pluginDescription))
continue;
TreeItem stepItem = new TreeItem(item, SWT.NONE);
stepItem.setImage(stepimg);
stepItem.setText(pluginName);
stepItem.addListener(SWT.Selection, new Listener() {
public void handleEvent(Eventarg0) {
//System.out.println("Tree item Listener fired");
}
});
coreStepToolTipMap.put(pluginName,pluginDescription);
}
}
}
}