JAVA執行Kettle的JOB或者轉換的總結
第一步,連線資源庫
本例採用的是單例設計模式,這樣做的優點是如果資源庫已經連線,則不必重新連線
下面直接上程式碼
package com.qm.util;
import java.io.File; import java.io.IOException;
import org.jeecgframework.core.util.PropertiesUtil; import org.pentaho.di.core.Const; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.plugins.DatabasePluginType; import org.pentaho.di.core.plugins.EnginePluginType; import org.pentaho.di.core.plugins.JobEntryPluginType; import org.pentaho.di.core.plugins.PluginFolder; import org.pentaho.di.core.plugins.RepositoryPluginType; import org.pentaho.di.core.plugins.StepPluginType; import org.pentaho.di.repository.kdr.KettleDatabaseRepository; import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
public class RepBuilder{
private RepBuilder(){};
private static KettleDatabaseRepository rep=null;
public static KettleDatabaseRepository getInstance() throws KettleException{
if(rep==null){
synchronized(RepBuilder.class){
if(rep==null){
//初始化
//EnvUtil.environmentInit();
PropertiesUtil p = new PropertiesUtil("etl.properties
EnginePluginType.getInstance().getPluginFolders().add(pluginFolder); JobEntryPluginType.getInstance().getPluginFolders().add(pluginFolder); StepPluginType.getInstance().getPluginFolders().add(pluginFolder); DatabasePluginType.getInstance().getPluginFolders().add(pluginFolder); DatabasePluginType.getInstance().getPluginFolders().add(pluginFolderKaraf);
KettleEnvironment.init(); //資料庫連線元物件 DatabaseMeta dataMeta = new DatabaseMeta(p.readProperty("pdi.conName"), p.readProperty("pdi.conType"), p.readProperty("pdi.access"),p.readProperty("pdi.hostName"),p.readProperty("pdi.dataBaseName"), p.readProperty("pdi.portNumber"),p.readProperty("pdi.username"),p.readProperty("pdi.password")); //資料庫形式的資源庫元物件 KettleDatabaseRepositoryMeta repInfo = new KettleDatabaseRepositoryMeta(); // repInfo.setConnection(dataMeta); //資料庫形式的資源庫物件 KettleDatabaseRepository rep = new KettleDatabaseRepository(); //用資源庫元物件初始化資源庫物件 rep.init(repInfo); //連線到資源庫 rep.connect(p.readProperty("rep.username"), p.readProperty("rep.password"));//預設的連線資源庫的使用者名稱和密碼 if(rep.isConnected()){ System.out.println("連線成功"); return rep; }else{ System.out.println("連線失敗"); return null; } } } } return rep; }
}
下面是etl.properties 的配置僅供參考
rep.name=pdires rep.username=admin rep.password=admin
#pdi.path=D:\\data-integration #pdi.pan=pan.bat #pdi.kitchen=kitchen.bat
pdi.dir=D:\\data-integration\\simple-jndi pdi.conName=pdires pdi.conType=Oracle pdi.access=JNDI pdi.hostName=10.6.197.219 pdi.dataBaseName=pdires pdi.portNumber=1521 pdi.username=pdires pdi.password=pdires pdi.plugins=D:\\data-integration\\plugins pdi.pluginsKaraf=D:\\data-integration\\system\\karaf\\system\\pentaho
然後建立物件Object obj = RepBuilder.getInstance();
如果obj!=null 則表明連線成功
第二部分,執行job或者轉換
/** * @author WGQ * 根據指定的資源庫物件及job名稱 執行指定的job * @param filename * @param parameters * @param dirName * @return * @throws KettleException * */ public static Result runJob(KettleDatabaseRepository rep,String jobName, String[] parameters, String filename, String dirName) throws Exception{ RepositoryDirectoryInterface dir = rep.findDirectory(dirName);//根據指定的字串路徑 找到目錄 //載入指定的job JobMeta jobMeta = rep.loadJob(rep.getJobId(jobName, dir), null); Job job = new Job(rep, jobMeta); //設定引數 if(parameters!=null){ for(String param : parameters){ String key = param.split("=")[0]; String value = param.substring(param.indexOf("=")+1); jobMeta.setParameterValue(key,value); } } if(!"".equals(filename)){ PropertiesUtil p = new PropertiesUtil("sysConfig.properties"); jobMeta.setParameterValue("filename", p.readProperty("webUploadpath")+filename); } job.setLogLevel(LogLevel.BASIC); //啟動執行指定的job job.run(); Result result = job.getResult(); job.waitUntilFinished();//等待job執行完; job.setFinished(true); System.out.println(job.getResult()); return result; } /** * @author WGQ * * @param rep 資源庫物件 * @param transName 要呼叫的trans名稱 * * 呼叫資源庫中的trans * @param filename * @param parameters * @param dirName * @return * @throws KettleException * * */ public static Result runTrans(KettleDatabaseRepository rep,String transName, String[] parameters, String filename, String dirName) throws KettleException{ RepositoryDirectoryInterface dir = rep.findDirectory(dirName);//根據指定的字串路徑 找到目錄 TransMeta tmeta = rep.loadTransformation(rep.getTransformationID(transName, dir), null); //設定引數 //tmeta.setParameterValue("", ""); if(parameters!=null){ for(String param : parameters){ String key = param.split("=")[0]; String value = param.substring(param.indexOf("=")+1); tmeta.setParameterValue(key,value); } } if(!"".equals(filename)){ PropertiesUtil p = new PropertiesUtil("sysConfig.properties"); tmeta.setParameterValue("filename", p.readProperty("webUploadpath")+filename); } Trans trans = new Trans(tmeta); trans.setLogLevel(LogLevel.BASIC); trans.execute(null);//執行trans trans.waitUntilFinished(); if(trans.getErrors()>0){ System.out.println("有異常"); } Result result = trans.getResult(); return result; }
這裡講解下引數jobName 和tranName 是job名稱和轉換名稱,fileName 上傳的檔案的全路徑,parameters 引數,dirName 是job或者轉換所在的根目錄