1. 程式人生 > >Spring Batch 初探、使用樣例

Spring Batch 初探、使用樣例

工作裡用到了Spring Batch專案,以前我也是做平行計算的,聽說過Spring Batch不過並沒有去具體瞭解過。今天抽空看看官方文件進行下初步的瞭解。

    

層次架構如上圖。分三層:應用層,核心層,基礎設施層。應用層包括所有的batch任務和使用者開發的程式碼。核心層包括在執行期執行一個任務所需要的類,例如:JobLauncher,Job和Step的實現。應用和核心層都在基礎設施層之上,基礎設施層包括通用的讀寫器(readers and writers)以及如RetryTemplate等服務。

Spring Batch中的一些概念:


Job

Job是Step的容器,用來定義和配置整個任務的資訊:

  • Job的名字
  • 定義Step的順序
  • 定義任務可否重啟

Job Config Sample:

<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/></job>

JobInstance

是實際執行的Job例項,例項間資料也和業務獨立。

JobParameters

Job執行的引數。。似乎沒什麼好解釋的。

JobExecution

JobExecution是一個技術上的概念,只一次單獨的執行任務。執行可以以成功和失敗結尾,但是JobInstance只有在JobExecution成功結束的情況下,才被認為是完成的。例如,一個JobInstance第一次執行失敗,重新執行,即為另一個JobExecution但是是同一個JobInstance。

Step

一個Job是有一個或者多個Step組成的。Step可複雜可簡單。與Job類似,Step也有對應的StepExecution。


StepExecution

Step的每一次執行都是即為一個StepExecution。每次Step執行都會建立一個StepExecution。每個execution都包含了相關的Step和JobExecution的引用以及事務相關的資料和起始、結束時間。每個StepExecution也包含了一個ExecutionContext,包含了開發需要持久化的資料。

ExecutionContext

key/value對的物件,用於儲存需要記錄的上下文資訊。scope是包括StepExecution和JobExecution。類似於Quartz中的JobDataMap。可以儲存執行過程的資訊,用於故障時重新執行和繼續執行,甚至狀態回滾等等,不過需要使用者自行記錄。任何時刻,一個StepExecution只會有一個ExecutionContext,由Spring Batch框架負責持久化和保證讀取的準確性。

另外,需要注意的是,每個JobExectuion都有一個ExecutionContext,每個StepExecution也有一個獨立的ExecutionContext。例如:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

上述程式碼中,兩個ExecutionContext是不相等的,Step中的context是在每個提交點上被儲存,而job的context會在兩個Step執行之間被儲存。

JobRepository

JobRepository是關於前面提到的Job模版(Stereotypes)的持久化機制。提供了對了JobLauncher,Job和Step實現類的CRUD操作。當初次載入Job的時候,從repository中獲取 JobExecution,在執行的過程中,StepExecution和JobExectuion的實現類通過repository持久化。

JobLauncher

JobLauncher是用於用指定的JobParameters載入Job的介面。

public interface JobLauncher {

    public JobExecution run(Job job, JobParameters jobParameters)
                throws JobExecutionAlreadyRunningException, JobRestartException;
}

Item Reader、 Item Writer、Item Processor

分別用於讀、寫和轉換業務資料。轉換即是進行資料模型的轉換。

名稱空間(Batch Namesapce)

<beans:beans xmlns="http://www.springframework.org/schema/batch"
     xmlns:beans="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">

完整使用樣例:

配置檔案 batch-context.xml

