Kettle外掛開發流程
最近正好做了有關Kettle中外掛開發的工作,對Kettle外掛的原始碼進行了一定的研究,並開發了自定義的外掛,在此有些感悟,記錄下來。
一 Kettle外掛概述
Kettle的開發體系是基於外掛的,平臺本身提供了介面,開發者按照相關規範就可以開發出相應的外掛新增到Kettle中使用,感覺這個體系設計思路很不錯,非常有利於Kettle後續的擴充套件。
初次接觸Kettle外掛開發可以參考GitHub上有關外掛模板DummyPlugin的原始碼,通過對原始碼的分析,發現Kettle外掛開發的流程還是比較簡單的,以DummyPlugin為例,主要包括以下幾個類:
DummyPlugin.java
DummyPluginData.java
DummyPluginDialog.java
DummyPluginMeta.java
Kettle外掛的開發遵循了MVC設計模式,其中DummyPlugin.java類實現了Control功能,當轉換執行時,負責按照預設的邏輯處理輸入資料;DummyPluginDialog.java類實現了View功能,即對話方塊的實現;而DummyPluginData.java和DummyPluginMeta.java用來儲存使用者在對話方塊的配置引數,實現了Model功能。
二 Kettle中的Solr外掛開發
由於Kettle中沒有預先整合Solr外掛,因為專案開發的需要,對Solr外掛進行了編寫測試,主要功能是讀取輸入的資料流傳送到Solr叢集中,開發的外掛也比較簡單,分享一下。
其實主要就實現了3個類,SolrPluginMeta、SolrPluginDialog、SolrPlugin,分別實現Model、View、Control功能:
/**
* SolrPluginMeta 類主要用來儲存使用者的配置資料,頁面上的配置包括zk地址、collection名以及分割槽策略等
*/
public class SolrPluginMeta extends BaseStepMeta implements StepMetaInterface{
private String zkHost;
private String collectionName;
/**
* 分割槽策略 0:不分割槽 1:欄位分割槽 2:大小分割槽
*/
private String mode;
/**
* 選擇的分割槽欄位
*/
private String filedselected;
/**
* 每個shard的最大容量
*/
private long countsize;
public SolrPluginMeta(){
super();
}
public void setZkHost(String zkHost) {
this.zkHost = zkHost;
}
public String getZkHost() {
return zkHost;
}
public void setCollectionName(String collectionName) {
this.collectionName = collectionName;
}
public String getCollectionName() {
return collectionName;
}
public void setMode(String mode) {
this.mode = mode;
}
public String getMode() {
return mode;
}
public void setFiledselected(String filedselected) {
this.filedselected = filedselected;
}
public String getFiledselected() {
return filedselected;
}
public void setCountsize(long countsize) {
this.countsize = countsize;
}
public long getCountsize() {
return countsize;
}
/**
* 這個函式的作用是在複製SolrPluginMeta 時獲取原物件引數的
*/
public String getXML(){
StringBuilder retval = new StringBuilder();
retval.append("<values>").append(Const.CR);
retval.append(" ").append(XMLHandler.addTagValue("zkHost", zkHost));
retval.append(" ").append(XMLHandler.addTagValue("collectionName", collectionName));
retval.append(" ").append(XMLHandler.addTagValue("filedselected", filedselected));
retval.append(" ").append(XMLHandler.addTagValue("countsize", countsize));
retval.append(" ").append(XMLHandler.addTagValue("mode", mode));
retval.append("</values>").append(Const.CR);
return retval.toString();
}
public Object clone(){
return super.clone();
}
/**
* 複製物件時傳遞引數
*/
public void loadXML(Node stepnode, List<DatabaseMeta>
databases, Map<String,Counter> counters){
Node valnode = XMLHandler.getSubNode(stepnode, "values", "zkHost");
if(null!=valnode){
zkHost = valnode.getTextContent();
}
valnode = XMLHandler.getSubNode(stepnode, "values", "collectionName");
if(null!=valnode){
collectionName = valnode.getTextContent();
}
valnode = XMLHandler.getSubNode(stepnode, "values", "filedselected");
if(null!=valnode){
filedselected = valnode.getTextContent();
}
valnode = XMLHandler.getSubNode(stepnode, "values", "countsize");
if(null!=valnode){
countsize = Long.parseLong(valnode.getTextContent());
}
valnode = XMLHandler.getSubNode(stepnode, "values", "mode");
if(null!=valnode){
mode = valnode.getTextContent();
}
}
@Override
public void setDefault() {
this.zkHost = "localhost:2181,localhost:2182,localhost:2183";
this.collectionName = "collection1234";
this.mode = "0";
}
public StepDialogInterface getDialog(Shell shell, StepMetaInterface meta,
TransMeta transMeta, String name){
return new SolrPluginDialog(shell, meta, transMeta, name);
}
@Override
public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface,
int cnr, TransMeta transMeta, Trans disp) {
return new SolrPlugin(stepMeta, stepDataInterface, cnr, transMeta, disp);
}
@Override
public StepDataInterface getStepData() {
return new SolrPluginData();
}
}
SolrPluginMeta 類儲存了使用者的配置資訊,在開發中Solr的分割槽分為三種模式:不分割槽(所有記錄傳送到一個shard中)、欄位分割槽(每個欄位單獨一個分割槽)、大小分割槽(指定數量的記錄劃分在一個分割槽中)。
當從Kettle中拖拽一個外掛到面板上時,其實就生成了一個SolrPluginMeta 物件,這個物件將儲存使用者在對話方塊中輸入的配置資訊,而當轉換執行時,Kettle會重新生成一個SolrPluginMeta 物件並獲取原物件的配置引數(這一點我還不太明白為啥Kettle採用這種方式),因此getXML()和loadXML函式就是在複製配置引數時使用的。
SolrPluginDialog就是編寫對話方塊供使用者輸入引數,並且將引數儲存到SolrPluginMeta中,具體程式碼就不帖出來了。
SolrPlugin類也比較簡單,是轉換操作的核心邏輯,其實主要的方法就是processRow,Kettle中資料按照流的形式傳遞,因此processRow方法會分批次對輸入流進行處理。
public class SolrPlugin extends BaseStep implements StepInterface{
private SolrPluginData data;
private SolrPluginMeta meta;
/**
* Zk叢集地址
*/
private String zkHost;
/**
* collection名
*/
private String collectionName;
/**
* 輸入的資料總量
*/
private long send_count = 0l;
/**
* 欄位名列表
*/
private String[] fieldNames;
/**
* shard與傳送文件的對映
*/
private Map<String, List<SolrInputDocument>> send_list;
/**
* 當前shard與hostIp的對映
*/
private Map<String, String> shard_hostIp;
/**
* shard與對應的solr地址的對映
*/
private Map<String, HttpSolrClient> solrserver_url;
public SolrPlugin(StepMeta s, StepDataInterface stepDataInterface,
int c, TransMeta t, Trans dis){
super(s,stepDataInterface,c,t,dis);
}
public boolean init(StepMetaInterface smi, StepDataInterface sdi){
meta = (SolrPluginMeta)smi;
data = (SolrPluginData)sdi;
return super.init(smi, sdi);
}
public void dispose(StepMetaInterface smi, StepDataInterface sdi){
meta = (SolrPluginMeta)smi;
data = (SolrPluginData)sdi;
super.dispose(smi, sdi);
}
/**
* 獲取route欄位
* @param mode 分割槽策略
* @param doc 輸入文件
* @param site 文件位置
* @return
*/
public String getRoute(String mode, SolrInputDocument doc, long site){
//不分割槽模式
if(mode.equals("0")){
return "shard1";
//欄位分割槽
}else if(mode.equals("1")){
String filed = meta.getFiledselected();
String shardname = doc.getFieldValue(filed)==null ? "shard1" :
doc.getFieldValue(filed).toString();
return PinyinUtil.getInstance().getStringPinyin(shardname);
//大小分割槽
}else{
long shard_num = meta.getCountsize();
int index = (int)(site/shard_num)+1;
return "shard"+index;
}
}
/**
* 傳送本地快取的list至solr中
* @param doclist
*/
public void sendList(Map<String, List<SolrInputDocument>> doclist) throws KettleException{
if(null==doclist){
return;
}
for(String shard : doclist.keySet()){
if(StringUtils.isEmpty(shard)){
continue;
}
//獲取該shard對應的hostIp
String hostIp = shard_hostIp.get(shard);
if(StringUtils.isEmpty(hostIp)){
logBasic("準備建立shard:"+shard);
SolrService.createShard(zkHost, collectionName, shard, this);
hostIp = SolrService.getCollectionShardInfo(zkHost, collectionName, shard, this);
shard_hostIp.put(shard, hostIp);
}
//獲取shard對應的url
HttpSolrClient client = solrserver_url.get(shard);
if(client==null){
String url = "http://"+hostIp+"/solr/"+collectionName;
client = new HttpSolrClient(url);
solrserver_url.put(shard, client);
}
long time = System.currentTimeMillis();
//待發送的文件集合
List<SolrInputDocument> list = doclist.get(shard);
if(list.size()<=0){
continue;
}
try {
client.add(list);
client.commit();
} catch (Exception e) {
logError(String.format("傳送到shard:%s出錯,地址:%s,原因:%s",
shard, client.getBaseURL(), e.getMessage()));
throw new KettleException(e.getMessage());
}
logBasic(String.format("成功傳送到%s %s條記錄, 耗時%s毫秒", shard, list.size(),
System.currentTimeMillis()-time));
list.clear();
}
}
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException{
meta = (SolrPluginMeta)smi;
data = (SolrPluginData)sdi;
//獲取上一個步驟的輸入流
Object[] r=getRow();
if(r==null){
logBasic("無輸入資料");
sendList(send_list);
setOutputDone();
return false;
}
//first為true則表明是第一行資料,可以在此完成相關的初始化工作
if(first){
first = false;
zkHost = meta.getZkHost();
collectionName = meta.getCollectionName();
SolrService.createCollection(zkHost, collectionName, this);
//獲取輸入欄位名集合
RowMetaInterface fields = getInputRowMeta().clone();
fieldNames = fields.getFieldNames();
for(String o : fieldNames){
logBasic(o);
}
send_list = new HashMap<String, List<SolrInputDocument>>();
solrserver_url = new HashMap<String, HttpSolrClient>();
shard_hostIp = new HashMap<String, String>();
String hostIp = SolrService.getCollectionShardInfo(zkHost, collectionName, "shard1", this);
shard_hostIp.put("shard1", hostIp);
}
if(r.length<fieldNames.length){
logError("輸入資料有誤, 本次資料忽略");
return true;
}
//儲存輸入資料至document
SolrInputDocument input = new SolrInputDocument();
for(int i=0; i<fieldNames.length; i++){
input.addField(fieldNames[i], r[i]);
}
String shardname = getRoute(meta.getMode(), input, ++send_count);
input.addField("_route_", shardname);
List<SolrInputDocument> documentlist = send_list.get(shardname);
if(documentlist==null){
documentlist = new ArrayList<SolrInputDocument>();
send_list.put(shardname, documentlist);
}
documentlist.add(input);
if(send_count%20000==0){
sendList(send_list);
}
return true;
}
}
processRow返回true則表明資料處理沒有結束,則Kettle會繼續呼叫processRow處理輸入資料;返回false則表明處理完成,記住在返回false之前要呼叫基類的setOutputDone()方法。
三 外掛部署到Kettle中
原始碼寫好後,打成jar包,接下來還要編寫plugin.xml配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<plugin id="SolrPlugin" iconfile="solr.png" description="SolrPlugin"
tooltip="This is a solr plugin step" category="TestDemo"
classname="com.iflytek.kettle.solr.SolrPluginMeta" >
<libraries>
<library name="SolrPlugin.jar"/>
</libraries>
<localized_category>
<category locale="en_US">TestDemo</category>
<category locale="zh_CN">外掛測試</category>
</localized_category>
<localized_description>
<description locale="en_US">SolrPlugin</description>
</localized_description>
<localized_tooltip>
<tooltip locale="en_US">傳送記錄到Solr中</tooltip>
</localized_tooltip>
</plugin>
其中的id是外掛註冊的標識,iconfile指定了外掛的圖示,classname指定了外掛的入口類,就是SolrPluginMeta類;指定了外掛在Kettle左側列表中的位置。將打好的jar包、plugin.xml配置檔案、圖示等放置在單獨的資料夾中,並將該資料夾Kettle目錄下的plugins\steps中(如果沒有steps目錄則新建),重啟Kettle就可看到自定義的外掛:
四 總結
Kettle外掛的開發並不複雜,掌握了基本的開發流程就可以自己開發需要的外掛了,寫得比較亂,歡迎各位多多交流指正。