1. 程式人生 > >kettle外掛二次開發

kettle外掛二次開發

先把環境搭建好,這裡可以參考我之前的部落格,或者比較好的參考文獻:

原理部分:

轉換外掛開發:

kettle轉換步驟外掛至少需要實現四個介面:

org.pentaho.di.trans.step.StepInterface 負責資料處理,轉換和流轉。這裡面主要由processRow()方法來處理。

org.pentaho.di.trans.step.StepDataInterface 資料處理設計的具體資料,以及對資料的狀態的設定和回收

org.pentaho.di.trans.step.StepMetaInterface 元資料的處理,載入xml(負責將Dialog資料載入到xml),校驗,主要是對一個步驟的定義的基本資料。

org.pentaho.di.trans.step.StepDialogInterface 提供GUI/dialog編譯步驟的元資料。

轉換步驟外掛各個類名規則:

StepInterface的實現類以外掛的功能先關命名:*.java

StepDataInterface的實現類:*Data.java

StepMetaInterface實現類:*Meta.java

StepDialogInterface實現類:*Dialog.java

<?xml version="1.0" encoding="UTF-8"?>
<plugin
   id="TemplatePlugin"
   iconfile="icon.png"
   description="Template Plugin"
   tooltip="Only there for demonstration purposes"
   category="Demon"                                <!--外掛入所在分類-->
   classname="plugin.template.TemplateStepMeta">   <!--外掛入口類-->
	<libraries>
    	<library name="TemplateStepPlugin.jar"/>
	</libraries>
		    
   <localized_category>
     <category locale="en_US">Demon</category>
     <category locale="zh_CN">測試</category>
   </localized_category>
   <localized_description>
     <description locale="en_US">Template step</description>
     <description locale="en_US">Template step</description>
   </localized_description>
   <localized_tooltip>
     <tooltip locale="en_US">Template step</tooltip>
     <tooltip locale="en_US">Template step</tooltip>
   </localized_tooltip>
</plugin>

這裡對Meta字尾的類重點講解(弄清楚這個類,其實已經解決了trans外掛開發的都大部分問題):

注意:getXML是儲存、loadXML是讀取,這2個方法基本是屬於模板類的程式碼,跟著其他專案一樣寫即可。本質使用xml來定義data資料格式

如果需要對外掛進行錯誤處理步驟,實現錯誤分流(實現:主步驟輸出、錯誤處理步驟),需要重寫父類方法,返回true即可:

@Override
	public boolean supportsErrorHandling() {
		return true;
	}

這裡可以參考一個kettle已有外掛的原始碼:(資料檢驗)

Dialog類:/kettle-7.1-eclipse/ui/org/pentaho/di/ui/trans/steps/validator/ValidatorDialog.java

其他類:/kettle-7.1-eclipse/engine/org/pentaho/di/trans/steps/validator/包下有:

KettleValidatorException.java

Validation.java

Validator.java

ValidatorData.java

ValidatorMeta.java

messages/

國際化相關:messages/:

messages_en_US.properties

messages_fr_FR.properties

messages_it_IT.properties

messages_ja_JP.properties

messages_ko_KR.properties

messages_zh_CN.properties

具體程式碼分析:

開始執行,呼叫:public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException

裡面使用Object[]r = getRow(); // get row, set busy!獲取一行記錄

返回值是boolean,其中有個first來標記是否為第一行。

返回值:

  • false 處理完畢
  • true 迴圈呼叫

有個RunThread類內run方法通過while來不斷迴圈呼叫processRow方法。

在processRow中使用:

  • 正確記錄輸出:putRow(data.outputRowMeta, outputRow); // copy row to possible alternate rowset(s)
  • 錯誤資料輸出:putError(data.outputRowMeta, outputRow,1, "", "aa", "");

關於日誌:BaseStep有如下方法,直接使用即可列印在kettle控制檯:

logMinimal、logBasic、logDetailed、logDebug等

並在程式碼中判斷級別:

if (log.isDetailed())

...

if (log.isDebug())

...

if (log.isRowlevel())

...

具體meta類中一些變數,可以除錯檢視:在*(按照kettlt命名規範)類中:

meta = (MetaDataPluginMeta) smi;

data = (MetaDataPluginData) sdi;

1,meta拿到了dialog中填寫的值

2,呼叫getRow()方法,直接拿到對應的一行資料,這裡input是表輸入,這裡直接拿到了資料庫中第一行資料(資料,不是表頭)

3,其中first是BaseStep中定義,當第一行時,first標記為true

4,獲取表頭資訊呼叫: List<ValueMetaInterface> metaList = getInputRowMeta().getValueMetaList();

這裡上面聲明瞭一個data物件是使用者儲存資料的,data裡面有欄位來儲存表頭資訊:data.outputRowMeta = (RowMetaInterface) getInputRowMeta().clone();

5.獲取元件名稱:getStepname()

6.getMetaStore()

7:關於:getFields方法:

  • 已經過時的:

public void getFields( RowMetaInterface inputRowMeta, String name, RowMetaInterface[] info, StepMeta nextStep,VariableSpace space )

  • 非過時的:

public void getFields( RowMetaInterface inputRowMeta, String name, RowMetaInterface[] info, StepMeta nextStep,VariableSpace space, Repository repository, IMetaStore metaStore )

  • 非過時方法引數含義:
  1. inputRowMeta對應上個元件提供的變數,對資料庫input就是sql裡面具體的column name呼叫(RowMetaInterface) getInputRowMeta().clone()可以獲取
  2. name 當天轉換名稱,getStepname()獲取
  3. info 直接 null
  4. netStep 直接null
  5. space 變數的儲存空間,型別map,來存你定義的內容
  6. getRepository()
  7. getMetaStore()獲取

注:該方法在自己編寫外掛的時候,並沒有重寫,下一個元件就能直接拿到input的欄位名

8.如果要新增新的或者修改一行資料,可以用kettlt提供的:RowDataUtil.addValueData來實現:

Object[] outputRow = RowDataUtil.addValueData(r, data.outputRowMeta.size() - 1, "dummy value");

Object[] addValueData( Object[] rowData, int length, Object extra )

引數含義:

rowData 構造出一行資料,r (getRow()獲取),當前的行值。

length 要替換的值,所在r的下標

extra 新的值

該類提供了下列方法:

9.關於資料分流:

在processRow中使用:

  • 正確記錄輸出:putRow(data.outputRowMeta, outputRow); // copy row to possible alternate rowset(s)
  • 錯誤資料輸出:putError(data.outputRowMeta, outputRow,1, "", "aa", "");

其中putError後面三個記錄,需要在*Meta中重寫,然後在圖形化中滑鼠右鍵,定義錯誤數、錯誤列名等等,就能在下個元件中拿到直接拿到:

@Override

public boolean supportsErrorHandling() {

return true;//返回true即可。

}

整體實現:一個自定義元資料,根據元資料中定義的資料,來檢查input表中的資料是否滿足,

  • 滿足,則正常匯入到目標表。
  • 不滿足,則寫入到錯誤表。

其中元資料索引表,儲存表、表字段、元資料值code。

根據元資料code查詢出一個list集合,該集合中的值,為元資料索引表中指定欄位允許的值。

最終外掛實現效果: