Kettle學習筆記012之使用Java執行Kettle作業
阿新 • • 發佈:2018-10-31
前言:
Kettle提供一些API,Java通過呼叫這些API去執行Kettle作業,轉換。
第一步:搭建Kettle執行環境
從data-integration\lib\目錄下複製部分核心jar包出來,匯入到java專案(jdk1.8)中。
所需jar包如下(不要忘了資料庫連線驅動):
2. 程式碼示例(作業,轉換,資源庫)
ackage com.staroon.kettle.exec;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import java.util.HashMap;
import java.util.Map;
public class RunKettleJob {
public static void main(String[] args) {
// 引數列表
Map<String
params.put("filename", "runjob");
params.put("extend", "txt");
String kjbPath = "D:/kettle/jobs/Kettledoc/常規作業示例.kjb";
String ktrPath = "D:/kettle/jobs/Kettledoc/轉換任務.ktr";
// runJob(params, kjbPath);
// runTrans(params, ktrPath);
runRepoJob(params);
}
/**
* 執行資源庫中的作業
*
* @param params 作業引數
*/
public static void runRepoJob(Map<String, String> params) {
try {
KettleEnvironment.init();
KettleDatabaseRepository repository = new KettleDatabaseRepository();
// 配置資源庫資料庫連線資訊
DatabaseMeta databaseMeta = new DatabaseMeta(
"kettle",
"mysql",
"jdbc",
"127.0.0.1",
"kettle",
"3308",
"root",
"lwsjfwq"
);
// 配置連線引數,指定連線編碼為UTF8,若不指定則不能讀取中文目錄或者中文名作業
databaseMeta.getAttributes().put("EXTRA_OPTION_MYSQL.characterEncoding", "utf8");
// 連線測試
if (databaseMeta.testConnection().startsWith("正確")) {
System.out.println("資料庫連線成功");
} else {
System.out.println("資料庫連線失敗");
return;
}
// 配置資源庫
KettleDatabaseRepositoryMeta repositoryMeta = new KettleDatabaseRepositoryMeta(
"kettle",
"kettle",
"Kettle Repository",
databaseMeta
);
repository.init(repositoryMeta);
// 連線資源庫
repository.connect("admin", "admin");
// 指定job或者trans所在的目錄
RepositoryDirectoryInterface dir = repository.findDirectory("/批處理/");
// 選擇資源庫中的作業
JobMeta jobMeta = repository.loadJob("資源庫作業示例", dir, null, null);
// 配置作業引數
for (String param : params.keySet()) {
jobMeta.setParameterValue(param, params.get(param));
}
Job job = new Job(repository, jobMeta);
job.setLogLevel(LogLevel.DEBUG);
//執行作業
job.start();
//等待作業執行結束
job.waitUntilFinished();
if (job.getErrors() > 0) {
throw new Exception("作業執行出錯");
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 執行轉換檔案
*
* @param params 轉換引數
* @param ktrPath 轉換檔案的路徑,字尾ktr
*/
public static void runTrans(Map<String, String> params, String ktrPath) {
try {
// 初始化
KettleEnvironment.init();
EnvUtil.environmentInit();
TransMeta transMeta = new TransMeta(ktrPath);
// 配置引數
for (String param : params.keySet()) {
transMeta.setParameterValue(param, params.get(param));
}
Trans trans = new Trans(transMeta);
// 設定日誌級別
trans.setLogLevel(LogLevel.DEBUG);
// 執行轉換
trans.execute(null);
// 等待轉換執行結束
trans.waitUntilFinished();
// 丟擲異常
if (trans.getErrors() > 0) {
throw new Exception("轉換執行出錯");
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 執行作業檔案
*
* @param params 作業引數
* @param kjbPath 作業檔案路徑,字尾kjb
*/
public static void runJob(Map<String, String> params, String kjbPath) {
try {
KettleEnvironment.init();
JobMeta jobMeta = new JobMeta(kjbPath, null);
// 配置作業引數
for (String param : params.keySet()) {
jobMeta.setParameterValue(param, params.get(param));
}
// 配置變數
// jobMeta.setVariable("name","value");
Job job = new Job(null, jobMeta);
// 設定日誌級別
job.setLogLevel(LogLevel.DEBUG);
// 啟動作業
job.start();
// 等待作業執行完畢
job.waitUntilFinished();
if (job.getErrors() > 0) {
throw new Exception("作業執行出錯");
}
} catch (Exception e) {
e.printStackTrace();
}
}