1. 程式人生 > >Elastic Job入門示例-實現DataflowJob介面

Elastic Job入門示例-實現DataflowJob介面

1.接上篇內容:https://blog.csdn.net/seanme/article/details/80256460

2.本次介紹流式處理任務型別

 *  流式任務型別:業務實現兩個介面:抓取(fetchData)和處理(processData)資料
 *    a.流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去;
 *    b.非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業
 

3.相關配置(Spring Boot)

import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 *  流式任務型別:業務實現兩個介面:抓取(fetchData)和處理(processData)資料
 **     a.流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去;
 *      b.非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業
 * @param <T>
 */
public abstract  class DataflowJobTypeConfig<T> implements DataflowJob<T> {
    @Resource
    protected ZookeeperRegistryCenter regCenter;
    @Resource
    protected JobEventConfiguration jobEventConfiguration;


    /**
     * 作業啟動時間的cron表示式
     *
     * @return
     */
    abstract protected String getCron();

    /**
     * 作業 Listener
     *
     * @return
     */
    protected ElasticJobListener[] getJobListener(){
        return new ElasticJobListener[0];
    }

    /**
     * 作業分片總數,default 1;
     *
     * @return
     */
    protected int getShardingTotalCount()
    {
        return 1;
    }

    /**
     * 設定分片序列號和個性化引數對照表.
     *
     * <p>
     * 分片序列號和引數用等號分隔, 多個鍵值對用逗號分隔. 類似map. 分片序列號從0開始, 不可大於或等於作業分片總數. 如: 0=a,1=b,2=c
     * </p>
     *
     * @return
     */
    protected String getShardingItemParameters()
    {
        return "";
    }

    /**
     * 執行任務的Class
     * @return
     */
    abstract  protected Class getJobClass();

    @PostConstruct
    public void simpleJobScheduler() {
        new SpringJobScheduler(this,
                                regCenter,
                                getLiteJobConfiguration(this.getJobClass(), getCron(),getShardingTotalCount(), getShardingItemParameters()),
                                jobEventConfiguration,
                                getJobListener()).init();
    }

    @SuppressWarnings("rawtypes")
    protected LiteJobConfiguration getLiteJobConfiguration(final Class<? extends DataflowJobTypeConfig> jobClass,
                                                           final String cron,
                                                           final int shardingTotalCount,
                                                           final String shardingItemParameters) {
        return LiteJobConfiguration
                .newBuilder(
                        new DataflowJobConfiguration(
                                JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(),
                                jobClass.getCanonicalName(), true))
                .overwrite(true).build();
    }
}

4.樣例

import com.alibaba.fastjson.JSON;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

/**
 *  示例-流式處理型別
 */
@Configuration("DemoFlowTask")
public class DemoFlowTask extends DataflowJobTypeConfig<String> {

    @Override
    protected String getCron() {
        return "0 0/1 * * * ?";
    }

    @Override
    protected Class getJobClass() {
        return DemoFlowTask.class;
    }

    private int testLoopCount=0;// just for test
    /**
     *  流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去;
     * @param shardingContext
     * @return
     */
    @Override
    public List<String> fetchData(ShardingContext shardingContext) {
        if(testLoopCount>=2){
            return null; //如果是返回NULL,則會停止停業
        }
        String execTime = "12292088575058";
        System.out.println(testLoopCount+"->testLoopCount *************1 DemoFlowTask fetchData executed:" + execTime + "*******************************" + JSON.toJSONString(shardingContext));
        List<String> demoList = new ArrayList<String>();
        demoList.add("" + execTime);
        testLoopCount++;
        try{
            Thread.sleep(1000*60);
        }catch (Exception ex){
            ex.printStackTrace();
        }
        System.out.println("*************2 DemoFlowTask fetchData end executed:" + execTime + "*******************************" + JSON.toJSONString(shardingContext));
        return demoList;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<String> list) {

        String testData=list != null? list.get(0):"";
        System.out.println("*************3 DemoFlowTask processData executed:"+testData+"*******************************"+JSON.toJSONString(shardingContext));
    }

}