1. 程式人生 > >JAVA執行Kettle的JOB或者轉換的總結

JAVA執行Kettle的JOB或者轉換的總結

第一步,連線資源庫

本例採用的是單例設計模式,這樣做的優點是如果資源庫已經連線,則不必重新連線

下面直接上程式碼

package com.qm.util;

import java.io.File; import java.io.IOException;

import org.jeecgframework.core.util.PropertiesUtil; import org.pentaho.di.core.Const; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.plugins.DatabasePluginType; import org.pentaho.di.core.plugins.EnginePluginType; import org.pentaho.di.core.plugins.JobEntryPluginType; import org.pentaho.di.core.plugins.PluginFolder; import org.pentaho.di.core.plugins.RepositoryPluginType; import org.pentaho.di.core.plugins.StepPluginType; import org.pentaho.di.repository.kdr.KettleDatabaseRepository; import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;

public class RepBuilder{          private RepBuilder(){};     private static KettleDatabaseRepository rep=null;     public static KettleDatabaseRepository getInstance() throws KettleException{         if(rep==null){             synchronized(RepBuilder.class){                 if(rep==null){                      //初始化                       //EnvUtil.environmentInit();                     PropertiesUtil p = new PropertiesUtil("etl.properties

");                     File   file = new File(p.readProperty("pdi.dir"));//這個path就是jdbc.prtoperties檔案的配置路徑,指向上層資料夾路徑即可,建議是絕對路徑。                     String path;                     try {                         path = file.getCanonicalPath();                         Const.JNDI_DIRECTORY = path;                         System.setProperty( "java.naming.factory.initial", "org.osjava.sj.SimpleContextFactory" );                         System.setProperty( "org.osjava.sj.root", path);                         System.setProperty( "org.osjava.sj.delimiter", "/" );                         /*System.setProperty( "KETTLE_HOME", "C:\\Users\\wgq\\.kettle" );*/                     } catch (IOException e) {                         // TODO Auto-generated catch block                         e.printStackTrace();                     }                     //載入外掛                     PluginFolder pluginFolder = new PluginFolder(p.readProperty("pdi.plugins"),true,true);                     PluginFolder pluginFolderKaraf = new PluginFolder(p.readProperty("pdi.pluginsKaraf"),true,true);

                    EnginePluginType.getInstance().getPluginFolders().add(pluginFolder);                     JobEntryPluginType.getInstance().getPluginFolders().add(pluginFolder);                     StepPluginType.getInstance().getPluginFolders().add(pluginFolder);                     DatabasePluginType.getInstance().getPluginFolders().add(pluginFolder);                     DatabasePluginType.getInstance().getPluginFolders().add(pluginFolderKaraf);

                    KettleEnvironment.init();                       //資料庫連線元物件                                                     DatabaseMeta dataMeta = new DatabaseMeta(p.readProperty("pdi.conName"), p.readProperty("pdi.conType"),                              p.readProperty("pdi.access"),p.readProperty("pdi.hostName"),p.readProperty("pdi.dataBaseName"),                             p.readProperty("pdi.portNumber"),p.readProperty("pdi.username"),p.readProperty("pdi.password"));                     //資料庫形式的資源庫元物件                       KettleDatabaseRepositoryMeta repInfo = new KettleDatabaseRepositoryMeta();                       //                       repInfo.setConnection(dataMeta);                                              //資料庫形式的資源庫物件                        KettleDatabaseRepository rep = new KettleDatabaseRepository();                       //用資源庫元物件初始化資源庫物件                       rep.init(repInfo);                       //連線到資源庫                       rep.connect(p.readProperty("rep.username"), p.readProperty("rep.password"));//預設的連線資源庫的使用者名稱和密碼                       if(rep.isConnected()){                           System.out.println("連線成功");                           return rep;                       }else{                           System.out.println("連線失敗");                           return null;                       }                   }             }         }         return rep;     }   

}

下面是etl.properties 的配置僅供參考

rep.name=pdires rep.username=admin rep.password=admin

#pdi.path=D:\\data-integration #pdi.pan=pan.bat #pdi.kitchen=kitchen.bat

pdi.dir=D:\\data-integration\\simple-jndi pdi.conName=pdires pdi.conType=Oracle pdi.access=JNDI pdi.hostName=10.6.197.219 pdi.dataBaseName=pdires pdi.portNumber=1521 pdi.username=pdires pdi.password=pdires pdi.plugins=D:\\data-integration\\plugins pdi.pluginsKaraf=D:\\data-integration\\system\\karaf\\system\\pentaho

然後建立物件Object obj =    RepBuilder.getInstance();

如果obj!=null 則表明連線成功

第二部分,執行job或者轉換

 /**       * @author WGQ       * 根據指定的資源庫物件及job名稱   執行指定的job       * @param filename       * @param parameters       * @param dirName       * @return       * @throws KettleException       * */       public static Result runJob(KettleDatabaseRepository rep,String jobName, String[] parameters, String filename, String dirName) throws Exception{                                RepositoryDirectoryInterface dir = rep.findDirectory(dirName);//根據指定的字串路徑 找到目錄                   //載入指定的job                   JobMeta jobMeta = rep.loadJob(rep.getJobId(jobName, dir), null);                   Job job = new Job(rep, jobMeta);                   //設定引數                   if(parameters!=null){                                          for(String param : parameters){                         String key = param.split("=")[0];                         String value = param.substring(param.indexOf("=")+1);                         jobMeta.setParameterValue(key,value);                     }                 }                 if(!"".equals(filename)){                     PropertiesUtil p = new PropertiesUtil("sysConfig.properties");                     jobMeta.setParameterValue("filename", p.readProperty("webUploadpath")+filename);                 }                 job.setLogLevel(LogLevel.BASIC);                   //啟動執行指定的job                   job.run();                  Result result = job.getResult();                 job.waitUntilFinished();//等待job執行完;                   job.setFinished(true);                   System.out.println(job.getResult());               return result;     }     /**       * @author WGQ       *        * @param rep  資源庫物件       * @param transName 要呼叫的trans名稱       *        * 呼叫資源庫中的trans       * @param filename       * @param parameters       * @param dirName       * @return       * @throws KettleException       *        * */       public static Result runTrans(KettleDatabaseRepository rep,String transName, String[] parameters, String filename, String dirName) throws KettleException{                                  RepositoryDirectoryInterface dir = rep.findDirectory(dirName);//根據指定的字串路徑 找到目錄               TransMeta tmeta = rep.loadTransformation(rep.getTransformationID(transName, dir), null);               //設定引數               //tmeta.setParameterValue("", "");               if(parameters!=null){                                  for(String param : parameters){                     String key = param.split("=")[0];                     String value = param.substring(param.indexOf("=")+1);                     tmeta.setParameterValue(key,value);                 }             }             if(!"".equals(filename)){                 PropertiesUtil p = new PropertiesUtil("sysConfig.properties");                 tmeta.setParameterValue("filename", p.readProperty("webUploadpath")+filename);             }             Trans trans = new Trans(tmeta);               trans.setLogLevel(LogLevel.BASIC);              trans.execute(null);//執行trans               trans.waitUntilFinished();               if(trans.getErrors()>0){                   System.out.println("有異常");               }               Result result = trans.getResult();             return result;     } 

這裡講解下引數jobName 和tranName 是job名稱和轉換名稱,fileName 上傳的檔案的全路徑,parameters 引數,dirName 是job或者轉換所在的根目錄