1. 程式人生 > >ETL工具kettle與JAVA結合使用程式生成轉換

ETL工具kettle與JAVA結合使用程式生成轉換

    最近公司領導安排打造ETL支撐平臺,使用ETL工具kettle6.0對資料庫的資料進行清洗。使用工具spoon進行圖形化操作比較簡單,所以抽空研究了一下如何使用kettle的一些jar包,把kettle整合到java程式(web專案也一樣)中。留作一個記錄,以備日後檢視。
    先在網站上查了很多資料,可以參考 http://infocenter.pentaho.com/help/index.jsp?topic=%2Fcat_dev_guides%2Ftop_dev_guides.html (主要看Developer Guides/Embedding and Extending Pentaho Data Integration/...)。
    下載kettle的api和原始碼看看,也可以幫你解決不少問題的。
    下面是我自己寫的一個生成.ktr檔案的程式碼。
    (新增的jar包我沒有太仔細篩選,是參照例子加入的,有些可能沒有必要,可以嘗試去掉一些再測試:
avalon-framework-4.1.3.jar
commons-collections-3.2.jar
commons-io-1.4.jar
commons-lang-2.4.jar
commons-logging-1.1.jar
commons-vfs-20091118-pentaho.jar
kettle-core-4.4.0-GA.jar
kettle-db-4.4.0-GA.jar
kettle-engine-4.4.0-GA.jar
log4j-1.2.12.jar
logkit-1.0.1.jar
servlet-api-2.3.jar)
package com.jeefw.test.testKettle;
import java.io.File;

import org.apache.commons.io.FileUtils;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.w3c.dom.Document;

import com.dareway.framework.util.KettleUtil;
/**
 * Demo that builds a Kettle transformation in code, writes it to a
 * {@code .ktr} file, reloads it from XML and executes it.
 *
 * <p>The generated transformation has two steps joined by one hop: a table
 * input step reading from the "bjdt" connection and an insert/update step
 * writing to the "kettle" connection.
 */
public class TransDemo {
    /** Instance created by {@link #main(String[])}; kept public for existing callers. */
    public static TransDemo transDemo;

    /**
     * Table names in the two databases (source and target respectively).
     */
    public static String bjdt_tablename = "wr";
    public static String kettle_tablename = "wr";

    /**
     * Database connection definitions, in the XML form accepted by the
     * {@code DatabaseMeta(String xml)} constructor.
     */
    public static final String[] databasesXML = {
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
              "<connection>" +
                "<name>bjdt</name>" +
                "<server>127.0.0.1</server>" +
                "<type>Oracle</type>" +
                "<access>Native</access>" +
                "<database>orcl</database>" +
                "<port>1521</port>" +
                "<username>scott</username>" +
                "<password>tiger</password>" +
                "<attributes>" +
                "<attribute><code>EXTRA_OPTION_ORACLE.characterEncoding</code><attribute>utf-8</attribute></attribute>" +
                "</attributes>" +
              "</connection>",
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
              "<connection>" +
                "<name>kettle</name>" +
                "<server>127.0.0.1</server>" +
                "<type>Mysql</type>" +
                "<access>Native</access>" +
                "<database>jeefw</database>" +
                "<port>3306</port>" +
                "<username>root</username>" +
                "<password>root</password>" +
                "<attributes>" +
                "<attribute><code>EXTRA_OPTION_MYSQL.characterEncoding</code><attribute>utf-8</attribute></attribute>" +
                "</attributes>" +
              "</connection>"
    };

