1. 程式人生 > 程式設計 >快速使用元件-spring batch(3)讀檔案資料到資料庫

快速使用元件-spring batch(3)讀檔案資料到資料庫

tags: springbatch


1.引言

上一篇文章《快速瞭解元件-spring batch(2)之helloworld》Spring Batch進行了入門級的開發,也對基本的元件有了一定的瞭解。但實際開發過程中,更多的是涉及檔案及資料庫的操作,以定時後臺執行的方式,實現批處理操作。典型操作是從文字資料(csv/txt等檔案)中讀取資料,然後寫入到資料庫儲存。如下圖所示:

讀檔案流程

若需要開發此過程,可以按照上一篇文章所寫的,自定義ItemReaderItemWriter來實現,但是Spring Batch其實已經提供現成的檔案讀取和資料庫寫入的元件,開發人員可以直接使用,提高開發效率。本文將會對檔案讀取和資料庫寫入進行實戰介紹。

2.開發環境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 開發IDE: IDEA
  • 構建工具Maven: 3.3.9
  • 日誌元件logback:1.2.3
  • lombok:1.18.6

3.Spring Batch提供的讀-處理-寫元件一覽

在使用Spring Batch內建的讀寫元件時,首先我們先弄清楚有哪些元件可以用,按讀、寫、處理,見下面說明。Spring Batch已提供了比較全面的支援。

3.1 ItemReader

ItemReader 說明
ListItemReader 讀取List型別資料,只能讀一次
ItemReaderAdapter ItemReader介面卡,可以複用現有的讀操作
FlatFileItemReader 讀Flat型別檔案
StaxEventItemReader 讀XML型別檔案
JdbcCursorItemReader 基於JDBC遊標方式讀資料庫
HibernateCursorItemReader 基於Hibernate遊標方式讀資料庫
StoredProcedureItemReader 基於儲存過程讀資料庫
JpaPagingItemReader 基於Jpa方式分頁讀資料庫
JdbcPagingItemReader 基於JDBC方式分頁讀資料庫
HibernatePagingItemReader 基於Hibernate方式分頁讀取資料庫
JmsItemReader 讀取JMS佇列
IteratorItemReader 迭代方式的讀元件
MultiResourceItemReader 多檔案讀元件
MongoItemReader 基於分散式檔案儲存的資料庫 MongoDB讀元件
Neo4jItemReader 面向網路的資料庫Neo4j的讀元件
ResourcesItemReader 基於批量資源的讀元件,每次讀取返回資源物件 AmqpItemReader讀取AMQP佇列元件
RepositoryItemReader 基於 Spring Data的讀元件

3.2 ItemWriter

ItemWriter 說明
FlatFileItemWriter 寫Flat型別檔案
MultiResourceItemWriter 多檔案寫元件
StaxEventItemWriter 寫XML型別檔案
AmqpItemWriter 寫AMQP型別訊息
ClassifierCompositeItemWriter 根據 Classifier路由不同的Item到特定的ItemWriter處理
HiberateItemWriter 基於Hibernate方式寫資料庫
ItemWriterAdapter ItemWriter介面卡,可以複用現有的寫服務
JdbcBatchItemWriter 基於JDBC方式寫資料庫
JmsItemWriter 寫JMS佇列 JpaItemWriter基於Jpa方式寫資料庫
GemfireItemWriter 基於分散式資料庫Gemfire的寫元件
SpELMappingGemfireItemWriter 基於Spring表示式語言寫分散式資料庫Gemfire的寫元件
MimeMessageItemWriter 傳送郵件的寫元件
MongoItemWriter 基於分散式檔案儲存的資料庫MongoDB寫元件
Neo4jItemWriter 面向網路的資料庫Neo4j的讀元件
PropertyExtractingDelegatingItemWriter 屬性抽取代理寫元件:通過呼叫給定的 Spring Bean方法執行寫入,引數由Item中指定的屬性欄位獲取作為引數
RepositoryItemWriter基於 Spring Data的寫元件
SimpleMailMessageItemWriter 傳送郵件的寫元件
CompositeItemWriter 條目寫的組合模式,支援組裝多個ItemWriter

3.3 ItemProcessor

ItemProcessor 說明
CompositeItemProcessor 組合處理器,可以封裝多個業務處理服務
ItemProcessorAdapter ItemProcessor介面卡,可以複用現有的業務處理服務
PassThroughItemProcessor 不做任何業務處理,直接返回讀到的資料
ValidatingItemProcessor 資料校驗處理器,支援對資料的校驗,如果校驗不通過可以進行過濾掉或者通過skip的方式跳過對記錄的處理

4.開發流程

根據當前示例,從csv檔案中讀資料,寫入到mysql資料庫,只需要使用FlatFileItemReaderJdbcBatchItemWriter即可。下面對開發流程作簡要說明。示例工程可以在這裡獲取,裡面有檔案resources/user-data.csv及相應的目標資料庫指令碼mytest.sql

4.1 建立spring batch資料庫

4.1.1 建立資料庫並執行sql指令碼

