spring-batch (ItemProcessor) 數據處理過程
阿新 • • 發佈:2018-10-12
Spring-batch學習總結(五)
學習目標:掌握ItemProcessor
1.ItemProcessor:spring-batch中數據處理的過程
2.ItemProcessor主要用於實現業務邏輯,驗證,過濾,等
3.Spring-batch為我們提供ItemProcessor<I,O>這個接口,它包含一個方法O process(I item)
4.我們用代碼進行演示:
例:我們讀取數據庫表person_buf中的數據,將其id為奇數的數據剔除,將讀出name進行字母大寫轉換
首先觀察數據庫表數據結構:
學習目標:掌握ItemProcessor
1.ItemProcessor:spring-batch中數據處理的過程
2.ItemProcessor主要用於實現業務邏輯,驗證,過濾,等
3.Spring-batch為我們提供ItemProcessor<I,O>這個接口,它包含一個方法O process(I item)
4.我們用代碼進行演示:
例:我們讀取數據庫表person_buf中的數據,將其id為奇數的數據剔除,將讀出name進行字母大寫轉換
首先觀察數據庫表數據結構:
代碼:
Person
package com.dhcc.batch.batchDemo.processor; import java.util.Date; public class Person { private Integer id; private String name; private String perDesc; private Date createTime; private Date updateTime; private String sex; private Float score; private Double price; public Person() { super(); } public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score, Double price) { super(); this.id = id; this.name = name; this.perDesc = perDesc; this.createTime = createTime; this.updateTime = updateTime; this.sex = sex; this.score = score; this.price = price; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Date getCreateTime() { return createTime; } public String getPerDesc() { return perDesc; } public void setPerDesc(String perDesc) { this.perDesc = perDesc; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public Date getUpdateTime() { return updateTime; } public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public Float getScore() { return score; } public void setScore(Float score) { this.score = score; } public Double getPrice() { return price; } public void setPrice(Double price) { this.price = price; } @Override public String toString() { return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime + ", updateTime=" + updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]"; } }
PersonLineAggregator
package com.dhcc.batch.batchDemo.processor; import org.springframework.batch.item.file.transform.LineAggregator; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; public class PersonLineAggregator implements LineAggregator<Person> { //JSON private ObjectMapper mapper=new ObjectMapper(); @Override public String aggregate(Person person) { try { return mapper.writeValueAsString(person); } catch (JsonProcessingException e) { throw new RuntimeException("unable to writer...",e); } } }
PersonRowMapper
package com.dhcc.batch.batchDemo.processor;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

/**
 * Maps each row of a query result onto a {@link Person}.
 */
public class PersonRowMapper implements RowMapper<Person> {

    /**
     * Builds a {@link Person} from the row the result set is currently
     * positioned on.
     *
     * @param rs     result set positioned on the row to map
     * @param rowNum index of the current row (not used by this mapper)
     * @return a person populated from the columns id, name, per_desc,
     *         create_time, update_time, sex, score and price
     * @throws SQLException if a column cannot be read
     */
    @Override
    public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
        // NOTE(review): getDate yields a date-only value; if create_time /
        // update_time are DATETIME/TIMESTAMP columns the time of day is lost
        // and getTimestamp would be the right accessor - confirm against the schema.
        return new Person(
                rs.getInt("id"),
                rs.getString("name"),
                rs.getString("per_desc"),
                rs.getDate("create_time"),
                rs.getDate("update_time"),
                rs.getString("sex"),
                readNullableFloat(rs, "score"),
                readNullableDouble(rs, "price"));
    }

    /**
     * Reads a FLOAT column, returning {@code null} for SQL NULL instead of
     * the 0 that {@link ResultSet#getFloat(String)} reports.
     */
    private static Float readNullableFloat(ResultSet rs, String column) throws SQLException {
        float value = rs.getFloat(column);
        if (rs.wasNull()) {
            return null;
        }
        return Float.valueOf(value);
    }

    /**
     * Reads a DOUBLE column, returning {@code null} for SQL NULL instead of
     * the 0 that {@link ResultSet#getDouble(String)} reports.
     */
    private static Double readNullableDouble(ResultSet rs, String column) throws SQLException {
        double value = rs.getDouble(column);
        if (rs.wasNull()) {
            return null;
        }
        return Double.valueOf(value);
    }
}
ProcessorFileApplication
package com.dhcc.batch.batchDemo.processor;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Application entry point: a Spring Boot application with Spring Batch
 * processing enabled.
 */
@SpringBootApplication
@EnableBatchProcessing
public class ProcessorFileApplication {

    /**
     * Launches the Spring application, forwarding the command-line arguments.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        final Class<?> primarySource = ProcessorFileApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
ProcessorFileOutputFromDBConfiguration
package com.dhcc.batch.batchDemo.processor;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
/**
 * Batch configuration for a single-step job: rows are read from the
 * {@code person_buf} table, passed through a two-stage processor chain
 * (upper-case the name, then filter by id) and written to a flat file using
 * {@link PersonLineAggregator}.
 */
@Configuration
public class ProcessorFileOutputFromDBConfiguration {

    /** Items per chunk; also used as the reader's fetch size and page size. */
    private static final int CHUNK_SIZE = 100;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    // NOTE(review): more than one ItemProcessor<Person, Person> bean is declared
    // in this package, so injection presumably relies on these field names
    // matching the component bean names - confirm before renaming either field
    // (including the "frist" spelling).
    @Autowired
    private ItemProcessor<Person, Person> fristNameUpperCaseProcessor;

    @Autowired
    private ItemProcessor<Person, Person> idFilterProcessor;

    /**
     * Defines the job, which consists of the single read-process-write step.
     *
     * @return the job named {@code ProcessorFileOutputFromDBJob}
     */
    @Bean
    public Job ProcessorFileOutputFromDBJob() {
        return jobBuilderFactory.get("ProcessorFileOutputFromDBJob")
                .start(ProcessorFileOutputFromDBStep())
                .build();
    }

    /**
     * Defines the chunk-oriented step wiring reader, processor chain and writer.
     *
     * @return the step named {@code ProcessorFileOutputFromDBStep}
     */
    @Bean
    public Step ProcessorFileOutputFromDBStep() {
        return stepBuilderFactory.get("ProcessorFileOutputFromDBStep")
                .<Person, Person>chunk(CHUNK_SIZE)
                .reader(ProcessorFileOutputFromItemWriter())
                .processor(personDataProcessor())
                .writer(ProcessorFileOutputFromItemReader())
                .build();
    }

    /**
     * Paging JDBC reader over {@code person_buf}, ordered by {@code id} ascending.
     * <p>
     * Despite the "Writer" suffix this method creates the step's reader; the
     * method name is kept because renaming it would change the bean name.
     *
     * @return the configured reader
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<Person> ProcessorFileOutputFromItemWriter() {
        JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(CHUNK_SIZE); // JDBC driver fetch-size hint
        // Fetch one page per chunk; without this the reader's default page size
        // is used and each chunk needs several paging queries.
        reader.setPageSize(CHUNK_SIZE);
        reader.setRowMapper(new PersonRowMapper()); // map every row onto a Person

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // columns to select
        queryProvider.setFromClause("from person_buf"); // table to read from

        Map<String, Order> sortKeys = new HashMap<String, Order>(); // sort columns used for paging
        sortKeys.put("id", Order.ASCENDING); // ascending by id
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);
        return reader;
    }

    /**
     * Chains the two injected processors: the name upper-casing processor runs
     * first, then the id filter.
     *
     * @return the composite processor used by the step
     */
    @Bean
    public CompositeItemProcessor<Person, Person> personDataProcessor() {
        CompositeItemProcessor<Person, Person> processor = new CompositeItemProcessor<>();
        List<ItemProcessor<Person, Person>> listProcessor = new ArrayList<>();
        listProcessor.add(fristNameUpperCaseProcessor);
        listProcessor.add(idFilterProcessor);
        processor.setDelegates(listProcessor);
        return processor;
    }

    /**
     * Flat-file writer targeting {@code D:\newPerson.json}, one aggregated
     * line per item.
     * <p>
     * Despite the "Reader" suffix this method creates the step's writer; the
     * method name is kept because renaming it would change the bean name.
     * <p>
     * The writer's {@code afterPropertiesSet()} is intentionally not invoked
     * here: the container runs that lifecycle callback for the returned bean,
     * and a configuration error then fails bean creation instead of being
     * printed and ignored while a half-configured writer is handed to the step.
     *
     * @return the configured writer
     */
    @Bean
    @StepScope
    public FlatFileItemWriter<Person> ProcessorFileOutputFromItemReader() {
        File path = new File("D:" + File.separator + "newPerson.json").getAbsoluteFile();
        System.out.println("file is create in :" + path);

        FlatFileItemWriter<Person> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource(path));
        writer.setLineAggregator(new PersonLineAggregator());
        return writer;
    }
}
FristNameUpperCaseProcessor
package com.dhcc.batch.batchDemo.processor;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
/**
 * Item processor that returns a copy of the incoming {@link Person} with its
 * name converted to upper case; all other attributes are copied unchanged.
 */
@Component
public class FristNameUpperCaseProcessor implements ItemProcessor<Person, Person> {

    /**
     * Produces a new person whose name is the upper-cased name of {@code item}.
     *
     * @param item the person to transform
     * @return a new {@link Person}; its name is {@code null} when the
     *         incoming name is {@code null}
     * @throws Exception declared by the processor contract; not thrown here
     */
    @Override
    public Person process(Person item) throws Exception {
        String name = item.getName();
        // A missing name is passed through instead of failing the step with an NPE.
        // Locale.ROOT keeps the result independent of the JVM default locale
        // (e.g. Turkish dotted/dotless i).
        String upperName = (name == null) ? null : name.toUpperCase(java.util.Locale.ROOT);
        return new Person(item.getId(), upperName, item.getPerDesc(), item.getCreateTime(),
                item.getUpdateTime(), item.getSex(), item.getScore(), item.getPrice());
    }
}
IdFilterProcessor
package com.dhcc.batch.batchDemo.processor;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
/**
 * Item processor that lets persons with an even id through and drops the rest.
 */
@Component
public class IdFilterProcessor implements ItemProcessor<Person, Person> {

    /**
     * Filters by id parity.
     *
     * @param item the person to inspect
     * @return {@code item} itself when its id is even; {@code null} otherwise,
     *         which is how an item processor signals that the item should be
     *         filtered out
     * @throws Exception declared by the processor contract
     */
    @Override
    public Person process(Person item) throws Exception {
        final boolean evenId = item.getId() % 2 == 0;
        return evenId ? item : null;
    }
}
運行結果:
觀察寫入完成後的文件:
可以看出我們已經完成了我們的目標
spring-batch (ItemProcessor) 數據處理過程