SpringBatch中的retry和skip機制實現分析
先簡單說明下SpringBatch在SpringBoot中的使用。
如果要在springboot中使用batch的話,直接加入以下依賴即可:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
然後使用註解開啟Batch模塊:
... @EnableBatchProcessing public class Application { ... }
之後就可以註入JobBuilderFactory和StepBuilderFactory:
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
有了這2個factory之後,就可以build job。
SpringBatch中的相關基礎概念比如ItemReader、ItemWriter、Chunk等本文就不介紹了。
我們以FlatFileItemReader作為reader,一個自定義Writer用於打印reader中讀取出來的數據。
這個定義的writer遇到good job這條數據的時候會報錯,具體邏輯如下:
@Override public void write(List<? extends String> items) throws Exception { System.out.println("handle start =====" + items); for(String a : items) { if(a.equals("good job")) { throw new Exception("custom exception"); } } System.out.println("handle end.. -----" + items); }
其中reader中讀取的文件中的數據如下:
hello world
hello coder
good job
cool
66666
我們使用StepBuilderFactory構造Step,chunkSize設置為2。然後在job1中使用並執行:
stepBuilderFactory.get("teststep").chunk(2).reader(reader).writer(writer).build();
執行job1後console打印如下:
handle start =====[hello world, hello coder]
handle end.. -----[hello world, hello coder]
handle start =====[good job, cool]
job1遇到了good job這條數據,writer拋出了異常,由於沒有使用skip或者retry機制,導致整個流程停止。job1的處理流程底層在SimpleChunkProcessor這個類中完成,包括processor、writer的使用。
接下裏我們構造一個job2,job2使用skip機制(其中skipLimit必須要和skip(Class<? extends Throwable> type)一起使用),skip機制可以防止writer發生異常後不停止整個job,但是需要同時滿足skip的限制次數和skip對應的Exception是發生異常的父類或自身關系條件才不會停止整個job,這裏我們使用Exception作為異常和Integer.MAX_VALUE作為skip的限制次數為例:
stepBuilderFactory.get.get("test-step").chunk(2).reader(reader).writer(writer).faultTolerant().skipLimit(Integer.MAX_VALUE).skip(Exception.class).build();
執行job2 後console打印如下:
stepBuilderFactory.get.get("teststep").chunk(2).reader(reader).writer(writer).faultTolerant().skipLimit(Integer.MAX_VALUE).skip(Exception.class).build();
執行job2 後console打印如下:
handle start =====[hello world, hello coder]
handle end.. -----[hello world, hello coder]
handle start =====[good job, cool]
handle start =====[good job]
handle start =====[cool]
handle end.. -----[cool]
handle start =====[66666]
handle end.. -----[66666]
我們看到good job這條數據發生的異常被skip掉了,job完整的執行。
但是發現了另外一個問題,那就是處理 [good job, cool] 這批數據的時候,發生了異常,但是接下來執行了 [good job] 和 [cool] 這兩批chunk為1的批次。這是在ItemWriter中執行的,它也會在ItemWriteListener中執行多次。
換句話說,如果使用了skip功能,那麽對於需要被skip的批次數據中會進行scan操作找出具體是哪1條數據的原因,這裏的scan操作指的是一條一條數據的遍歷。
這個過程為什麽叫scan呢? 在源碼中,FaultTolerantChunkProcessor類(處理帶有skip或者retry機制的處理器,跟SimpleChunkProcessor類似,只不過SimpleChunkProcessor處理簡單的Job)裏有個私有方法scan:
private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs,
ChunkMonitor chunkMonitor, boolean recovery) throws Exception {
...
Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
Chunk<O>.ChunkIterator outputIterator = outputs.iterator();
List<O> items = Collections.singletonList(outputIterator.next()); // 拿出需要寫的數據中的每一條數據
inputIterator.next();
try {
writeItems(items); // 寫每條數據
doAfterWrite(items);
contribution.incrementWriteCount(1);
inputIterator.remove();
outputIterator.remove();
}
catch (Exception e) { // 寫的時候如果發生了異常
doOnWriteError(e, items);
if (!shouldSkip(itemWriteSkipPolicy, e, -1) && !rollbackClassifier.classify(e)) {
inputIterator.remove();
outputIterator.remove();
}
else {
// 具體的skip策略
checkSkipPolicy(inputIterator, outputIterator, e, contribution, recovery);
}
if (rollbackClassifier.classify(e)) {
throw e;
}
}
chunkMonitor.incrementOffset();
if (outputs.isEmpty()) { // 批次裏的所有數據處理完畢之後 scanning 設置為false
data.scanning(false);
inputs.setBusy(false);
chunkMonitor.resetOffset();
}
}
這個scan方法觸發的條件是UserData這個內部類裏的scanning被設置為true,這裏被設置為true是在處理批次數據出現異常後並且不能retry的情況下才會被設置的。
try {
batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
rollbackClassifier));
}
catch (Exception e) {
RetryContext context = contextHolder.get();
if (!batchRetryTemplate.canRetry(context)) {
// 設置scanning為true
data.scanning(true);
}
throw e;
}
這就是為什麽skip機制在skip數據的時候會去scan批次中的每條數據,然後並找出需要被skip的數據的原理。
job3帶有retry功能,retry的功能在於出現某個異常並且這個異常可以被retry所接受的話會進行retry,retry的次數可以進行配置,我們配置了3次retry:
stepBuilderFactory.get.get("teststep").chunk(2).reader(reader).writer(writer).faultTolerant().skipLimit(Integer.MAX_VALUE).skip(Exception.class).retryLimit(3).retry(Exception.class).build();
執行 job3後console打印如下:
handle start =====[hello world, hello coder]
handle end.. -----[hello world, hello coder]
handle start =====[good job, cool]
handle start =====[good job, cool]
handle start =====[good job, cool]
handle start =====[good job]
handle start =====[cool]
handle end.. -----[cool]
handle start =====[66666]
handle end.. -----[66666]
good job, cool] 這批數據retry了3次,而且都失敗了。失敗之後進行了skip操作。
SpringBatch中的retry和skip都有對應的policy實現,默認的retry policy是SimpleRetryPolicy,可以設置retry次數和接收的exception。比如可以使用NeverRetryPolicy:
.retryPolicy(new NeverRetryPolicy())
使用NeverRetryPolicy之後,便不再retry了,只會skip。SpringBatch內部的retry是使用Spring的retry模塊完成的。執行的時候可以設置RetryCallback和RecoveryCallback。
SpringBatch中默認的skip policy是LimitCheckingItemSkipPolicy。
SpringBatch中的retry和skip機制實現分析