<beans:beans xmlns= "http://www.springframework.org/schema/batch"
     xmlns:beans="http://www.springframework.org/schema/beans" xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">
     <beans:bean id ="first-tasklet"
           class= "com.coderli.spring.batch.firstjob.FirstTasklet" ></beans:bean >

     <beans:bean id ="jobRepository"
           class= "org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" >
           <beans:property name ="transactionManager" ref= "transactionManager" />
     </beans:bean >

     <beans:bean id ="transactionManager"
           class= "org.springframework.batch.support.transaction.ResourcelessTransactionManager" ></beans:bean >

     <job id ="onecoder-job">
           <step id ="first-step" next="secend-step">
               <tasklet ref ="first-tasklet"></ tasklet>
           </step >
           <step id ="secend-step" >
               <tasklet >
                    <chunk reader ="myReader" writer= "myWriter" processor ="myProcessor"
                         commit-interval= "1"></chunk >
               </tasklet >
           </step >
     </job >

     <beans:bean id ="myReader" class= "com.coderli.spring.batch.firstjob.MyReader" ></beans:bean >
     <beans:bean id ="myWriter" class= "com.coderli.spring.batch.firstjob.MyWriter" ></beans:bean >
     <beans:bean id ="myProcessor"
           class= "com.coderli.spring.batch.firstjob.MyProcessor" ></beans:bean >
     <beans:bean id ="myFirstJobLauncher"
           class= "org.springframework.batch.core.launch.support.SimpleJobLauncher" >
           <beans:property name ="taskExecutor" ref= "syncTaskExecutor" />
           <beans:property name ="jobRepository" ref= "jobRepository" />
     </beans:bean >
     <beans:bean id ="syncTaskExecutor"
           class= "org.springframework.core.task.SyncTaskExecutor" />
     <beans:bean id ="jobLauncherTestUtils"
           class= "org.springframework.batch.test.JobLauncherTestUtils" >
           <beans:property name ="job" ref= "onecoder-job"></beans:property >
     </beans:bean >

</beans:beans>

First-Tasklet類

package com.coderli.spring.batch.firstjob;

import lombok.extern.slf4j.Slf4j;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

@Slf4j
public class FirstTasklet implements Tasklet {

     @Override
     public RepeatStatus execute(StepContribution contribution,
              ChunkContext chunkContext) throws Exception {
           log.info( "This is tasklet one in step one of job MyJob");
           return RepeatStatus.FINISHED;
     }
}

Reader類:

package com.coderli.spring.batch.firstjob;

import lombok.extern.slf4j.Slf4j;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

/**
* 自定義Reader類
*
* @author OneCoder
* @date 2014年9月28日 下午2:33:43
*/
@Slf4j
public class MyReader implements ItemReader<MyModel> {

     private int count;

     @Override
     public MyModel read() throws Exception, UnexpectedInputException,
              ParseException, NonTransientResourceException {
           log.info( "This is my reader in step two of job: [MyJob.]");
          MyModel model = null;
           if ( count < 2) {
               model = new MyModel();
               model.setDescription( "My Description");
               model.setId( "My ID");
               model.setName( "My Name");
               count++;
          }
           return model;
     }

}

writer類

package com.coderli.spring.batch.firstjob;

import java.util.List;

import lombok.extern.slf4j.Slf4j;

import org.springframework.batch.item.ItemWriter;

/**
* 自定義Writer類
*
* @author OneCoder
* @date 2014年9月28日 下午2:46:20
*/
@Slf4j
public class MyWriter implements ItemWriter<String> {

     @Override
     public void write(List<? extends String> items) throws Exception {
           log.info( "This is my writer in step two for job: [MyJob].");
           log.info( "Write the JSON string to the console.");
           for (String item : items) {
               log.info( "Write item: {}", item);
          }
     }

}

processor類

package com.coderli.spring.batch.firstjob;

import lombok.extern.slf4j.Slf4j;

import org.springframework.batch.item.ItemProcessor;

import com.google.gson.Gson;

/**
*
* @author OneCoder
* @date 2014年9月28日 下午2:39:48
*/
@Slf4j
public class MyProcessor implements ItemProcessor<MyModel, String> {

     @Override
     public String process(MyModel item) throws Exception {
           log.info( "This is my process in step two of job: [MyJob].");
           log.info( "Transfer MyModel to JSON string.");
          Gson gson = new Gson();
           return gson.toJson( item);
     }
}

測試類

package com.coderli.spring.batch.firstjob;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
* 任務啟動器,通過JUnit測試方式啟動
*
* @author OneCoder
* @date 2014年9月28日 下午3:03:03
*/
@RunWith(SpringJUnit4ClassRunner. class)
@ContextConfiguration(locations = { "../batch-context.xml" })
public class MyFirstJobTest {

     @Autowired
     private JobLauncherTestUtils jobLauncherTestUtils;


     @Test
     public void testJob() throws Exception {
           jobLauncherTestUtils.launchJob();
          JobExecution jobExecution = jobLauncherTestUtils .launchJob();
          Assert. assertEquals("COMPLETED", jobExecution.getExitStatus().getExitCode());
     }
}

簡單介紹上述程式碼裡用到的東西,@SLF4J註解是lombok中的,之前介紹過。測試用的Spring-test和spring-batch-test提供的相關功能。