    /**
     * Generates the transformation, saves it as {@code update_insert_Trans.ktr}
     * in the working directory, reloads it against the repository returned by
     * {@code KettleUtil.getRepository()} and runs it to completion.
     *
     * <p>Any exception is printed to standard error; the method does not rethrow.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            KettleEnvironment.init();
            transDemo = new TransDemo();
            TransMeta transMeta = transDemo.generateMyOwnTrans();

            // Serialize once and reuse the result for both the file and the reload.
            String transXml = transMeta.getXML();

            File file = new File("update_insert_Trans.ktr");
            FileUtils.writeStringToFile(file, transXml, "UTF-8");

            // Prepend an XML declaration before parsing the transformation back in.
            String xmls = "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \n" + transXml;
            Document doc = XMLHandler.loadXMLString(xmls);

            KettleDatabaseRepository repository = KettleUtil.getRepository();

            TransMeta tm = new TransMeta();
            tm.loadXML(doc.getDocumentElement(), repository, false);
            tm.setRepositoryDirectory(repository.findDirectory("/"));

            Trans trans = new Trans(tm);
            trans.execute(null);
            // execute() only starts the step threads; block until they finish
            // so that failures are observed instead of silently dropped.
            trans.waitUntilFinished();
            if (trans.getErrors() > 0) {
                System.err.println("Transformation finished with " + trans.getErrors() + " error(s)");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds a transformation that moves data from one database to another.
     * It consists of two steps: a table input and an insert/update, connected
     * by a single hop.
     *
     * @return the assembled transformation metadata
     * @throws KettleException if a database connection definition cannot be parsed
     */
    public TransMeta generateMyOwnTrans() throws KettleException {
        System.out.println("************start to generate my own transformation***********");

        TransMeta transMeta = new TransMeta();

        // Name of the transformation.
        transMeta.setName("insert_update");

        // Register every configured database connection on the transformation.
        for (String connectionXml : databasesXML) {
            transMeta.addDatabase(new DatabaseMeta(connectionXml));
        }

        // The registry is used to look up the plugin id for each step.
        PluginRegistry registry = PluginRegistry.getInstance();

        StepMeta tableInputMetaStep = buildTableInputStep(transMeta, registry);
        transMeta.addStep(tableInputMetaStep);

        StepMeta insertUpdateStep = buildInsertUpdateStep(transMeta, registry);
        transMeta.addStep(insertUpdateStep);

        // Add a hop linking the two steps.
        transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertUpdateStep));
        System.out.println("***********the end************");
        return transMeta;
    }

    /** Creates the "table input" step that selects all rows from the source table on "bjdt". */
    private StepMeta buildTableInputStep(TransMeta transMeta, PluginRegistry registry) {
        TableInputMeta tableInput = new TableInputMeta();
        String pluginId = registry.getPluginId(StepPluginType.class, tableInput);

        // Attach the source database connection and the query to run.
        tableInput.setDatabaseMeta(transMeta.findDatabase("bjdt"));
        tableInput.setSQL("SELECT * FROM " + bjdt_tablename);

        StepMeta step = new StepMeta(pluginId, "table input", tableInput);

        // Position of the step on the Spoon canvas.
        step.setDraw(true);
        step.setLocation(100, 100);
        return step;
    }

    /** Creates the "insert_update" step that writes into the target table on "kettle". */
    private StepMeta buildInsertUpdateStep(TransMeta transMeta, PluginRegistry registry) {
        InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
        String pluginId = registry.getPluginId(StepPluginType.class, insertUpdateMeta);

        // Attach the target database connection and table.
        insertUpdateMeta.setDatabaseMeta(transMeta.findDatabase("kettle"));
        insertUpdateMeta.setTableName(kettle_tablename);

        // Key used to look up existing rows.
        insertUpdateMeta.setKeyLookup(new String[]{"ID"});
        insertUpdateMeta.setKeyStream(new String[]{"ID"});
        // The original author notes this must be set even though it is empty;
        // presumably the step expects it to match the key array length - TODO confirm.
        insertUpdateMeta.setKeyStream2(new String[]{""});
        insertUpdateMeta.setKeyCondition(new String[]{"="});

        // Fields to write: target column, incoming stream field, and whether to
        // update it on an existing row. One flag per field.
        // NOTE(review): the key stream uses "ID" but the update stream uses "id";
        // confirm which case the incoming rows actually carry.
        String[] updateLookup = {"ID", "dept_no", "dept_name", "dept_sex", "dept_addr"};
        String[] updateStream = {"id", "dept_no", "dept_name", "dept_sex", "dept_addr"};
        Boolean[] updateOrNot = {false, true, true, true, true};
        insertUpdateMeta.setUpdateLookup(updateLookup);
        insertUpdateMeta.setUpdateStream(updateStream);
        insertUpdateMeta.setUpdate(updateOrNot);

        StepMeta step = new StepMeta(pluginId, "insert_update", insertUpdateMeta);
        step.setDraw(true);
        step.setLocation(250, 100);
        return step;
    }
}