手把手教你使用Kettle JAVA API進行資料抽取
阿新 • • 發佈:2019-01-09
Kettle作為一款優秀的資料抽取程式,因為高效穩定的效能,一直被廣大使用者所喜愛,並且還在國內廣受好評。因為其本身使用純JAVA編寫,所以其JAVA API使用起來自然也是非常簡便。雖然其本身自帶的元件已經非常好用,並且能夠滿足豐富的場景。但可能有些場景下,我們可能需要通過其他的方式來實現,本篇我們將介紹Kettle的JAVA API的使用。
一、環境搭建
核心jar包的pom.xml配置如下:
<dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-engine</artifactId> <version>4.4.0-stable</version> </dependency> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-core</artifactId> <version>4.4.0-stable</version> </dependency> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-db</artifactId> <version>4.4.0-stable</version> </dependency>
二、程式碼部分
1、初始化環境
public void initKettleEnvironment(HttpServletRequest request) throws KettleException { if (KettleEnvironment.isInitialized()) { return; } /** * 為避免在部分網路環境中無法完成初始化,需要自行處理 */ if (request == null) { // 執行環境初始化 KettleEnvironment.init(); } else { String userDir = System.getProperty("user.dir"); String kettleHome = request.getSession().getServletContext().getRealPath(File.separator "WEB-INF"); // 設定使用者路徑和系統環境,包括使用者路徑和主目錄 System.setProperty("user.dir", kettleHome); System.setProperty("KETTLE_HOME", kettleHome); // 執行環境初始化 KettleEnvironment.init(); // 避免造成影響其他程式的執行,還原使用者路徑 System.setProperty("user.dir", userDir); } }
2、建立轉化元
新增配置陣列,配置轉化元
public TransMeta buildTransMeta(String metaName, String[] transXML) throws KettleXMLException { TransMeta transMeta = new TransMeta(); // 設定轉化元的名稱 transMeta.setName(metaName); // 新增轉換的資料庫連線 for (int i = 0; i < transXML.length; i ) { DatabaseMeta databaseMeta = new DatabaseMeta(transXML[i]); transMeta.addDatabase(databaseMeta); } return transMeta; }
3、新增日誌(可選操作)
public void setStepLogTable(TransMeta transMeta, String connDbName, String tableName) {
VariableSpace space = new Variables();
// 將step日誌資料庫配置名加入到變數集中
space.setVariable(Const.KETTLE_TRANS_LOG_DB, connDbName);
space.initializeVariablesFrom(null);
StepLogTable stepLogTable = StepLogTable.getDefault(space, transMeta);
// 配置StepLogTable使用的資料庫配置名稱
stepLogTable.setConnectionName(connDbName);
// 設定Step日誌的表名
stepLogTable.setTableName(tableName);
// 設定TransMeta的StepLogTable
transMeta.setStepLogTable(stepLogTable);
}
4、建立外掛註冊器
public PluginRegistry getRegistry() {
// 外掛註冊,用於註冊轉換中需要用到的外掛
return PluginRegistry.getInstance();
}
5、設定表輸入步驟元
該步驟用於獲取源資料
/**
* 設定表輸入步驟
* @param transMeta
* @param registry
* @param sourceDbName
* @param sql
* @param stepName
* @return
*/
public StepMeta setTableInputStep(TransMeta transMeta, PluginRegistry registry, String sourceDbName, String sql,
String stepName) {
// 建立表輸入
TableInputMeta tableInputMeta = new TableInputMeta();
String pluginId = registry.getPluginId(StepPluginType.class, tableInputMeta);
// 指定資料來源資料庫配置名
DatabaseMeta source = transMeta.findDatabase(sourceDbName);
tableInputMeta.setDatabaseMeta(source);
tableInputMeta.setSQL(sql);
// 將表輸入新增到轉換中
StepMeta stepMeta = new StepMeta(pluginId, stepName, tableInputMeta);
// 給步驟新增在spoon工具中的顯示位置
stepMeta.setDraw(true);
stepMeta.setLocation(100, 100);
// 將表輸入新增到步驟中
transMeta.addStep(stepMeta);
return stepMeta;
}
6、更新步驟元
該步驟用於將獲取到的資料更新到目標資料庫中
/**
* 設定表輸出步驟,用於整表抽取
* @param transMeta
* @param registry
* @param targetDbName
* @param targetTableName
* @param stepName
* @return
*/
public StepMeta setTableOutput(TransMeta transMeta, PluginRegistry registry, String targetDbName,
String targetTableName, String stepName) {
// 建立表輸出
TableOutputMeta tableOutputMeta = new TableOutputMeta();
String pluginId = registry.getPluginId(StepPluginType.class, tableOutputMeta);
// 配置表輸出的目標資料庫配置名
DatabaseMeta targetDb = transMeta.findDatabase(targetDbName);
tableOutputMeta.setDatabaseMeta(targetDb);
tableOutputMeta.setTableName(targetTableName);
// 將表輸出新增到轉換中
StepMeta stepMeta = new StepMeta(pluginId, stepName, tableOutputMeta);
transMeta.addStep(stepMeta);
return stepMeta;
}
/**
* 設定表插入與更新步驟,用於表中部分欄位更新
* @param transMeta
* @param registry
* @param targetDbName
* @param targetTableName
* @param updatelookup lookup檢索欄位
* @param updateStream lookup更新欄位
* @param updateStream2 lookup更新欄位2
* @param conditions lookup條件
* @param updateOrNot lookup更新標記
* @param stepName
* @return
*/
public StepMeta setInsertUpdateMeta(TransMeta transMeta, PluginRegistry registry, String targetDbName,
String targetTableName, String[] updatelookup, String[] updateStream, String[] updateStream2,
String[] conditions, Boolean[] updateOrNot, String stepName) {
// 建立插入與更新
InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
String pluginId = registry.getPluginId(StepPluginType.class, insertUpdateMeta);
// 配置目標資料庫配置名
DatabaseMeta database_target = transMeta.findDatabase(targetDbName);
insertUpdateMeta.setDatabaseMeta(database_target);
// 設定目標表名
insertUpdateMeta.setTableName(targetTableName);
// 設定用來查詢的關鍵字
insertUpdateMeta.setKeyLookup(updatelookup);
insertUpdateMeta.setKeyStream(updateStream);
insertUpdateMeta.setKeyStream2(updateStream2);// 這一步不能省略
insertUpdateMeta.setKeyCondition(conditions);
// 設定要更新的欄位
insertUpdateMeta.setUpdateLookup(updatelookup);
insertUpdateMeta.setUpdateStream(updateStream);
insertUpdateMeta.setUpdate(updateOrNot);
// 新增步驟到轉換中
StepMeta stepMeta = new StepMeta(pluginId, stepName, insertUpdateMeta);
stepMeta.setDraw(true);
stepMeta.setLocation(250, 100);
transMeta.addStep(stepMeta);
return stepMeta;
}
7、繫結關聯步驟
該步驟用於將資料獲取和匯入更新的步驟關聯繫結
/**
* 用於將表輸入步驟與第二步驟繫結
* @param transMeta
* @param from
* @param to
*/
public void addTransHop(TransMeta transMeta, StepMeta from, StepMeta to) {
transMeta.addTransHop(new TransHopMeta(from, to));
}
8、執行抽取
執行資料抽取
/**
* 執行抽取
* @param transMeta
* @param targetDbName
*/
public void executeTrans(TransMeta transMeta, String targetDbName) {
try {
Database database = new Database(null, transMeta.findDatabase(targetDbName));
database.connect();
Trans trans = new Trans(transMeta);
trans.execute(new String[] { "start..." });
trans.waitUntilFinished();
// 關閉資料庫連線
database.disconnect();
if (trans.getErrors() > 0) {
throw new RuntimeException("There were errors during transformation execution.");
}
} catch (KettleDatabaseException e) {
e.printStackTrace();
} catch (KettleException e) {
e.printStackTrace();
}
}