1. 程式人生 > >Kettle學習筆記012之使用Java執行Kettle作業

Kettle學習筆記012之使用Java執行Kettle作業

前言:

Kettle提供一些API,Java通過呼叫這些API去執行Kettle作業,轉換。

 

第一步:搭建Kettle執行環境

data-integration\lib\目錄下複製部分核心jar包出來,匯入到java專案(jdk1.8)中。

所需jar包如下(不要忘了資料庫連線驅動):

 

2. 程式碼示例(作業,轉換,資源庫)

ackage com.staroon.kettle.exec;

import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta

;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta
;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import java.util.HashMap;
import java.util.Map;

public class RunKettleJob {
    public static void main(String[] args) {

        // 引數列表
        Map<String
, String> params = new HashMap<>();
        params.put("filename", "runjob");
        params.put("extend", "txt");

        String kjbPath = "D:/kettle/jobs/Kettledoc/常規作業示例.kjb";
        String ktrPath = "D:/kettle/jobs/Kettledoc/轉換任務.ktr";
//        runJob(params, kjbPath);
//        runTrans(params, ktrPath);
        runRepoJob(params);
    }

    /**
     * 執行資源庫中的作業
     *
     * @param params 作業引數
     */
    public static void runRepoJob(Map<String, String> params) {
        try {
            KettleEnvironment.init();
            KettleDatabaseRepository repository = new KettleDatabaseRepository();
            // 配置資源庫資料庫連線資訊
            DatabaseMeta databaseMeta = new DatabaseMeta(
                    "kettle",
                    "mysql",
                    "jdbc",
                    "127.0.0.1",
                    "kettle",
                    "3308",
                    "root",
                    "lwsjfwq"
            );

            // 配置連線引數,指定連線編碼為UTF8,若不指定則不能讀取中文目錄或者中文名作業
            databaseMeta.getAttributes().put("EXTRA_OPTION_MYSQL.characterEncoding", "utf8");

            // 連線測試
            if (databaseMeta.testConnection().startsWith("正確")) {
                System.out.println("資料庫連線成功");
            } else {
                System.out.println("資料庫連線失敗");
                return;
            }

            // 配置資源庫
            KettleDatabaseRepositoryMeta repositoryMeta = new KettleDatabaseRepositoryMeta(
                    "kettle",
                    "kettle",
                    "Kettle Repository",
                    databaseMeta
            );
            repository.init(repositoryMeta);
            // 連線資源庫
            repository.connect("admin", "admin");

            // 指定job或者trans所在的目錄
            RepositoryDirectoryInterface dir = repository.findDirectory("/批處理/");

            // 選擇資源庫中的作業
            JobMeta jobMeta = repository.loadJob("資源庫作業示例", dir, null, null);

            // 配置作業引數
            for (String param : params.keySet()) {
                jobMeta.setParameterValue(param, params.get(param));
            }

            Job job = new Job(repository, jobMeta);
            job.setLogLevel(LogLevel.DEBUG);
            //執行作業
            job.start();
            //等待作業執行結束
            job.waitUntilFinished();
            if (job.getErrors() > 0) {
                throw new Exception("作業執行出錯");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 執行轉換檔案
     *
     * @param params  轉換引數
     * @param ktrPath 轉換檔案的路徑,字尾ktr
     */
    public static void runTrans(Map<String, String> params, String ktrPath) {
        try {
            // 初始化
            KettleEnvironment.init();
            EnvUtil.environmentInit();
            TransMeta transMeta = new TransMeta(ktrPath);

            // 配置引數
            for (String param : params.keySet()) {
                transMeta.setParameterValue(param, params.get(param));
            }

            Trans trans = new Trans(transMeta);

            // 設定日誌級別
            trans.setLogLevel(LogLevel.DEBUG);

            // 執行轉換
            trans.execute(null);
            // 等待轉換執行結束
            trans.waitUntilFinished();
            // 丟擲異常
            if (trans.getErrors() > 0) {
                throw new Exception("轉換執行出錯");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 執行作業檔案
     *
     * @param params  作業引數
     * @param kjbPath 作業檔案路徑,字尾kjb
     */
    public static void runJob(Map<String, String> params, String kjbPath) {
        try {
            KettleEnvironment.init();
            JobMeta jobMeta = new JobMeta(kjbPath, null);

            // 配置作業引數
            for (String param : params.keySet()) {
                jobMeta.setParameterValue(param, params.get(param));
            }
            // 配置變數
            // jobMeta.setVariable("name","value");

            Job job = new Job(null, jobMeta);

            // 設定日誌級別
            job.setLogLevel(LogLevel.DEBUG);

            // 啟動作業
            job.start();
            // 等待作業執行完畢
            job.waitUntilFinished();
            if (job.getErrors() > 0) {
                throw new Exception("作業執行出錯");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }