1. 程式人生 > >elastic-job--作業型別

elastic-job--作業型別

elastic-job提供了三種類型的作業:

  1. Simple型別作業 SimpleJob需要實現SimpleJob介面,意為簡單實現,未經過任何封裝,與quartz原生介面相似,比如示例程式碼中所使用的job。
  2. Dataflow型別作業 Dataflow型別用於處理資料流,需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。可通過DataflowJobConfiguration配置是否流式處理。流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去; 非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業。實際開發中,Dataflow型別的job還是很有好用的
  3. Script型別作業 Script型別作業意為指令碼型別作業,支援shell,python,perl等所有型別指令碼,使用不多,可以參見github文件。

1.流式作業

1.1 流式作業介面

/**
 * 資料流分散式作業介面.
 * 
 * @author zhangliang
 * 
 * @param <T> 資料型別
 */
public interface DataflowJob<T> extends ElasticJob {

    /**
     * 獲取待處理資料.
     *
     * @param shardingContext 分片上下文
     * @return 待處理的資料集合
     */
List<T> fetchData(ShardingContext shardingContext); /** * 處理資料. * * @param shardingContext 分片上下文 * @param data 待處理資料集合 */ void processData(ShardingContext shardingContext, List<T> data); }

當作業配置為流式的時候,每次觸發作業後會排程一次fetchData獲取資料,如果獲取到了資料會排程processData方法處理資料,處理完後又繼續調fetchData獲取資料,再調processData處理,如此迴圈,就像流水一樣。直到fetchData沒有獲取到資料或者發生了重新分片才會停止。

public class MyDataflowJob implements DataflowJob<String> {

    private static final ThreadLocal<Integer> LOOP_COUNTER = new ThreadLocal<>();
    private static final int LOOP_TIMES = 10;//每次獲取流處理迴圈次數
    private static final AtomicInteger COUNTER = new AtomicInteger(1);//計數器

    @Override
    public List<String> fetchData(ShardingContext shardingContext) {
        Integer current = LOOP_COUNTER.get();
        if (current == null) {
            current = 1;
        } else {
            current += 1;
        }
        LOOP_COUNTER.set(current);
        System.out.println(Thread.currentThread() + "------------current--------" + current);
        if (current > LOOP_TIMES) {
            System.out.println("\n\n\n\n");
            return null;
        } else {
            int shardingItem = shardingContext.getShardingItem();
            List<String> datas = Arrays.asList(getData(shardingItem), getData(shardingItem), getData(shardingItem));
            return datas;
        }
    }

    private String getData(int shardingItem) {
        return shardingItem + "-" + COUNTER.getAndIncrement();
    }

    @Override
    public void processData(ShardingContext shardingContext, List<String> data) {
        System.out.println(Thread.currentThread() + "--------" +data);
    }
}