Spring Batch的執行需要資料庫的支援,以儲存任務的執行狀態及結果。因此需要先建立資料庫。在mysql中建立名為my_spring_batch的資料庫。並在此資料庫中執行 Spring Batch的資料庫指令碼,指令碼位置在spring-batch-core-4.1.2.RELEASE.jar的jar包中的\org\springframework\batch\core\schema-mysql.sql(也可以在示例工程sql資料夾中獲取)。執行完成後,資料庫表如下圖所示:

資料庫

4.1.2 資料庫表說明

資料庫共9張表,以seq結尾的是用於生成主鍵的。其它6張表,以batch_job開頭的是儲存任務的相關資訊,batch_step開頭的儲存步驟相關資訊。

  • jobjob instancejob execution關係 任務job是我們說的邏輯概念,即完整的一個批處理工作,它的例項就是job instance,此任務資訊是儲存在batch_job_instance表中。有點類似java中的類和類例項的概念,是一對多的關係。對於每一個job instance,執行的時候會生成記錄儲存在batch_job_execution中,表示每一個job執行的實際情況。注意,這裡job instancejob execution也是一對多的關係,即同一個例項有可能會執行多次(如上一次執行失敗了,後面重新再執行)。

  • batch_job_execution_contextbatch_job_execution_params 儲存任務執行時需要用到的上下文(以json格式儲存)及執行時使用的引數。

  • batch_step_executionbatch_step_execution_context 儲存任務執行過程中的作業步驟及執行時上下文。

4.1.3 建立示例目標資料庫

本示例只涉及一個test_user表。建立mytest資料庫庫,執行mytest.sql指令碼即可。

4.2 配置多資料來源

一般來說,我們會把Spring Batch的資料儲存在獨立的資料庫中,而實際的應用使用的則是目標資料庫,因此需要配置多資料來源訪問。基於上一篇文章的工程進行開發。

4.2.1 新增mysql資料庫依賴

<!-- 資料庫相關依賴-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<scope>runtime</scope>
</dependency>
複製程式碼

4.2.2 配置多資料來源訪問

Spring Boot對多資料來源的支援比較友好,配置也很簡單,先在配置檔案中新增資料庫配置,然後在java配置檔案中新增相應的註解即可。如下:

  • application.properties配置內容
# spring batch db
spring.datasource.jdbc-url=jdbc:mysql://localhost:3310/my_spring_batch?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.datasource.username=root
spring.datasource.password=111111
# target db
spring.target-datasource.jdbc-url=jdbc:mysql://localhost:3310/mytest?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.target-datasource.username=root
spring.target-datasource.password=111111
複製程式碼
  • DataSourceConfig配置內容 新建DataSourceConfig.java檔案,配置多資料來源,如下:
@Configuration
public class DataSourceConfig {
    @Bean("datasource")
    @ConfigurationProperties(prefix="spring.datasource")
    @Primary
    public DataSource batchDatasource() {
        return DataSourceBuilder.create().build();
    }

    @Bean("targetDatasource")
    @ConfigurationProperties(prefix="spring.target-datasource")
    public DataSource targetDatasource() {
        return DataSourceBuilder.create().build();
    }
}
複製程式碼

這樣,後面就可以直接使用datasourcetargetDatasource兩個Bean進行資料庫訪問。

4.3 新增User實體

本例項中,讀取csv檔案,轉為User實體,然後儲存到資料庫,因此需要先把User這個實體作一個定義。使用了lombokjpa的註解,如下:

@Entity
@Data
@Table(name="test_user")
public class User{
    @Id
    @GeneratedValue
    /**
     * id
     */
    private Long id;

    /**
     * 姓名
     */
    private String name;

    /**
     * 手機號
     */
    private String phone;
    ...略
複製程式碼

4.4 新增檔案讀取元件ItemReader

使用內建的FlatFileItemReader即可。如下:

    @Bean
    public ItemReader file2DbItemReader(){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return new FlatFileItemReaderBuilder<User>()
                .name(funcName)
                .resource(new ClassPathResource("user-data.csv"))
//                .linesToSkip(1)
                .delimited()
                .names(new String[]{"id","name","phone","title","email","gender","date_of_birth","sys_create_time","sys_create_user","sys_update_time","sys_update_user"})
                .fieldSetMapper(new UserFieldSetMapper())
                .build();
    }
複製程式碼

說明:

