kettle外掛二次開發
先把環境搭建好,這裡可以參考我之前的部落格,或者比較好的參考文獻:
原理部分:
轉換外掛開發:
kettle轉換步驟外掛至少需要實現四個介面:
org.pentaho.di.trans.step.StepInterface 負責資料處理,轉換和流轉。這裡面主要由processRow()方法來處理。
org.pentaho.di.trans.step.StepDataInterface 資料處理設計的具體資料,以及對資料的狀態的設定和回收
org.pentaho.di.trans.step.StepMetaInterface 元資料的處理,載入xml(負責將Dialog資料載入到xml),校驗,主要是對一個步驟的定義的基本資料。
org.pentaho.di.trans.step.StepDialogInterface 提供GUI/dialog編譯步驟的元資料。
轉換步驟外掛各個類名規則:
StepInterface的實現類以外掛的功能先關命名:*.java
StepDataInterface的實現類:*Data.java
StepMetaInterface實現類:*Meta.java
StepDialogInterface實現類:*Dialog.java
<?xml version="1.0" encoding="UTF-8"?> <plugin id="TemplatePlugin" iconfile="icon.png" description="Template Plugin" tooltip="Only there for demonstration purposes" category="Demon" <!--外掛入所在分類--> classname="plugin.template.TemplateStepMeta"> <!--外掛入口類--> <libraries> <library name="TemplateStepPlugin.jar"/> </libraries> <localized_category> <category locale="en_US">Demon</category> <category locale="zh_CN">測試</category> </localized_category> <localized_description> <description locale="en_US">Template step</description> <description locale="en_US">Template step</description> </localized_description> <localized_tooltip> <tooltip locale="en_US">Template step</tooltip> <tooltip locale="en_US">Template step</tooltip> </localized_tooltip> </plugin>
這裡對Meta字尾的類重點講解(弄清楚這個類,其實已經解決了trans外掛開發的都大部分問題):
注意:getXML是儲存、loadXML是讀取,這2個方法基本是屬於模板類的程式碼,跟著其他專案一樣寫即可。本質使用xml來定義data資料格式
如果需要對外掛進行錯誤處理步驟,實現錯誤分流(實現:主步驟輸出、錯誤處理步驟),需要重寫父類方法,返回true即可:
@Override
public boolean supportsErrorHandling() {
return true;
}
這裡可以參考一個kettle已有外掛的原始碼:(資料檢驗)
Dialog類:/kettle-7.1-eclipse/ui/org/pentaho/di/ui/trans/steps/validator/ValidatorDialog.java
其他類:/kettle-7.1-eclipse/engine/org/pentaho/di/trans/steps/validator/包下有:
KettleValidatorException.java
Validation.java
Validator.java
ValidatorData.java
ValidatorMeta.java
messages/
國際化相關:messages/:
messages_en_US.properties
messages_fr_FR.properties
messages_it_IT.properties
messages_ja_JP.properties
messages_ko_KR.properties
messages_zh_CN.properties
具體程式碼分析:
開始執行,呼叫:public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException
裡面使用Object[]r = getRow(); // get row, set busy!獲取一行記錄
返回值是boolean,其中有個first來標記是否為第一行。
返回值:
- false 處理完畢
- true 迴圈呼叫
有個RunThread類內run方法通過while來不斷迴圈呼叫processRow方法。
在processRow中使用:
- 正確記錄輸出:putRow(data.outputRowMeta, outputRow); // copy row to possible alternate rowset(s)
- 錯誤資料輸出:putError(data.outputRowMeta, outputRow,1, "", "aa", "");
關於日誌:BaseStep有如下方法,直接使用即可列印在kettle控制檯:
logMinimal、logBasic、logDetailed、logDebug等
並在程式碼中判斷級別:
if (log.isDetailed())
...
if (log.isDebug())
...
if (log.isRowlevel())
...
具體meta類中一些變數,可以除錯檢視:在*(按照kettlt命名規範)類中:
meta = (MetaDataPluginMeta) smi;
data = (MetaDataPluginData) sdi;
1,meta拿到了dialog中填寫的值
2,呼叫getRow()方法,直接拿到對應的一行資料,這裡input是表輸入,這裡直接拿到了資料庫中第一行資料(資料,不是表頭)
3,其中first是BaseStep中定義,當第一行時,first標記為true
4,獲取表頭資訊呼叫: List<ValueMetaInterface> metaList = getInputRowMeta().getValueMetaList();
這裡上面聲明瞭一個data物件是使用者儲存資料的,data裡面有欄位來儲存表頭資訊:data.outputRowMeta = (RowMetaInterface) getInputRowMeta().clone();
5.獲取元件名稱:getStepname()
6.getMetaStore()
7:關於:getFields方法:
- 已經過時的:
public void getFields( RowMetaInterface inputRowMeta, String name, RowMetaInterface[] info, StepMeta nextStep,VariableSpace space )
- 非過時的:
public void getFields( RowMetaInterface inputRowMeta, String name, RowMetaInterface[] info, StepMeta nextStep,VariableSpace space, Repository repository, IMetaStore metaStore )
- 非過時方法引數含義:
- inputRowMeta對應上個元件提供的變數,對資料庫input就是sql裡面具體的column name呼叫(RowMetaInterface) getInputRowMeta().clone()可以獲取
- name 當天轉換名稱,getStepname()獲取
- info 直接 null
- netStep 直接null
- space 變數的儲存空間,型別map,來存你定義的內容
- getRepository()
- getMetaStore()獲取
注:該方法在自己編寫外掛的時候,並沒有重寫,下一個元件就能直接拿到input的欄位名
8.如果要新增新的或者修改一行資料,可以用kettlt提供的:RowDataUtil.addValueData來實現:
Object[] outputRow = RowDataUtil.addValueData(r, data.outputRowMeta.size() - 1, "dummy value");
Object[] addValueData( Object[] rowData, int length, Object extra )
引數含義:
rowData 構造出一行資料,r (getRow()獲取),當前的行值。
length 要替換的值,所在r的下標
extra 新的值
該類提供了下列方法:
9.關於資料分流:
在processRow中使用:
- 正確記錄輸出:putRow(data.outputRowMeta, outputRow); // copy row to possible alternate rowset(s)
- 錯誤資料輸出:putError(data.outputRowMeta, outputRow,1, "", "aa", "");
其中putError後面三個記錄,需要在*Meta中重寫,然後在圖形化中滑鼠右鍵,定義錯誤數、錯誤列名等等,就能在下個元件中拿到直接拿到:
@Override
public boolean supportsErrorHandling() {
return true;//返回true即可。
}
整體實現:一個自定義元資料,根據元資料中定義的資料,來檢查input表中的資料是否滿足,
- 滿足,則正常匯入到目標表。
- 不滿足,則寫入到錯誤表。
其中元資料索引表,儲存表、表字段、元資料值code。
根據元資料code查詢出一個list集合,該集合中的值,為元資料索引表中指定欄位允許的值。
最終外掛實現效果: