elastic-job之Dataflow型別作業實現
阿新 • • 發佈:2018-12-30
一、前序
二、Dataflow是什麼?
Dataflow型別用於處理資料流,需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。
三、 怎麼開啟?
可通過DataflowJobConfiguration配置是否流式處理。
流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去; 非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業。
如果採用流式作業處理方式,建議processData處理資料後更新其狀態,避免fetchData再次抓取到,從而使得作業永不停止。流式資料處理參照TbSchedule設計,適用於不間歇的資料處理。
四、程式碼展示
4.1、要實現Dataflow當然要實現它的介面DataflowJob<T>, 我們這裡定義了Foo的一個普通類
package com.lwl.boot.job.dataflow; import java.io.Serializable; public final class Foo implements Serializable { private static final long serialVersionUID = 2706842871078949451L; private final long id; private final String location; private Status status; public Foo(final long id, final String location, final Status status) { this.id = id; this.location = location; this.status = status; } public long getId() { return id; } public String getLocation() { return location; } public Status getStatus() { return status; } public void setStatus(final Status status) { this.status = status; } public String toString() { return String.format("id: %s, location: %s, status: %s", id, location, status); } public enum Status { TODO, COMPLETED } }
4.2、由於我們未採用資料庫,所以我們模擬了一個數據集合類和一個工廠類
package com.lwl.boot.job.dataflow; import org.springframework.stereotype.Repository; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Repository public class FooRepository { private Map<Long, Foo> data = new ConcurrentHashMap<>(300, 1); public FooRepository() { init(); } private void init() { addData(0L, 100L, "Beijing"); addData(100L, 200L, "Shanghai"); addData(200L, 300L, "Guangzhou"); } private void addData(final long idFrom, final long idTo, final String location) { for (long i = idFrom; i < idTo; i++) { data.put(i, new Foo(i, location, Foo.Status.TODO)); } } public List<Foo> findTodoData(final String location, final int limit) { List<Foo> result = new ArrayList<>(limit); int count = 0; for (Map.Entry<Long, Foo> each : data.entrySet()) { Foo foo = each.getValue(); if (foo.getLocation().equals(location) && foo.getStatus() == Foo.Status.TODO) { result.add(foo); count++; if (count == limit) { break; } } } return result; } public void setCompleted(final long id) { //設定狀態,不讓他一直抓取 data.get(id).setStatus(Foo.Status.COMPLETED); } }
package com.lwl.boot.job.dataflow; public class FooRepositoryFactory { private static FooRepository fooRepository = new FooRepository(); public static FooRepository repository() { return fooRepository; } }
DataflowJob的實現類
package com.lwl.boot.job.dataflow; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import org.springframework.util.CollectionUtils; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.dataflow.DataflowJob; /** * Dataflow型別用於處理資料流, * 需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。 * */ public class ApiMyElasticJobDataflow implements DataflowJob<Foo>{ private FooRepository fooRepository = FooRepositoryFactory.repository(); @Override public List<Foo> fetchData(ShardingContext context) { System.out.println("-------------------------------------fetchData: "+context.getShardingParameter()+"---------------------------------------------"); List<Foo> result = fooRepository.findTodoData(context.getShardingParameter(), 10); System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s | count: %d", context.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW FETCH", CollectionUtils.isEmpty(result)?0:result.size())); return result; } @Override public void processData(ShardingContext shardingContext, List<Foo> data) { System.out.println("-------------------------------------processData: "+shardingContext.getShardingParameter()+"---------------------------------------------"); System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s | count: %d", shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW PROCESS", CollectionUtils.isEmpty(data)?0:data.size())); for (Foo each : data) { fooRepository.setCompleted(each.getId()); } } }
最後就是我們的測試類:
這個類中和simple型別最重要的區別就是在於作業型別DataflowJobConfiguration,它的主要引數有package com.lwl.boot.job.dataflow; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; /** * 可通過DataflowJobConfiguration配置是否流式處理。 流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去; 非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業。 如果採用流式作業處理方式,建議processData處理資料後更新其狀態,避免fetchData再次抓取到,從而使得作業永不停止。 流式資料處理參照TbSchedule設計,適用於不間歇的資料處理。 * */ public class ApiJobDataflow { public static void main(String[] args) { new JobScheduler(registryCenter(),configuration()).init(); } private static CoordinatorRegistryCenter registryCenter() { //配置zookeeper CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "elastic-job-demo")); registryCenter.init(); return registryCenter; } private static LiteJobConfiguration configuration() { // 定義作業核心配置 String shardingItemParameters = "0=Beijing,1=Shanghai,2=Guangzhou"; JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder("dataflowJob", "0/20 * * * * ?", 3).shardingItemParameters(shardingItemParameters).build(); // 定義DATAFLOW型別配置 DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, ApiMyElasticJobDataflow.class.getCanonicalName(), true); // 定義Lite作業根配置 String jobShardingStrategyClass = null; LiteJobConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build(); return dataflowJobRootConfig; } }
coreConfig:作業核心配置
jobClass:作業實現類streamingProcess:是否流式處理資料,如果流式處理資料, 則fetchData不返回空結果將持續執行作業,如果非流式處理資料, 則處理資料完成後作業結束
五、啟動
最後的啟動方式也參考Simple型別,通過啟動幾次main方法,可以看到各個控制檯輸出的日誌資訊