  • FlatFileItemReaderBuilder用於建立FlatFileItemReader,設定相應的行為,包括使用它來設定讀取檔案的位置(resource),檔案分隔符(預設是','),是否跳過前面幾行(linesToSkip),標識每一列對應的列名稱(可與資料庫的欄位名一致)。設定檔案欄位與資料庫實體欄位的對應關係。
  • 設定檔案欄位與資料庫實體欄位的對應關係,使用FieldSetMapper來實現,其中FieldSet代表每一行文字資料,返回值即為實體物件。如下所示:
public class UserFieldSetMapper implements FieldSetMapper<User> {
    @Override
    public User mapFieldSet(FieldSet fieldSet) throws BindException {
        String patternYmd = "yyyy-MM-dd";
        String patternYmdHms = "yyyy-MM-dd HH:mm:ss";
        User user = new User();
        user.setId(fieldSet.readLong("id"));
        user.setName(fieldSet.readString("name"));
        user.setPhone(fieldSet.readString("phone"));
        user.setTitle(fieldSet.readString("title"));
        user.setEmail(fieldSet.readString("email"));
        user.setGender(fieldSet.readString("gender"));
        //此欄位有可能為null
        String dataOfBirthStr = fieldSet.readString("date_of_birth");
        if(SyncConstants.STR_CSV_NULL.equals(dataOfBirthStr)){
            user.setDateOfBirth(null);
        }else{
            DateTime dateTime = DateUtil.parse(dataOfBirthStr,patternYmd);
            user.setDateOfBirth(dateTime.toJdkDate());
        }
        user.setSysCreateTime(fieldSet.readDate("sys_create_time",patternYmdHms));
        user.setSysCreateUser(fieldSet.readString("sys_create_user"));
        user.setSysUpdateTime(fieldSet.readDate("sys_update_time",patternYmdHms));
        user.setSysUpdateUser(fieldSet.readString("sys_update_user"));
        return user;
    }
}
複製程式碼

4.5 新增處理元件ItemProcessor

由於csv文字檔案中的資料null值資料識別符號為\N,因此可以在處理元件中進行處理,把識別符號\N設定為null值。如下所示:

@Slf4j
public class File2DbItemProcessor implements ItemProcessor<User,User> {

    @Override
    public User process(User user) throws Exception {
        user.setPhone(checkStr(user.getPhone()));
        user.setTitle(checkStr(user.getTitle()));
        user.setEmail(checkStr(user.getEmail()));
        user.setGender(checkStr(user.getGender()));
        log.info(LogConstants.LOG_TAG + "item process: " +user.getName());
        return user;
    }

    public String checkStr(String dataToCheck){
        if(SyncConstants.STR_CSV_NULL.equals(dataToCheck)){
            return null;
        }
        return dataToCheck;
    }
}
複製程式碼

4.6 新增資料庫寫入元件ItemWriter

資料庫寫入元件使用JdbcBatchItemWriter即可,如下:

    @Bean
    public ItemWriter file2DbWriter(@Qualifier("targetDatasource") DataSource datasource){
        return new JdbcBatchItemWriterBuilder<User>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user) " +
                        "VALUES (:id,:name,:phone,:title,:email,:gender,:dateOfBirth,:sysCreateTime,:sysCreateUser,:sysUpdateTime,:sysUpdateUser)")
                .dataSource(datasource)
                .build();
    }
複製程式碼

說明:

  • 使用JdbcBatchItemWriterBuilder進行JdbcBatchItemWriter的建立,設定插入資料庫的sql語句,同時指定資料來源即可。
  • @Qualifier("targetDatasource") DataSource datasource用於指定資料來源
  • 使用BeanPropertyItemSqlParameterSourceProvider可以直接把讀取的資料實體的屬性資料作為引數填充到sql語句中,從而實現資料插入操作。

4.7 組裝完整任務

經過上面的操作,可以使用一個java配置,把讀、寫、處理組裝成完整的stepjob,如下所示(詳細可見示例工程檔案):

File2DbBatchConfig.java

@Bean
public Job file2DbJob(Step file2DbStep,JobExecutionListener file2DbListener){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return jobBuilderFactory.get(funcName)
                .listener(file2DbListener)
                .flow(file2DbStep)
                .end().build();
    }
@Bean
public Step file2DbStep(ItemReader file2DbItemReader,ItemProcessor file2DbProcessor,ItemWriter file2DbWriter){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return stepBuilderFactory.get(funcName)
                .<User,User>chunk(10)
                .reader(file2DbItemReader)
                .processor(file2DbProcessor)
                .writer(file2DbWriter)
                .build();
    }
複製程式碼

4.8 編寫測試

參考上一文章的ConsoleJobTest,編寫File2DbJobTest檔案。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,File2DbBatchConfig.class})
@Slf4j
public class File2DbJobTest {

    @Autowired
    private JobLauncherService jobLauncherService;

    @Autowired
    private Job file2DbJob;

    @Test
    public void testFile2DbJob() throws JobParametersInvalidException,JobExecutionAlreadyRunningException,JobRestartException,JobInstanceAlreadyCompleteException {
        //構建任務執行引數
        JobParameters jobParameters = JobUtil.makeJobParameters();
        //執行並顯示結果
        Map<String,Object> stringObjectMap = jobLauncherService.startJob(file2DbJob,jobParameters);
        Assert.assertEquals(ExitStatus.COMPLETED,stringObjectMap.get(SyncConstants.STR_RETURN_EXITSTATUS));
    }
}
複製程式碼

執行後結果輸出如下(exitCode=COMPLETED):

執行結果

5.總結

本文先對Spring Batch的開箱即用的ItemReaderItemWriterItemProcessor作了一個簡要的概覽,然後以讀csv檔案,處理null值,再插入到資料庫的處理邏輯為案例,介紹了Spring Batch的資料庫指令碼,FlatFileItemReaderJdbcBatchItemWriter的使用。希望對大家更深入的瞭解Spring Batch有幫助,並能用到實踐中。

參考資源