Elastic Job入門示例-實現DataflowJob介面
阿新 • • 發佈:2019-01-01
1.接上篇內容:https://blog.csdn.net/seanme/article/details/80256460
2.本次介紹流式處理任務型別
* 流式任務型別:業務實現兩個介面:抓取(fetchData)和處理(processData)資料
* a.流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去;
* b.非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業
3.相關配置(Spring Boot)
import com.dangdang.ddframe.job.api.dataflow.DataflowJob; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration; import com.dangdang.ddframe.job.event.JobEventConfiguration; import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; import javax.annotation.PostConstruct; import javax.annotation.Resource; /** * 流式任務型別:業務實現兩個介面:抓取(fetchData)和處理(processData)資料 ** a.流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去; * b.非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業 * @param <T> */ public abstract class DataflowJobTypeConfig<T> implements DataflowJob<T> { @Resource protected ZookeeperRegistryCenter regCenter; @Resource protected JobEventConfiguration jobEventConfiguration; /** * 作業啟動時間的cron表示式 * * @return */ abstract protected String getCron(); /** * 作業 Listener * * @return */ protected ElasticJobListener[] getJobListener(){ return new ElasticJobListener[0]; } /** * 作業分片總數,default 1; * * @return */ protected int getShardingTotalCount() { return 1; } /** * 設定分片序列號和個性化引數對照表. * * <p> * 分片序列號和引數用等號分隔, 多個鍵值對用逗號分隔. 類似map. 分片序列號從0開始, 不可大於或等於作業分片總數. 如: 0=a,1=b,2=c * </p> * * @return */ protected String getShardingItemParameters() { return ""; } /** * 執行任務的Class * @return */ abstract protected Class getJobClass(); @PostConstruct public void simpleJobScheduler() { new SpringJobScheduler(this, regCenter, getLiteJobConfiguration(this.getJobClass(), getCron(),getShardingTotalCount(), getShardingItemParameters()), jobEventConfiguration, getJobListener()).init(); } @SuppressWarnings("rawtypes") protected LiteJobConfiguration getLiteJobConfiguration(final Class<? extends DataflowJobTypeConfig> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) { return LiteJobConfiguration .newBuilder( new DataflowJobConfiguration( JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName(), true)) .overwrite(true).build(); } }
4.樣例
import com.alibaba.fastjson.JSON; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; import org.springframework.context.annotation.Configuration; import java.util.ArrayList; import java.util.List; /** * 示例-流式處理型別 */ @Configuration("DemoFlowTask") public class DemoFlowTask extends DataflowJobTypeConfig<String> { @Override protected String getCron() { return "0 0/1 * * * ?"; } @Override protected Class getJobClass() { return DemoFlowTask.class; } private int testLoopCount=0;// just for test /** * 流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去; * @param shardingContext * @return */ @Override public List<String> fetchData(ShardingContext shardingContext) { if(testLoopCount>=2){ return null; //如果是返回NULL,則會停止停業 } String execTime = "12292088575058"; System.out.println(testLoopCount+"->testLoopCount *************1 DemoFlowTask fetchData executed:" + execTime + "*******************************" + JSON.toJSONString(shardingContext)); List<String> demoList = new ArrayList<String>(); demoList.add("" + execTime); testLoopCount++; try{ Thread.sleep(1000*60); }catch (Exception ex){ ex.printStackTrace(); } System.out.println("*************2 DemoFlowTask fetchData end executed:" + execTime + "*******************************" + JSON.toJSONString(shardingContext)); return demoList; } @Override public void processData(ShardingContext shardingContext, List<String> list) { String testData=list != null? list.get(0):""; System.out.println("*************3 DemoFlowTask processData executed:"+testData+"*******************************"+JSON.toJSONString(shardingContext)); } }