快速使用元件-spring batch(3)讀檔案資料到資料庫
tags: springbatch
1.引言
上一篇文章《快速瞭解元件-spring batch(2)之helloworld》對Spring Batch
進行了入門級的開發,也對基本的元件有了一定的瞭解。但實際開發過程中,更多的是涉及檔案及資料庫的操作,以定時後臺執行的方式,實現批處理操作。典型操作是從文字資料(csv/txt
等檔案)中讀取資料,然後寫入到資料庫儲存。如下圖所示:
若需要開發此過程,可以按照上一篇文章所寫的,自定義ItemReader
和ItemWriter
來實現,但是Spring Batch
其實已經提供現成的檔案讀取和資料庫寫入的元件,開發人員可以直接使用,提高開發效率。本文將會對檔案讀取和資料庫寫入進行實戰介紹。
2.開發環境
- JDK: jdk1.8
- Spring Boot: 2.1.4.RELEASE
- Spring Batch:4.1.2.RELEASE
- 開發IDE: IDEA
- 構建工具Maven: 3.3.9
- 日誌元件logback:1.2.3
- lombok:1.18.6
3.Spring Batch提供的讀-處理-寫元件一覽
在使用Spring Batch
內建的讀寫元件時,首先我們先弄清楚有哪些元件可以用,按讀、寫、處理,見下面說明。Spring Batch
已提供了比較全面的支援。
3.1 ItemReader
ItemReader | 說明 |
---|---|
ListItemReader | 讀取List型別資料,只能讀一次 |
ItemReaderAdapter | ItemReader介面卡,可以複用現有的讀操作 |
FlatFileItemReader | 讀Flat型別檔案 |
StaxEventItemReader | 讀XML型別檔案 |
JdbcCursorItemReader | 基於JDBC遊標方式讀資料庫 |
HibernateCursorItemReader | 基於Hibernate遊標方式讀資料庫 |
StoredProcedureItemReader | 基於儲存過程讀資料庫 |
JpaPagingItemReader | 基於Jpa方式分頁讀資料庫 |
JdbcPagingItemReader | 基於JDBC方式分頁讀資料庫 |
HibernatePagingItemReader | 基於Hibernate方式分頁讀取資料庫 |
JmsItemReader | 讀取JMS佇列 |
IteratorItemReader | 迭代方式的讀元件 |
MultiResourceItemReader | 多檔案讀元件 |
MongoItemReader | 基於分散式檔案儲存的資料庫 MongoDB讀元件 |
Neo4jItemReader | 面向網路的資料庫Neo4j的讀元件 |
ResourcesItemReader | 基於批量資源的讀元件,每次讀取返回資源物件 AmqpItemReader讀取AMQP佇列元件 |
RepositoryItemReader | 基於 Spring Data的讀元件 |
3.2 ItemWriter
ItemWriter | 說明 |
---|---|
FlatFileItemWriter | 寫Flat型別檔案 |
MultiResourceItemWriter | 多檔案寫元件 |
StaxEventItemWriter | 寫XML型別檔案 |
AmqpItemWriter | 寫AMQP型別訊息 |
ClassifierCompositeItemWriter | 根據 Classifier路由不同的Item到特定的ItemWriter處理 |
HiberateItemWriter | 基於Hibernate方式寫資料庫 |
ItemWriterAdapter | ItemWriter介面卡,可以複用現有的寫服務 |
JdbcBatchItemWriter | 基於JDBC方式寫資料庫 |
JmsItemWriter | 寫JMS佇列 JpaItemWriter基於Jpa方式寫資料庫 |
GemfireItemWriter | 基於分散式資料庫Gemfire的寫元件 |
SpELMappingGemfireItemWriter | 基於Spring表示式語言寫分散式資料庫Gemfire的寫元件 |
MimeMessageItemWriter | 傳送郵件的寫元件 |
MongoItemWriter | 基於分散式檔案儲存的資料庫MongoDB寫元件 |
Neo4jItemWriter | 面向網路的資料庫Neo4j的讀元件 |
PropertyExtractingDelegatingItemWriter | 屬性抽取代理寫元件:通過呼叫給定的 Spring Bean方法執行寫入,引數由Item中指定的屬性欄位獲取作為引數 |
RepositoryItemWriter基於 | Spring Data的寫元件 |
SimpleMailMessageItemWriter | 傳送郵件的寫元件 |
CompositeItemWriter | 條目寫的組合模式,支援組裝多個ItemWriter |
3.3 ItemProcessor
ItemProcessor | 說明 |
---|---|
CompositeItemProcessor | 組合處理器,可以封裝多個業務處理服務 |
ItemProcessorAdapter | ItemProcessor介面卡,可以複用現有的業務處理服務 |
PassThroughItemProcessor | 不做任何業務處理,直接返回讀到的資料 |
ValidatingItemProcessor | 資料校驗處理器,支援對資料的校驗,如果校驗不通過可以進行過濾掉或者通過skip的方式跳過對記錄的處理 |
4.開發流程
根據當前示例,從csv
檔案中讀資料,寫入到mysql
資料庫,只需要使用FlatFileItemReader
和JdbcBatchItemWriter
即可。下面對開發流程作簡要說明。示例工程可以在這裡獲取,裡面有檔案resources/user-data.csv
及相應的目標資料庫指令碼mytest.sql
。
4.1 建立spring batch資料庫
4.1.1 建立資料庫並執行sql指令碼
Spring Batch
的執行需要資料庫的支援,以儲存任務的執行狀態及結果。因此需要先建立資料庫。在mysql
中建立名為my_spring_batch
的資料庫。並在此資料庫中執行
Spring Batch
的資料庫指令碼,指令碼位置在spring-batch-core-4.1.2.RELEASE.jar
的jar包中的\org\springframework\batch\core\schema-mysql.sql
(也可以在示例工程的sql
資料夾中獲取)。執行完成後,資料庫表如下圖所示:
4.1.2 資料庫表說明
資料庫共9張表,以seq
結尾的是用於生成主鍵的。其它6張表,以batch_job
開頭的是儲存任務的相關資訊,batch_step
開頭的儲存步驟相關資訊。
-
job
與job instance
與job execution
關係 任務job
是我們說的邏輯概念,即完整的一個批處理工作,它的例項就是job instance
,此任務資訊是儲存在batch_job_instance
表中。有點類似java中的類和類例項的概念,是一對多的關係。對於每一個job instance
,執行的時候會生成記錄儲存在batch_job_execution
中,表示每一個job
執行的實際情況。注意,這裡job instance
和job execution
也是一對多的關係,即同一個例項有可能會執行多次(如上一次執行失敗了,後面重新再執行)。 -
batch_job_execution_context
及batch_job_execution_params
儲存任務執行時需要用到的上下文(以json
格式儲存)及執行時使用的引數。 -
batch_step_execution
及batch_step_execution_context
儲存任務執行過程中的作業步驟及執行時上下文。
4.1.3 建立示例目標資料庫
本示例只涉及一個test_user
表。建立mytest
資料庫庫,執行mytest.sql
指令碼即可。
4.2 配置多資料來源
一般來說,我們會把Spring Batch
的資料儲存在獨立的資料庫中,而實際的應用使用的則是目標資料庫,因此需要配置多資料來源訪問。基於上一篇文章的工程進行開發。
4.2.1 新增mysql資料庫依賴
<!-- 資料庫相關依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
複製程式碼
4.2.2 配置多資料來源訪問
Spring Boot
對多資料來源的支援比較友好,配置也很簡單,先在配置檔案中新增資料庫配置,然後在java配置檔案中新增相應的註解即可。如下:
-
application.properties
配置內容
# spring batch db
spring.datasource.jdbc-url=jdbc:mysql://localhost:3310/my_spring_batch?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.datasource.username=root
spring.datasource.password=111111
# target db
spring.target-datasource.jdbc-url=jdbc:mysql://localhost:3310/mytest?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.target-datasource.username=root
spring.target-datasource.password=111111
複製程式碼
-
DataSourceConfig
配置內容 新建DataSourceConfig.java
檔案,配置多資料來源,如下:
@Configuration
public class DataSourceConfig {
@Bean("datasource")
@ConfigurationProperties(prefix="spring.datasource")
@Primary
public DataSource batchDatasource() {
return DataSourceBuilder.create().build();
}
@Bean("targetDatasource")
@ConfigurationProperties(prefix="spring.target-datasource")
public DataSource targetDatasource() {
return DataSourceBuilder.create().build();
}
}
複製程式碼
這樣,後面就可以直接使用datasource
及targetDatasource
兩個Bean進行資料庫訪問。
4.3 新增User實體
本例項中,讀取csv
檔案,轉為User
實體,然後儲存到資料庫,因此需要先把User
這個實體作一個定義。使用了lombok
和jpa
的註解,如下:
@Entity
@Data
@Table(name="test_user")
public class User{
@Id
@GeneratedValue
/**
* id
*/
private Long id;
/**
* 姓名
*/
private String name;
/**
* 手機號
*/
private String phone;
...略
複製程式碼
4.4 新增檔案讀取元件ItemReader
使用內建的FlatFileItemReader
即可。如下:
@Bean
public ItemReader file2DbItemReader(){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return new FlatFileItemReaderBuilder<User>()
.name(funcName)
.resource(new ClassPathResource("user-data.csv"))
// .linesToSkip(1)
.delimited()
.names(new String[]{"id","name","phone","title","email","gender","date_of_birth","sys_create_time","sys_create_user","sys_update_time","sys_update_user"})
.fieldSetMapper(new UserFieldSetMapper())
.build();
}
複製程式碼
說明:
-
FlatFileItemReaderBuilder
用於建立FlatFileItemReader
,設定相應的行為,包括使用它來設定讀取檔案的位置(resource
),檔案分隔符(預設是','
),是否跳過前面幾行(linesToSkip
),標識每一列對應的列名稱(可與資料庫的欄位名一致)。設定檔案欄位與資料庫實體欄位的對應關係。 - 設定檔案欄位與資料庫實體欄位的對應關係,使用
FieldSetMapper
來實現,其中FieldSet
代表每一行文字資料,返回值即為實體物件。如下所示:
public class UserFieldSetMapper implements FieldSetMapper<User> {
@Override
public User mapFieldSet(FieldSet fieldSet) throws BindException {
String patternYmd = "yyyy-MM-dd";
String patternYmdHms = "yyyy-MM-dd HH:mm:ss";
User user = new User();
user.setId(fieldSet.readLong("id"));
user.setName(fieldSet.readString("name"));
user.setPhone(fieldSet.readString("phone"));
user.setTitle(fieldSet.readString("title"));
user.setEmail(fieldSet.readString("email"));
user.setGender(fieldSet.readString("gender"));
//此欄位有可能為null
String dataOfBirthStr = fieldSet.readString("date_of_birth");
if(SyncConstants.STR_CSV_NULL.equals(dataOfBirthStr)){
user.setDateOfBirth(null);
}else{
DateTime dateTime = DateUtil.parse(dataOfBirthStr,patternYmd);
user.setDateOfBirth(dateTime.toJdkDate());
}
user.setSysCreateTime(fieldSet.readDate("sys_create_time",patternYmdHms));
user.setSysCreateUser(fieldSet.readString("sys_create_user"));
user.setSysUpdateTime(fieldSet.readDate("sys_update_time",patternYmdHms));
user.setSysUpdateUser(fieldSet.readString("sys_update_user"));
return user;
}
}
複製程式碼
4.5 新增處理元件ItemProcessor
由於csv
文字檔案中的資料null
值資料識別符號為\N
,因此可以在處理元件中進行處理,把識別符號\N
設定為null
值。如下所示:
@Slf4j
public class File2DbItemProcessor implements ItemProcessor<User,User> {
@Override
public User process(User user) throws Exception {
user.setPhone(checkStr(user.getPhone()));
user.setTitle(checkStr(user.getTitle()));
user.setEmail(checkStr(user.getEmail()));
user.setGender(checkStr(user.getGender()));
log.info(LogConstants.LOG_TAG + "item process: " +user.getName());
return user;
}
public String checkStr(String dataToCheck){
if(SyncConstants.STR_CSV_NULL.equals(dataToCheck)){
return null;
}
return dataToCheck;
}
}
複製程式碼
4.6 新增資料庫寫入元件ItemWriter
資料庫寫入元件使用JdbcBatchItemWriter
即可,如下:
@Bean
public ItemWriter file2DbWriter(@Qualifier("targetDatasource") DataSource datasource){
return new JdbcBatchItemWriterBuilder<User>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user) " +
"VALUES (:id,:name,:phone,:title,:email,:gender,:dateOfBirth,:sysCreateTime,:sysCreateUser,:sysUpdateTime,:sysUpdateUser)")
.dataSource(datasource)
.build();
}
複製程式碼
說明:
- 使用
JdbcBatchItemWriterBuilder
進行JdbcBatchItemWriter
的建立,設定插入資料庫的sql語句,同時指定資料來源即可。 -
@Qualifier("targetDatasource") DataSource datasource
用於指定資料來源 - 使用
BeanPropertyItemSqlParameterSourceProvider
可以直接把讀取的資料實體的屬性資料作為引數填充到sql
語句中,從而實現資料插入操作。
4.7 組裝完整任務
經過上面的操作,可以使用一個java配置,把讀、寫、處理組裝成完整的step
和job
,如下所示(詳細可見示例工程檔案):
File2DbBatchConfig.java
@Bean
public Job file2DbJob(Step file2DbStep,JobExecutionListener file2DbListener){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return jobBuilderFactory.get(funcName)
.listener(file2DbListener)
.flow(file2DbStep)
.end().build();
}
@Bean
public Step file2DbStep(ItemReader file2DbItemReader,ItemProcessor file2DbProcessor,ItemWriter file2DbWriter){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return stepBuilderFactory.get(funcName)
.<User,User>chunk(10)
.reader(file2DbItemReader)
.processor(file2DbProcessor)
.writer(file2DbWriter)
.build();
}
複製程式碼
4.8 編寫測試
參考上一文章的ConsoleJobTest
,編寫File2DbJobTest
檔案。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,File2DbBatchConfig.class})
@Slf4j
public class File2DbJobTest {
@Autowired
private JobLauncherService jobLauncherService;
@Autowired
private Job file2DbJob;
@Test
public void testFile2DbJob() throws JobParametersInvalidException,JobExecutionAlreadyRunningException,JobRestartException,JobInstanceAlreadyCompleteException {
//構建任務執行引數
JobParameters jobParameters = JobUtil.makeJobParameters();
//執行並顯示結果
Map<String,Object> stringObjectMap = jobLauncherService.startJob(file2DbJob,jobParameters);
Assert.assertEquals(ExitStatus.COMPLETED,stringObjectMap.get(SyncConstants.STR_RETURN_EXITSTATUS));
}
}
複製程式碼
執行後結果輸出如下(exitCode=COMPLETED
):
5.總結
本文先對Spring Batch
的開箱即用的ItemReader
,ItemWriter
、ItemProcessor
作了一個簡要的概覽,然後以讀csv
檔案,處理null值,再插入到資料庫的處理邏輯為案例,介紹了Spring Batch
的資料庫指令碼,FlatFileItemReader
及JdbcBatchItemWriter
的使用。希望對大家更深入的瞭解Spring Batch
有幫助,並能用到實踐中。
參考資源
-
劉相《Spring Batch 批處理框架》:書中對Spring Batch進行了詳細的描述,本文章主要參考此書。
-
《Spring Batch - Reference Documentation》:書中對Spring Batch進行了詳細的描述,本文章主要參考此書。