elastic-job--作業型別
阿新 • • 發佈:2018-12-14
elastic-job提供了三種類型的作業:
- Simple型別作業 SimpleJob需要實現SimpleJob介面,意為簡單實現,未經過任何封裝,與quartz原生介面相似,比如示例程式碼中所使用的job。
- Dataflow型別作業 Dataflow型別用於處理資料流,需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。可通過DataflowJobConfiguration配置是否流式處理。流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去; 非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業。實際開發中,Dataflow型別的job還是很有好用的
- Script型別作業 Script型別作業意為指令碼型別作業,支援shell,python,perl等所有型別指令碼,使用不多,可以參見github文件。
1.流式作業
1.1 流式作業介面
/**
* 資料流分散式作業介面.
*
* @author zhangliang
*
* @param <T> 資料型別
*/
public interface DataflowJob<T> extends ElasticJob {
/**
* 獲取待處理資料.
*
* @param shardingContext 分片上下文
* @return 待處理的資料集合
*/
List<T> fetchData(ShardingContext shardingContext);
/**
* 處理資料.
*
* @param shardingContext 分片上下文
* @param data 待處理資料集合
*/
void processData(ShardingContext shardingContext, List<T> data);
}
當作業配置為流式的時候,每次觸發作業後會排程一次fetchData獲取資料,如果獲取到了資料會排程processData方法處理資料,處理完後又繼續調fetchData獲取資料,再調processData處理,如此迴圈,就像流水一樣。直到fetchData沒有獲取到資料或者發生了重新分片才會停止。
public class MyDataflowJob implements DataflowJob<String> {
private static final ThreadLocal<Integer> LOOP_COUNTER = new ThreadLocal<>();
private static final int LOOP_TIMES = 10;//每次獲取流處理迴圈次數
private static final AtomicInteger COUNTER = new AtomicInteger(1);//計數器
@Override
public List<String> fetchData(ShardingContext shardingContext) {
Integer current = LOOP_COUNTER.get();
if (current == null) {
current = 1;
} else {
current += 1;
}
LOOP_COUNTER.set(current);
System.out.println(Thread.currentThread() + "------------current--------" + current);
if (current > LOOP_TIMES) {
System.out.println("\n\n\n\n");
return null;
} else {
int shardingItem = shardingContext.getShardingItem();
List<String> datas = Arrays.asList(getData(shardingItem), getData(shardingItem), getData(shardingItem));
return datas;
}
}
private String getData(int shardingItem) {
return shardingItem + "-" + COUNTER.getAndIncrement();
}
@Override
public void processData(ShardingContext shardingContext, List<String> data) {
System.out.println(Thread.currentThread() + "--------" +data);
}
}