使用Kettle API實現資料的遷移
阿新 • • 發佈:2018-11-11
利用 Kettle 的 API,將一個資料來源中的資訊匯入到另外一個資料來源中:
package cn.com.saidi.job; import org.apache.commons.io.FileUtils; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleDatabaseException; import org.pentaho.di.core.exception.KettleXMLException; import org.pentaho.di.core.plugins.PluginRegistry; import org.pentaho.di.core.plugins.StepPluginType; import org.pentaho.di.trans.TransHopMeta; import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.step.StepMeta; import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta; import org.pentaho.di.trans.steps.tableinput.TableInputMeta; import java.io.File; public class TransDemo { public static TransDemo transDemo; /** * 兩個庫中的表名 */ public static String bjdt_tablename = "test1"; public static String kettle_tablename = "test2"; /** * 資料庫連線資訊,適用於DatabaseMeta其中 一個構造器DatabaseMeta(String xml) */ public static final String[] databasesXML = { "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<connection>" + "<name>bjdt</name>" + "<server>192.168.1.122</server>" + "<type>Mysql</type>" + "<access>Native</access>" + "<database>daiqiaobing</database>" + "<port>3306</port>" + "<username>root</username>" + "<password>root</password>" + "</connection>", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<connection>" + "<name>kettle</name>" + "<server>192.168.1.122</server>" + "<type>Mysql</type>" + "<access>Native</access>" + "<database>daiqiaobing</database>" + "<port>3306</port>" + "<username>root</username>" + "<password>root</password>" + "</connection>" }; public static void main(String[] args) { try { KettleEnvironment.init(); transDemo = new TransDemo(); TransMeta transMeta = transDemo.generateMyOwnTrans(); String transXml = transMeta.getXML(); String transName = "etl/update_insert_Trans.ktr"; File file = new File(transName); FileUtils.writeStringToFile(file, transXml, "UTF-8"); System.out.println(databasesXML.length+"\n"+databasesXML[0]+"\n"+databasesXML[1]); } catch (Exception e) { 
e.printStackTrace(); return; } } /** * 生成一個轉化,把一個數據庫中的資料轉移到另一個數據庫中,只有兩個步驟,第一個是表輸入,第二個是表插入與更新操作 * @return * @throws KettleXMLException */ public TransMeta generateMyOwnTrans() throws KettleXMLException, KettleDatabaseException { System.out.println("************start to generate my own transformation***********"); TransMeta transMeta = new TransMeta(); //設定轉化的名稱 transMeta.setName("insert_update"); //新增轉換的資料庫連線 for (int i=0;i<databasesXML.length;i++){ DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]); transMeta.addDatabase(databaseMeta); } //registry是給每個步驟生成一個標識Id用 PluginRegistry registry = PluginRegistry.getInstance(); //第一個表輸入步驟(TableInputMeta) TableInputMeta tableInput = new TableInputMeta(); String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput); //給表輸入新增一個DatabaseMeta連線資料庫 DatabaseMeta database_bjdt = transMeta.findDatabase("bjdt"); tableInput.setDatabaseMeta(database_bjdt); String select_sql = "SELECT name FROM "+bjdt_tablename; tableInput.setSQL(select_sql); //新增TableInputMeta到轉換中 StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId,"table input",tableInput); //給步驟新增在spoon工具中的顯示位置 tableInputMetaStep.setDraw(true); tableInputMetaStep.setLocation(100, 100); transMeta.addStep(tableInputMetaStep); //第二個步驟插入與更新 InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta(); String insertUpdateMetaPluginId = registry.getPluginId(StepPluginType.class,insertUpdateMeta); //新增資料庫連線 DatabaseMeta database_kettle = transMeta.findDatabase("kettle"); insertUpdateMeta.setDatabaseMeta(database_kettle); //設定操作的表 insertUpdateMeta.setTableName(kettle_tablename); //設定用來查詢的關鍵字 insertUpdateMeta.setKeyLookup(new String[]{"name"}); insertUpdateMeta.setKeyStream(new String[]{"name"}); insertUpdateMeta.setKeyStream2(new String[]{""});//一定要加上 insertUpdateMeta.setKeyCondition(new String[]{"="}); //設定要更新的欄位 String[] updatelookup = {"name"} ; String [] updateStream = {"name"}; Boolean[] updateOrNot = {true}; 
insertUpdateMeta.setUpdateLookup(updatelookup); insertUpdateMeta.setUpdateStream(updateStream); insertUpdateMeta.setUpdate(updateOrNot); String[] lookup = insertUpdateMeta.getUpdateLookup(); //新增步驟到轉換中 StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId,"insert_update",insertUpdateMeta); insertUpdateStep.setDraw(true); insertUpdateStep.setLocation(250,100); transMeta.addStep(insertUpdateStep); //****************************************************************** //****************************************************************** //新增hop把兩個步驟關聯起來 transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertUpdateStep)); System.out.println("***********the end************"); return transMeta; } }
上述操作將會產生一個 ktr 檔案(etl/update_insert_Trans.ktr),接下來載入該 ktr 檔案並執行其中定義的轉換:
public static void main(String[] args) throws KettleException { //初始化ketlle KettleEnvironment.init(); //建立轉換元資料物件 TransMeta meta = new TransMeta("etl/update_insert_Trans.ktr"); Trans trans = new Trans(meta); trans.prepareExecution(null); trans.startThreads(); trans.waitUntilFinished(); if(trans.getErrors()!=0){ System.out.println("執行失敗!"); } }