Spring-batch(ItemWriter)數據寫入數據庫,普通文件,xml文件,多文件分類寫入
一.ItemWriter簡介
1.對於read讀取數據時是一個item為單位的循環讀取,而對於writer寫入數據則是以chunk為單位,一塊一塊的進行寫入
2.例(我們舉一個小例子來認識其writer原理):
代碼:
OutOverViewApplication
package com.dhcc.batch.batchDemo.output.outview; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication @EnableBatchProcessing public class OutOverViewApplication { public static void main(String[] args) { SpringApplication.run(OutOverViewApplication.class, args); } }
OutputViewItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.outview; import java.util.ArrayList; import java.util.List; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class OutputViewItemWriterConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("OutputViewItemWriter") private ItemWriter<? super String> outputViewItemWriter; @Bean public Job OutputViewItemWriterJob3() { return jobBuilderFactory.get("OutputViewItemWriterJob3") .start(OutputViewItemWriterStep3()) .build(); } @Bean public Step OutputViewItemWriterStep3() { return stepBuilderFactory.get("OutputViewItemWriterStep3") .<String, String>chunk(10) .reader(listViewItemRead()) .writer(outputViewItemWriter) .build(); } @Bean @StepScope public ListItemViewReader<String> listViewItemRead() { List<String> dataList=new ArrayList<>(); for(int i=0;i<100;i++) { dataList.add("my name is zhongqiujie"+i); } return new ListItemViewReader<String>(dataList); } }
ListItemViewReader
package com.dhcc.batch.batchDemo.output.outview; import java.util.Iterator; import java.util.List; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.NonTransientResourceException; import org.springframework.batch.item.ParseException; import org.springframework.batch.item.UnexpectedInputException; @SuppressWarnings("hiding") public class ListItemViewReader<String> implements ItemReader<String>{ private final Iterator<String> iterator; public ListItemViewReader(List<String> data) { this.iterator = data.iterator(); } @Override public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { if (iterator.hasNext()) { return this.iterator.next(); } else { return null; } } }
OutputViewItemWriter implements
package com.dhcc.batch.batchDemo.output.outview;
import java.util.List;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
@Component("OutputViewItemWriter")
public class OutputViewItemWriter implements ItemWriter<String> {
@Override
public void write(List<? extends String> items) throws Exception {
System.out.println("writer chunk size is :" + items.size());
for (String item : items) {
System.out.println("writer data is:" + item);
}
}
}
運行結果:
二.將數據寫入到數據庫
1.在spring batch中為我們提供了許多將數據寫入到數據庫中的writer
(1)Neo4jItemWriter;
(2)MongoItemWriter;
..........
2.此處我們只學習JdbcBatchItemWriter
例:我們先在數據庫中建立數據表alipaytrando,結構如下:
接下來我們將項目中的springbatchtest2文件讀出並寫入到數據庫表alipaytrando中
Springbatchtest2文件結構如下:
開始寫代碼:
AlipayTranDo
package com.dhcc.batch.batchDemo.output.db.entity;
public class AlipayTranDo {
private String tranId;
private String channel;
private String tranType;
private String counterparty;
private String goods;
private String amount;
private String isDebitCredit;
private String state;
public AlipayTranDo(String tranId, String channel, String tranType, String counterparty, String goods,
String amount, String isDebitCredit, String state) {
super();
this.tranId = tranId;
this.channel = channel;
this.tranType = tranType;
this.counterparty = counterparty;
this.goods = goods;
this.amount = amount;
this.isDebitCredit = isDebitCredit;
this.state = state;
}
public String getTranId() {
return tranId;
}
public void setTranId(String tranId) {
this.tranId = tranId;
}
public String getChannel() {
return channel;
}
public void setChannel(String channel) {
this.channel = channel;
}
public String getTranType() {
return tranType;
}
public void setTranType(String tranType) {
this.tranType = tranType;
}
public String getCounterparty() {
return counterparty;
}
public void setCounterparty(String counterparty) {
this.counterparty = counterparty;
}
public String getGoods() {
return goods;
}
public void setGoods(String goods) {
this.goods = goods;
}
public String getAmount() {
return amount;
}
public void setAmount(String amount) {
this.amount = amount;
}
public String getIsDebitCredit() {
return isDebitCredit;
}
public void setIsDebitCredit(String isDebitCredit) {
this.isDebitCredit = isDebitCredit;
}
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
@Override
public String toString() {
return "AlipayTranDO{" +
"tranId=‘" + tranId + ‘\‘‘ +
", channel=‘" + channel + ‘\‘‘ +
", tranType=‘" + tranType + ‘\‘‘ +
", counterparty=‘" + counterparty + ‘\‘‘ +
", goods=‘" + goods + ‘\‘‘ +
", amount=‘" + amount + ‘\‘‘ +
", isDebitCredit=‘" + isDebitCredit + ‘\‘‘ +
", state=‘" + state + ‘\‘‘ +
‘}‘;
}
}
AlipayTranDoFileMapper
package com.dhcc.batch.batchDemo.output.db.util;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;
public class AlipayTranDoFileMapper implements FieldSetMapper<AlipayTranDo> {
@Override
public AlipayTranDo mapFieldSet(FieldSet fieldSet) throws BindException {
return new AlipayTranDo(fieldSet.readString("tranId")
, fieldSet.readString("channel")
,fieldSet.readString("tranType")
, fieldSet.readString("counterparty")
, fieldSet.readString("goods")
,fieldSet.readString("amount")
, fieldSet.readString("isDebitCredit")
, fieldSet.readString("state")
);
}
}
OutputItemWriterDBApplication
package com.dhcc.batch.batchDemo.output.db.jdbcout;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterDBApplication {
public static void main(String[] args) {
SpringApplication.run(OutputItemWriterDBApplication.class, args);
}
}
*OutputItemWriterDBConfiguration
package com.dhcc.batch.batchDemo.output.db.jdbcout;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;
@Configuration
public class OutputItemWriterDBConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("outputDBItemReader")
private ItemReader<? extends AlipayTranDo> outputDBItemReader;
@Autowired
@Qualifier("outputDBItemWriter")
private ItemWriter<? super AlipayTranDo> outputDBItemWriter;
@Autowired
private MyProcess myProcess;
@Bean
public Job OutputItemWriterDBJob2() {
return jobBuilderFactory.get("OutputItemWriterDBJob2").start(OutputItemWriterDBStep2()).build();
}
@Bean
public Step OutputItemWriterDBStep2() {
return stepBuilderFactory.get("OutputItemWriterDBStep2").<AlipayTranDo, AlipayTranDo>chunk(50)
.reader(outputDBItemReader)
.processor(myProcess)
.writer(outputDBItemWriter)
.build();
}
}
OutputItemWriterDBItemReaderConfiguration
package com.dhcc.batch.batchDemo.output.db.jdbcout;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;
import com.dhcc.batch.batchDemo.output.db.util.AlipayTranDoFileMapper;
@Configuration
public class OutputItemWriterDBItemReaderConfiguration {
@Bean
public FlatFileItemReader<AlipayTranDo> outputDBItemReader(){
FlatFileItemReader<AlipayTranDo> reader=new FlatFileItemReader<AlipayTranDo>();
reader.setEncoding("UTF-8");
reader.setResource(new ClassPathResource("/data/init/springbatchtest2.csv"));
reader.setLinesToSkip(5);
DelimitedLineTokenizer tokenizer=new DelimitedLineTokenizer();
tokenizer.setNames(new String[]
{"tranId","channel","tranType","counterparty","goods","amount","isDebitCredit","state"}
);
DefaultLineMapper<AlipayTranDo> lineMapper=new DefaultLineMapper<AlipayTranDo>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new AlipayTranDoFileMapper());
lineMapper.afterPropertiesSet();
reader.setLineMapper(lineMapper);
return reader;
}
}
MyProcess
package com.dhcc.batch.batchDemo.output.db.jdbcout;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;
@Component
public class MyProcess implements ItemProcessor<AlipayTranDo, AlipayTranDo> {
@Override
public AlipayTranDo process(AlipayTranDo item) throws Exception {
System.out.println(item);
return item;
}
}
OutputItemWriterDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.db.jdbcout;
import javax.sql.DataSource;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;
@Configuration
public class OutputItemWriterDBItemWriterConfiguration {
@Autowired
private DataSource dataSource;
@Bean
public JdbcBatchItemWriter<AlipayTranDo> outputDBItemWriter() {
System.out.println();
JdbcBatchItemWriter<AlipayTranDo> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql(
"insert into alipaytrando"
+ "(tranId,channel,tranType,counterparty,goods,amount,isDebitCredit,state) values"
+ "(:tranId,:channel,:tranType,:counterparty,:goods,:amount,:isDebitCredit,:state) ");
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<AlipayTranDo>());
return writer;
}
}
運行結果:
觀察控制臺可得我們的項目運行成功,接下來我們再到數據中觀察數據是否成功插入
發現表中數據已經插入成功
三.將數據寫入到普通文件中
1.FlatFileItemWriter可以將任何一個類型為T的對象數據寫入到普通文件中
2.例:我們將數據庫中的alipaytrando中的數據讀出並且寫入到普通文件中接下裏我們開始編寫代碼:
實體類AlipayTranDo與上一個例子一樣,我們不在重復展示
AlipayTranDoFileMapper
package com.dhcc.batch.batchDemo.output.flatfile;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
public class AlipayTranDoFileMapper implements RowMapper<AlipayTranDo> {
@Override
public AlipayTranDo mapRow(ResultSet rs, int rowNum) throws SQLException {
return new AlipayTranDo(rs.getString("tranId"), rs.getString("channel"), rs.getString("tranType"),
rs.getString("counterparty"), rs.getString("goods"), rs.getString("amount"),
rs.getString("isDebitCredit"), rs.getString("state"));
}
}
AlipayTranDoLineAggregator
package com.dhcc.batch.batchDemo.output.flatfile;
import org.springframework.batch.item.file.transform.LineAggregator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class AlipayTranDoLineAggregator implements LineAggregator<AlipayTranDo> {
//JSON
private ObjectMapper mapper=new ObjectMapper();
@Override
public String aggregate(AlipayTranDo alipayTranDo) {
try {
return mapper.writeValueAsString(alipayTranDo);
} catch (JsonProcessingException e) {
throw new RuntimeException("unable to writer...",e);
}
}
}
FlatFileOutputFromDBConfiguration
package com.dhcc.batch.batchDemo.output.flatfile;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FlatFileOutputFromDBConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("flatFileOutputFromDBItemReader")
private ItemReader<? extends AlipayTranDo> flatFileOutputFromDBItemReader;
@Autowired
@Qualifier("flatFileOutputFromDBItemWriter")
private ItemWriter<? super AlipayTranDo> flatFileOutputFromDBItemWriter;
@Bean
public Job FlatFileOutputFromDBJob() {
return jobBuilderFactory.get("FlatFileOutputFromDBJob").start(FlatFileOutputFromDBStep()).build();
}
@Bean
public Step FlatFileOutputFromDBStep() {
return stepBuilderFactory.get("FlatFileOutputFromDBStep").<AlipayTranDo, AlipayTranDo>chunk(100)
.reader(flatFileOutputFromDBItemReader).writer(flatFileOutputFromDBItemWriter).build();
}
}
FlatFileOutputFromDBItemReaderConfiguration
package com.dhcc.batch.batchDemo.output.flatfile;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FlatFileOutputFromDBItemReaderConfiguration {
@Autowired
private DataSource dataSource;
@Bean
public JdbcPagingItemReader<AlipayTranDo> flatFileOutputFromDBItemReader() {
JdbcPagingItemReader<AlipayTranDo> reader = new JdbcPagingItemReader<>();
reader.setDataSource(this.dataSource); // 設置數據源
reader.setFetchSize(100); // 設置一次最大讀取條數
reader.setRowMapper(new AlipayTranDoFileMapper()); // 把數據庫中的每條數據映射到AlipaytranDo對像中
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("tranId,channel,tranType,counterparty,goods,amount,isDebitCredit,state"); // 設置查詢的列
queryProvider.setFromClause("from alipaytrando"); // 設置要查詢的表
Map<String, Order> sortKeys = new HashMap<String, Order>();// 定義一個集合用於存放排序列
sortKeys.put("tranId", Order.ASCENDING);// 按照升序排序
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);// 設置排序列
return reader;
}
}
FlatFileOutputFromDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.flatfile;
import java.io.File;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
@Configuration
public class FlatFileOutputFromDBItemWriterConfiguration {
@Bean
public FlatFileItemWriter<AlipayTranDo> flatFileOutputFromDBItemWriter(){
FlatFileItemWriter<AlipayTranDo> writer=new FlatFileItemWriter<AlipayTranDo>();
try {
File path=new File("D:"+File.separator+"alipayTranDo.data").getAbsoluteFile();
// String path=File.createTempFile("alipayTranDo", ".data").getAbsolutePath();
System.out.println("file is create in :"+path);
writer.setResource(new FileSystemResource(path));
writer.setLineAggregator(new AlipayTranDoLineAggregator());
writer.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return writer;
}
}
OutputItemWriterFlatFileApplication
package com.dhcc.batch.batchDemo.output.flatfile;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterFlatFileApplication {
public static void main(String[] args) {
SpringApplication.run(OutputItemWriterFlatFileApplication.class, args);
}
}
運行結果:
控制臺顯示文件讀取寫入成功,我們根據文件地址,觀察寫入後的普通文件
四.將數據寫入到xml文件中
1.將數據寫入到xml文件中,我們必須用到StaxEventItemWriter;
2.我們也會用到XStreamMarshall來序列文件
例:我們將數據庫表alipaytrando中的數據寫入到本地磁盤中
代碼(此處我們只展示writer,用來寫入的類,其他的均與上一個例子相同):
XMLFileOutputFromDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.xmlfile;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
@Configuration
public class XMLFileOutputFromDBItemWriterConfiguration {
@Bean
public StaxEventItemWriter<AlipayTranDo> xmlFileOutputFromDBItemWriter() throws Exception {
XStreamMarshaller marshaller = new XStreamMarshaller();
@SuppressWarnings("rawtypes")
Map<String, Class> aliases = new HashMap<>();
aliases.put("alipayTranDo", AlipayTranDo.class);
marshaller.setAliases(aliases);
StaxEventItemWriter<AlipayTranDo> writer = new StaxEventItemWriter<>();
writer.setRootTagName("alipaytrandos");
writer.setMarshaller(marshaller);
File path = new File("D:" + File.separator + "alipayTranDo.xml").getAbsoluteFile();
System.out.println("file is create in :" + path);
writer.setResource(new FileSystemResource(path));
writer.afterPropertiesSet();
return writer;
}
}
運行結果:
根據地址觀察寫入後的xml文件
五.將數據寫入到多文件
1.將數據寫入多個文件,我們使用CompositItemWriter<T>或者使用ClassifierCompositItemWriter<T>
2.例(1):我們將數據表alipaytrandao中的數據分別寫入到xml文件和json文件中
此處我們只展示writer(其余代碼與上例相同):
mutipleFileOutputFromDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.mutiple.composit;
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
@Configuration
public class mutipleFileOutputFromDBItemWriterConfiguration {
@Bean
public FlatFileItemWriter<AlipayTranDo> jsonFileItemWriter(){
FlatFileItemWriter<AlipayTranDo> writer=new FlatFileItemWriter<AlipayTranDo>();
try {
File path=new File("D:"+File.separator+"alipayTranDo1.json").getAbsoluteFile();
// String path=File.createTempFile("alipayTranDo", ".json").getAbsolutePath();
System.out.println("file is create in :"+path);
writer.setResource(new FileSystemResource(path));
writer.setLineAggregator(new AlipayTranDoLineAggregator());
writer.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return writer;
}
@Bean
public StaxEventItemWriter<AlipayTranDo> xmlFileItemWriter() throws Exception{
XStreamMarshaller marshaller=new XStreamMarshaller();
@SuppressWarnings("rawtypes")
Map<String, Class> aliases=new HashMap<>();
aliases.put("alipayTranDo", AlipayTranDo.class);
marshaller.setAliases(aliases);
StaxEventItemWriter<AlipayTranDo> writer=new StaxEventItemWriter<>();
writer.setRootTagName("alipaytrandos");
writer.setMarshaller(marshaller);
File path=new File("D:"+File.separator+"alipayTranDo1.xml").getAbsoluteFile();
System.out.println("file is create in :"+path);
writer.setResource(new FileSystemResource(path));
writer.afterPropertiesSet();
return writer;
}
@Bean
public CompositeItemWriter<AlipayTranDo> alipayTranDoFileOutputFromDBItemWriter() throws Exception{
CompositeItemWriter<AlipayTranDo> itemWriter=new CompositeItemWriter<>();
itemWriter.setDelegates(Arrays.asList(xmlFileItemWriter(),jsonFileItemWriter()));
itemWriter.afterPropertiesSet();
return itemWriter;
}
}
運行結果:
觀察文件:
Json:
Xml:
3.例(2):我們將同一個文件進行分類寫入:
首先我們觀察數據庫表person_buf的數據結構(數據總數是10001):
我們的目標是將數據從數據庫讀出按照id的奇偶分別寫入不同類型的文件中
接下來上代碼:
Person
package com.dhcc.batch.batchDemo.output.mutiple.classifier;
import java.util.Date;
public class Person {
private Integer id;
private String name;
private String perDesc;
private Date createTime;
private Date updateTime;
private String sex;
private Float score;
private Double price;
public Person() {
super();
}
public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score,
Double price) {
super();
this.id = id;
this.name = name;
this.perDesc = perDesc;
this.createTime = createTime;
this.updateTime = updateTime;
this.sex = sex;
this.score = score;
this.price = price;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Date getCreateTime() {
return createTime;
}
public String getPerDesc() {
return perDesc;
}
public void setPerDesc(String perDesc) {
this.perDesc = perDesc;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public Float getScore() {
return score;
}
public void setScore(Float score) {
this.score = score;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public String toString() {
return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime + ", updateTime="
+ updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]";
}
}
PersonLineAggregator
package com.dhcc.batch.batchDemo.output.mutiple.classifier;
import org.springframework.batch.item.file.transform.LineAggregator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class PersonLineAggregator implements LineAggregator<Person> {
//JSON
private ObjectMapper mapper=new ObjectMapper();
@Override
public String aggregate(Person person) {
try {
return mapper.writeValueAsString(person);
} catch (JsonProcessingException e) {
throw new RuntimeException("unable to writer...",e);
}
}
}
PersonRowMapper
package com.dhcc.batch.batchDemo.output.mutiple.classifier;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
/**
* 實現將數據庫中的每條數據映射到Person對象中
* @author Administrator
*
*/
public class PersonRowMapper implements RowMapper<Person> {
/**
* rs一條結果集,rowNum代表當前行
*/
@Override
public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
return new Person(rs.getInt("id")
,rs.getString("name")
,rs.getString("per_desc")
,rs.getDate("create_time")
,rs.getDate("update_time")
,rs.getString("sex")
,rs.getFloat("score")
,rs.getDouble("price"));
}
}
OutputItemWriterMutipleClassFileApplication
package com.dhcc.batch.batchDemo.output.mutiple.classifier;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterMutipleClassFileApplication {
public static void main(String[] args) {
SpringApplication.run(OutputItemWriterMutipleClassFileApplication.class, args);
}
}
ClassifierMutipleFileOutputFromDBConfiguration
package com.dhcc.batch.batchDemo.output.mutiple.classifier;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ClassifierMutipleFileOutputFromDBConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("mutipleFileOutputFromDBItemReader")
private ItemReader<? extends Person> mutipleFileOutputFromDBItemReader;
@Autowired
@Qualifier("alipayTranDoFileOutputFromDBItemWriter")
private ItemWriter<? super Person> alipayTranDoFileOutputFromDBItemWriter;
@Autowired
@Qualifier("jsonFileItemWriter")
private ItemStream jsonFileItemWriter;
@Autowired
@Qualifier("xmlFileItemWriter")
private ItemStream xmlFileItemWriter;
@Bean
public Job mutipleFileOutputFromDBJob1() {
return jobBuilderFactory.get("mutipleFileOutputFromDBJob1")
.start(mutipleFileOutputFromDBStep1())
.build();
}
@Bean
public Step mutipleFileOutputFromDBStep1() {
return stepBuilderFactory.get("mutipleFileOutputFromDBStep1").<Person, Person>chunk(100)
.reader(mutipleFileOutputFromDBItemReader)
.writer(alipayTranDoFileOutputFromDBItemWriter)
.stream(jsonFileItemWriter)
.stream(xmlFileItemWriter)
.build();
}
}
mutipleFileOutputFromDBItemReaderConfiguration
package com.dhcc.batch.batchDemo.output.mutiple.classifier;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class mutipleFileOutputFromDBItemReaderConfiguration {
@Autowired
private DataSource dataSource;
@Bean
public JdbcPagingItemReader<Person> mutipleFileOutputFromDBItemReader() {
JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
reader.setDataSource(this.dataSource); // 設置數據源
reader.setFetchSize(100); // 設置一次最大讀取條數
reader.setRowMapper(new PersonRowMapper()); // 把數據庫中的每條數據映射到AlipaytranDo對像中
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // 設置查詢的列
queryProvider.setFromClause("from person_buf"); // 設置要查詢的表
Map<String, Order> sortKeys = new HashMap<String, Order>();// 定義一個集合用於存放排序列
sortKeys.put("id", Order.ASCENDING);// 按照升序排序
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);// 設置排序列
return reader;
}
}
mutipleFileOutputFromDBItemWriterConfiguration
package com.dhcc.batch.batchDemo.output.mutiple.classifier;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
@Configuration
public class mutipleFileOutputFromDBItemWriterConfiguration {
@Bean
public FlatFileItemWriter<Person> jsonFileItemWriter(){
FlatFileItemWriter<Person> writer=new FlatFileItemWriter<Person>();
try {
File path=new File("D:"+File.separator+"person.json").getAbsoluteFile();
System.out.println("file is create in :"+path);
writer.setResource(new FileSystemResource(path));
writer.setLineAggregator(new PersonLineAggregator());
writer.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace();
}
return writer;
}
@Bean
public StaxEventItemWriter<Person> xmlFileItemWriter() throws Exception{
XStreamMarshaller marshaller=new XStreamMarshaller();
@SuppressWarnings("rawtypes")
Map<String, Class> aliases=new HashMap<>();
aliases.put("person", Person.class);
marshaller.setAliases(aliases);
StaxEventItemWriter<Person> writer=new StaxEventItemWriter<>();
writer.setRootTagName("persons");
writer.setMarshaller(marshaller);
File path=new File("D:"+File.separator+"person.xml").getAbsoluteFile();
System.out.println("file is create in :"+path);
writer.setResource(new FileSystemResource(path));
writer.afterPropertiesSet();
return writer;
}
@Bean
public ClassifierCompositeItemWriter<Person> alipayTranDoFileOutputFromDBItemWriter() throws Exception{
ClassifierCompositeItemWriter<Person> itemWriter=new ClassifierCompositeItemWriter<Person>();
itemWriter.setClassifier(new MyWriterClassifier(jsonFileItemWriter(),xmlFileItemWriter()));
return itemWriter;
}
}
MyWriterClassifier
package com.dhcc.batch.batchDemo.output.mutiple.classifier;
import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;
public class MyWriterClassifier implements Classifier<Person, ItemWriter<? super Person>> {
private ItemWriter<Person> jsonWriter;
private ItemWriter<Person> xmlWriter;
/**
*
*/
private static final long serialVersionUID = -2911015707834323846L;
public MyWriterClassifier(ItemWriter<Person> jsonWriter, ItemWriter<Person> xmlWriter) {
this.jsonWriter = jsonWriter;
this.xmlWriter = xmlWriter;
}
@Override
public ItemWriter<? super Person> classify(Person classifiable) {
if (classifiable.getId()%2==0) {
return jsonWriter;
}else {
return xmlWriter;
}
}
}
運行結果:
觀察文件:
Person.json:(我們可以看出id為偶數的都寫在了json文件中)
Person.xml:(我們可以看出id為奇數的都寫在了xml文件中)
Spring-batch(ItemWriter)數據寫入數據庫,普通文件,xml文件,多文件分類寫入