1. 程式人生 > >spring-batch+quartz的示例

spring-batch+quartz的示例

   spring-batch是一個為批量處理面生的輕量級框架,關於它的描述不多說了,網上挺多的。下面要說的是一個spring-batch+quartz的一示例,完成的功能是從mysql中定時取資料,然後處理資料,最後再寫入到mysql的另一個表裡。

 pom檔案如下,所需要的資源全在裡面

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.batch.sample</groupId>
  <artifactId>batchSample001</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>batchSample001</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>3.0.6.RELEASE</version>
    </dependency>    
    
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-orm</artifactId>
        <version>4.2.4.RELEASE</version>
    </dependency>   
    
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>   
     
<dependency>  
    <groupId>org.slf4j</groupId>  
    <artifactId>slf4j-log4j12</artifactId>  
    <version>1.7.2</version>  
</dependency> 

	<dependency>
		<groupId>org.quartz-scheduler</groupId>
		<artifactId>quartz</artifactId>
		<version>1.8.6</version>
	</dependency>
	
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-context-support</artifactId>
		<version>4.0.5.RELEASE</version>
	</dependency>
	
  </dependencies>
</project>

spring-batch+quartz的配置檔案如下:

<beans
	xmlns="http://www.springframework.org/schema/beans"	
	xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:jdbc="http://www.springframework.org/schema/jdbc"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
	http://www.springframework.org/schema/batch
	http://www.springframework.org/schema/batch/spring-batch.xsd
	http://www.springframework.org/schema/jdbc
	http://www.springframework.org/schema/jdbc/spring-jdbc.xsd"
	>
	
	
	<batch:job id="billingJob" restartable="true">
		<batch:step id="billingStep">
			<tasklet task-executor="taskExecutor" throttle-limit="50"  start-limit="3">
				<chunk reader="userDbReader" processor="billingProcessor"
					writer="billDbWriter" commit-interval="200" chunk-completion-policy="">
				</chunk>
			</tasklet>
		</batch:step>
	</batch:job>
	
	  <!-- run every 10 seconds -->
  <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
	<property name="triggers">
	  <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
		<property name="jobDetail" ref="jobDetail" />
		<property name="cronExpression" value="0 0/2 * * * ?" />
	  </bean>
	</property>
  </bean>
	
	
<bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">
	<property name="jobClass" value="org.springframework.batch.quartz.JobQuartz" />
	<property name="group" value="quartz-batch" />
	<property name="jobDataAsMap">
	  <map>
		<entry key="jobName" value-ref="billingJob" />
		<entry key="jobRepository" value-ref="jobRepository" />
	  </map>
	</property>
  </bean>
  
	
	
	<bean id="taskExecutor"
		class="org.springframework.core.task.SimpleAsyncTaskExecutor">
	</bean>

		<bean id="placeholderProperties" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="locations">
			<list>
				<value>classpath:batch-${ENVIRONMENT:hsql}.properties</value>
			</list>
		</property>
		<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
		<property name="ignoreResourceNotFound" value="true" />
		<property name="ignoreUnresolvablePlaceholders" value="false" />
		<property name="order" value="1" />
		</bean>
	
		<!--  Initialise the database if enabled: 
	<jdbc:initialize-database data-source="dataSource" enabled="${batch.data.source.init}" ignore-failures="DROPS">
		<jdbc:script location="${batch.drop.script}"/>
		<jdbc:script location="${batch.schema.script}"/>
	</jdbc:initialize-database>
	-->

	
	<bean id="dataSource"
		class="org.springframework.jdbc.datasource.DriverManagerDataSource">
		<property name="driverClassName" value="com.mysql.jdbc.Driver" />
		<property name="url"
			value="jdbc:mysql://localhost:3306/springbatch" />
		<property name="username" value="root" />
		<property name="password" value="root" />
	</bean>

	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<bean id="jobRepository"
		class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
		<property name="dataSource" ref="dataSource" />
		<property name="transactionManager" ref="transactionManager" />
	</bean>

	<!-- processors -->

	<bean id="billingProcessor"
		class="org.springframework.batch.sample.BillingItemProcessor"></bean>
	<!-- readers -->
	<bean id="userDbReader"
		class="org.springframework.batch.item.database.JdbcPagingItemReader">
		<property name="dataSource" ref="dataSource" />
		<property name="rowMapper" ref="userDbMapper" />
		<property name="queryProvider" ref="userQueryProvider" />
	</bean>
	<bean id="userDbMapper"
		class="org.springframework.batch.sample.UserDbMapper" />

	<bean id="userQueryProvider"
		class="org.springframework.batch.item.database.support.MySqlPagingQueryProvider">
		<property name="selectClause" value="u.id,u.name,u.age,u.balance" />
		<property name="fromClause" value="users u" />
		<property name="sortKeys">
        			<map>
        				<entry key="u.id" value="ASCENDING"/>
        				<entry key="u.name" value="DESCENDING"/>
        			</map>
        </property>
	</bean>


	<bean id="billDbWriter"
		class="org.springframework.batch.item.database.JdbcBatchItemWriter">
		<property name="dataSource" ref="dataSource" />
		<property name="sql"
			value="insert into bills(id,user_id,fees,paid_fees,unpaid_fees,pay_status) values(:id,:user.id,:fees,:paidFees,:unpaidFees,:payStatus)" />
		<property name="itemSqlParameterSourceProvider"
			ref="itemSqlParameterSourceProvider" />
	</bean>
	

	<bean id="itemSqlParameterSourceProvider"
		class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
</beans>

quartz的時間呼叫實現如下:
package org.springframework.batch.quartz;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.sample.BillingItemProcessor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.scheduling.quartz.QuartzJobBean;

public class JobQuartz extends QuartzJobBean{
	public static final String RUN_MONTH_KEY = "run.month";
	@Override
	protected void executeInternal(JobExecutionContext paramJobExecutionContext)
			throws JobExecutionException {
		
		 Logger logger = LoggerFactory.getLogger(BillingItemProcessor.class);
		 logger.info("beginning");	
			
			SimpleJobLauncher launcher = new SimpleJobLauncher();
			Map<String, Object> jobDataMap = paramJobExecutionContext.getMergedJobDataMap();
			launcher.setJobRepository((JobRepository) jobDataMap.get("jobRepository"));
			launcher.setTaskExecutor(new SyncTaskExecutor());
			try {
				Map<String, JobParameter> parameters = new HashMap<String, JobParameter>();
				parameters.put(RUN_MONTH_KEY, new JobParameter("2011-2"+System.currentTimeMillis()));
				
				Long startTime=System.currentTimeMillis();
				
				JobExecution je = launcher.run((Job) jobDataMap.get("jobName"),
						new JobParameters(parameters));
				System.out.println("使用時間:"+(System.currentTimeMillis()-startTime));
				
				//System.out.println(je);
				//System.out.println(je.getJobInstance());
				//System.out.println(je.getStepExecutions());
			} catch (Exception e) {
				e.printStackTrace();
			}		
			 System.out.println("endding");
	}

}
呼叫的程式碼如下:
	public static void main(String[] args) {
	
	 Logger logger = LoggerFactory.getLogger(BillingItemProcessor.class);
	 logger.info("aaaaaaaaaaaaaaa");
		
		System.setProperty("ENVIRONMENT", "mysql");
		
		ClassPathXmlApplicationContext c = new ClassPathXmlApplicationContext("parallel.xml");
		try {
			System.in.read();
		} catch (IOException e) {
			e.printStackTrace();
		}		

	}

原始碼下載地址:http://download.csdn.net/detail/riapgypm/9404081