Spring Batch 在大型企業中的最佳實踐
批處理應用通常有以下特點:
數據量大,從數萬到數百萬甚至上億不等;
整個過程全部自動化,並預留一定接口進行自定義配置;
這樣的應用通常是周期性運行,比如按日、周、月運行;
對數據處理的準確性要求高,並且需要容錯機制、回滾機制、完善的日誌監控等。
什麽是Spring batch
Spring batch是一個輕量級的全面的批處理框架,它專為大型企業而設計,幫助開發健壯的批處理應用。Spring batch為處理大批量數據提供了很多必要的可重用功能,比如日誌追蹤、事務管理、job執行統計、重啟job和資源管理等。同時它也提供了優化和分片技術用於實現高性能的批處理任務。
它的核心功能包括:
事務管理
基於塊的處理過程
聲明式的輸入/輸出操作
啟動、終止、重啟任務
重試/跳過任務
基於Web的管理員接口
筆者所在的部門屬於國外某大型金融公司的CRM部門,在日常工作中我們經常需要開發一些批處理應用,對Spring Batch有著豐富的使用經驗。近段時間筆者特意總結了這些經驗。
使用Spring Batch 3.0以及Spring Boot
在使用Spring Batch時推薦使用最新的Spring Batch 3.0版本。相比Spring Batch2.2,它做了以下方面的提升:
支持JSR-352標準
支持Spring4以及Java8
增強了Spring Batch Integration的功能
支持SQLite
支持Spring4和Java8是一個重大的提升。這樣就可以使用Spring4引入的Spring boot組件,從而開發效率方面有了一個質的飛躍。引入Spring-batch框架只需要在build.gradle中加入一行代碼即可:
compile("org.springframework.boot:spring-boot-starter-batch")
而增強Spring Batch Integration的功能後,我們就可以很方便的和Spring家族的其他組件集成,還可以以多種方式來調用job,也支持遠程分區操作以及遠程塊處理。
而支持JobScope後我們可以隨時為對象註入當前Job實例的上下文信息。只要我們指定Bean的scope為job scope,那麽就可以隨時使用jobParameters和jobExecutionContext等信息。
@Component
@JobScope
public class CustomClass {
@Value("#{jobParameters[jobDate]}")
private String jobDate;
@Value("#{jobExecutionContext[‘input.name‘]}.")
private String fileName;
}
使用Java Config而不是xml的配置方式
之前我們在配置job和step的時候都習慣用xml的配置方式,但是隨著時間的推移發現問題頗多。
xml文件數急劇膨脹,配置塊長且復雜,可讀性很差;
xml文件缺少語法檢查,有些低級錯誤只有在運行集成測試的時候才能發現;
在xml文件中進行代碼跳轉時IDE的支持力度不夠;
我們漸漸發現使用純Java類的配置方式更靈活,它是類型安全的,而且IDE的支持更好。在構建job或step時采用的流式語法相比xml更加簡潔易懂。
@Bean
public Step step(){
return stepBuilders.get("step")
.chunk(1)
.reader(reader())
.processor(processor())
.writer(writer())
.listener(logProcessListener())
.faultTolerant()
.skipLimit(10)
.skip(UnknownGenderException.class)
.listener(logSkipListener())
.build();
}
在這個例子中可以很清楚的看到該step的配置,比如reader/processor/writer組件,以及配置了哪些listener等。
本地集成測試中使用內存數據庫
Spring batch在運行時需要數據庫支持,因為它需要在數據庫中建立一套schema來存儲job和step運行的統計信息。而在本地集成測試中我們可以借助Spring batch提供的內存Repository來存儲Spring batch的任務執行信息,這樣既避免了在本地配置一個數據庫,又可以加快job的執行。先為Job的配置類添加擴展類:DefaultBatchConfigurer。
public class CustomJobConfiguration extends DefaultBatchConfigurer {
...
}
我們在build.gradle中加入對hsqldb的依賴:
runtime(‘org.hsqldb:hsqldb:2.3.2’)
然後在測試類中添加對DataSource的配置。
@EnableAutoConfiguration
@EnableBatchProcessing
@DataJpaTest
@Import({DataSourceAutoConfiguration.class, BatchAutoConfiguration.class})
public class TestConfiguration {
}
並且在applicaton.properties配置中添加初始化Database的配置:spring.batch.initializer.enable=true
合理的使用Chunk機制
Spring batch在配置Step時采用的是基於Chunk的機制。即每次讀取一條數據,再處理一條數據,累積到一定數量後再一次×××給writer進行寫入操作。這樣可以最大化的優化寫入效率,整個事務也是基於Chunk來進行。
當我們在需要將數據寫入到文件、數據庫中之類的操作時可以適當設置Chunk的值以滿足寫入效率最大化。但有些場景下我們的寫入操作其實是調用一個web service或者將消息發送到某個消息隊列中,那麽這些場景下我們就需要設置Chunk的值為1,這樣既可以及時的處理寫入,也不會由於整個Chunk中發生異常後,在重試時出現重復調用服務或者重復發送消息的情況。
使用Listener來監視job執行情況並及時做相應的處理
Spring batch提供了大量的Listener來對job的各個執行環節進行全面的監控。
在job層面Spring batch提供了JobExecutionListener接口,其支持在Job開始或結束時進行一些額外處理。在step層面Spring batch提供了StepExecutionListener,ChunkListener,ItemReadListener,ItemProcessListener,ItemWriteListener,SkipListener等接口,同時對Retry和Skip操作也提供了RetryListener及SkipListener。
通常我們會為每個job都實現一個JobExecutionListener,在afterJob操作中我們輸出job的執行信息,包括執行時間、job參數、退出代碼、執行的step以及每個step的詳細信息。這樣無論是開發、測試還是運維人員都對整個job的執行情況了如指掌。
如果某個step會發生skip的操作,我們也會為其實現一個SkipListener,並在其中記錄skip的數據條目,用於下一步的處理。
實現Listener有兩種方式,一種是繼承自相應的接口,比如繼承JobExecutionListener接口,另一種是使用annoation(註解)的方式。經過實踐我們認為使用註解的方式更好一些,因為使用接口你需要實現接口的所有方法,而使用註解則只需要對相應的方法添加annoation即可。
下面的這個類采用了繼承接口的方式,我們看到其實我們只用到了第一個方法,第二個和第三個都沒有用到。但是我們必須提供一個空的實現。
public class CustomSkipListener implements SkipListener {
@Override
public void onSkipInRead(Throwable t) {
// business logic
}
@Override
public void onSkipInWrite(String item, Throwable t) {
// no need
}
@Override
public void onSkipInProcess(String item, Throwable t) {
// no need
}
}
而使用annoation的方式可以簡寫為:
public class CustomSkipListener {
@OnSkipInRead
public void onSkipInRead(Throwable t) {
// business logic
}
}
使用Retry和Skip增強批處理工作的健壯性
在處理百萬級的數據過程過程中難免會出現異常。如果一旦出現異常而導致整個批處理工作終止的話那麽會導致後續的數據無法被處理。Spring Batch內置了Retry(重試)和Skip(跳過)機制幫助我們輕松處理各種異常。我們需要將異常分為三種類型。第一種是需要進行Retry的異常,它們的特點是該異常可能會隨著時間推移而消失,比如數據庫目前有鎖無法寫入、web服務當前不可用、web服務滿載等。所以對它們適合配置Retry機制。第二種是需要Skip的異常,比如解析文件的某條數據出現異常等,因為對這些異常即使執行Retry每次的結果也都是相同,但又不想由於某條數據出錯而停止對後續數據的處理。第三種異常是需要讓整個Job立刻失敗的異常,比如如果出現了OutOfMemory的異常,那麽需要整個Job立刻終止運行。
一般來說需要Retry的異常也要配置Skip選項,從而保證後續的數據能夠被繼續處理。我們也可以配置SkipLimit選項保證當Skip的數據條目達到一定數量後及時終止整個Job。
有時候我們需要在每次Retry中間隔做一些操作,比如延長Retry時間,恢復操作現場等,Spring Batch提供了BackOffPolicy來達到目的。下面是一個配置了Retry機制、Skip機制以及BackOffPolicy的step示例。
@Bean
public Step step(){
return stepBuilders.get("step")
.chunk(1)
.reader(reader())
.processor(processor())
.writer(writer())
.listener(logProcessListener())
.faultTolerant()
.skipLimit(10)
.skip(UnknownGenderException.class)
.skip(ServiceUnavailableException.class)
.retryLimit(5)
.retry(ServiceUnavailableException.class)
.backOffPolicy(backoffPolicy)
.listener(logSkipListener())
.build();
}
使用自定義的Decider來實現Job flow
在Job執行過程中不一定都是順序執行的,我們經常需要根據某個job的輸出數據或執行結果來決定下一步的走向。以前我們會把一些判斷放置在下遊step中進行,這樣可能會導致有些step實際運行了,但其實並沒有做任何事情。比如一個step執行過程中會將失敗的數據條目記錄到一個報告中,而下一個step會判斷有沒有生成報告,如果生成了報告則將該報告發送給指定聯系人,如果沒有則不做任何事情。這種情況下可以通過Decider機制來實現Job的執行流程。在Spring batch 3.0中Decider已經從Step中獨立出來,和Step處於同一級別。
public class ReportDecider implements JobExecutionDecider {
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
if (report.isExist()) {
return new FlowExecutionStatus(“SEND");
}
return new FlowExecutionStatus(“SKIP");
}
}
而在job配置中可以這樣來使用Decider。這樣整個Job的執行流程會更加清晰易懂。
public Job job() {
return new JobBuilder("petstore")
.start(orderProcess())
.next(reportDecider)
.on("SEND").to(sendReportStep)
.on("SKIP").end().build()
.build();
}
采用多種機制加速Job的執行
批處理工作處理的數據量大,而執行窗口一般又要求比較小。所以必須要通過多種方式來加速Job的執行。一般我們有四種方式來實現:
在單個step中多線程執行任務
並行執行不同的Step
並行執行同一個Step
遠程執行Chunk任務
單個step多線程執行任務可以借助於taskExecutor來實現。這種情況適合於reader、writer是線程安全且是無狀態的場景。我們還可以設置線程數量。
public Step step() {
return stepBuilders.get("step")
.tasklet(tasklet)
.throttleLimit(20)
.build();
}
上述示例中的tasklet需要實現TaskExecutor,Spring Batch提供了一個簡單的多線程TaskExecutor供我們使用:SimpleAsyncTaskExecutor。
並行執行不同的Step在Spring batch中很容易實現,以下是一個示例:
public Job job() {
return stepBuilders.get("parallelSteps")
.start(step1)
.split(asyncTaskExecutor).add(flow1, flow2)
.next(step3)
.build();
}
在這個示例中我們先執行step1,然後並行執行flow1和flow2,最後再執行step3。
Spring batch提供了PartitionStep來實現對同一個step在多個進程中實現並行處理。通過PartitonStep再配合PartitionHandler可以將一個step擴展到多個Slave上實現並行運行。
遠程執行Chunk任務則是將某個Step的processer操作分割到多個進程中,多個進程通過一些中間件進行通訊(比如采用消息的方式)。這種方式適合於Processer是瓶頸而Reader和Writer不是瓶頸的場景。
結語
Spring Batch對批處理場景進行了合理的抽象,封裝了大量的實用功能,使用它來開發批處理應用可以達到事半功倍的效果。在使用的過程中我們仍需要堅持總結一些最佳實踐,從而能夠交付高質量的可維護的批處理應用,滿足企業級應用的苛刻要求。
Spring Batch 在大型企業中的最佳實踐