Spring XD: Developing a Custom Job Module
阿新 • Published: 2018-12-23
A Job in Spring XD is really just a Spring Batch Job, so we start by developing a simple Job that follows the Spring Batch conventions.
Project dependencies:
<dependencies>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-infrastructure</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
</dependencies>
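In practice the spring-batch dependencies do not need to be declared here, because they are already declared in the Spring XD parent pom, and we normally declare that parent anyway, i.e.: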
<parent>
    <groupId>org.springframework.xd</groupId>
    <artifactId>spring-xd-module-parent</artifactId>
    <!-- 1.1.x or later -->
    <version>1.3.1.RELEASE</version>
</parent>
Developing the Job:
package cn.rongcapital.springxd.job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author li.hzh
 * @date 2016-11-01 14:03
 */
@Configuration
public class HelloWorldJob {

    private static Logger logger = LoggerFactory.getLogger(HelloWorldJob.class);

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean(name = "helloworldJob")
    public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2) {
        return jobs.get("myJob").start(step1).next(step2).build();
    }

    @Bean
    protected Step step1() {
        return steps.get("step1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        logger.info("Step One");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }

    @Bean
    protected Step step2() {
        return steps.get("step2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        logger.info("Step Two");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }
}
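It could hardly be simpler: just two steps, one logging "Step One" and the other logging "Step Two". Running it locally confirmed that it works.
Configuring the Spring XD properties file
Even when the job is developed with JavaConfig, a properties file is still required to declare the base package of the job class (for the HelloWorldJob above, that would be cn.rongcapital.springxd.job), for example: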
base_packages=org.springframework.springxd.samples.batch
The reason can be seen in the source code:
/**
 * Create a simple module based on the provided {@link ModuleDescriptor}, {@link ModuleOptions}, and {@link ModuleDeploymentProperties}.
 *
 * @param moduleDescriptor descriptor for the composed module
 * @param moduleOptions module options for the composed module
 * @param deploymentProperties deployment related properties for the composed module
 * @return new simple module instance
 */
private Module createSimpleModule(ModuleDescriptor moduleDescriptor, ModuleOptions moduleOptions,
        ModuleDeploymentProperties deploymentProperties) {
    if (log.isInfoEnabled()) {
        log.info("creating simple module " + moduleDescriptor);
    }
    SimpleModuleDefinition definition = (SimpleModuleDefinition) moduleDescriptor.getModuleDefinition();
    ClassLoader moduleClassLoader = ModuleUtils.createModuleRuntimeClassLoader(definition, moduleOptions, this.parentClassLoader);
    Class<? extends SimpleModule> moduleClass = determineModuleClass((SimpleModuleDefinition) moduleDescriptor.getModuleDefinition(),
            moduleOptions);
    Assert.notNull(moduleClass,
            String.format("Required module artifacts are either missing or invalid. Unable to determine module type for module definition: '%s:%s'.",
                    moduleDescriptor.getType(), moduleDescriptor.getModuleName()));
    return SimpleModuleCreator
            .createModule(moduleDescriptor, deploymentProperties, moduleClassLoader, moduleOptions, moduleClass);
}
The createSimpleModule method needs to obtain the moduleClass and fails with an error if it cannot. The class is determined as follows:
private Class<? extends SimpleModule> determineModuleClass(SimpleModuleDefinition moduleDefinition,
        ModuleOptions moduleOptions) {
    String name = (String) moduleOptions.asPropertySource().getProperty(MODULE_EXECUTION_FRAMEWORK_KEY);
    if ("spark".equals(name)) {
        return NonBindingResourceConfiguredModule.class;
    }
    else if (ModuleUtils.resourceBasedConfigurationFile(moduleDefinition) != null) {
        return ResourceConfiguredModule.class;
    }
    else if (JavaConfiguredModule.basePackages(moduleDefinition).length > 0) {
        return JavaConfiguredModule.class;
    }
    return null;
}
The last else if branch shows that JavaConfiguredModule is only chosen when the basePackages lookup returns at least one entry.
public static String[] basePackages(SimpleModuleDefinition moduleDefinition) {
    Properties properties = ModuleUtils.loadModuleProperties(moduleDefinition);
    //Assert.notNull(propertiesFile, "required module properties not found.");
    if (properties == null) {
        return new String[0];
    }
    String basePackageNames = properties.getProperty(BASE_PACKAGES);
    return StringUtils.commaDelimitedListToStringArray(basePackageNames);
}
And this property is read from the module's properties file:
/**
 * Return a resource that can be used to load the module '.properties' file (containing <i>e.g.</i> information
 * about module options, or null if no such file exists.
 */
public static Resource modulePropertiesFile(SimpleModuleDefinition definition) {
    return ModuleUtils.locateModuleResource(definition, ".properties");
}
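To make the lookup chain above easier to follow, here is a minimal, JDK-only sketch of the same decision order. It is not Spring XD code: the class ModuleTypeSketch, the ModuleType enum, and the boolean and String parameters are hypothetical stand-ins for the real descriptor and definition objects, and the key name base_packages is assumed from the properties example earlier. The comma splitting only roughly mirrors StringUtils.commaDelimitedListToStringArray.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Simplified model of the module type lookup; all names here are hypothetical. */
class ModuleTypeSketch {

    /** Labels standing in for the three SimpleModule subclasses in the quoted source. */
    enum ModuleType { NON_BINDING_RESOURCE_CONFIGURED, RESOURCE_CONFIGURED, JAVA_CONFIGURED }

    /** Like JavaConfiguredModule.basePackages: a missing file or key yields an empty array. */
    static String[] basePackages(String propertiesText) {
        if (propertiesText == null) {
            return new String[0]; // the module ships no .properties file
        }
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Assumption: the BASE_PACKAGES constant resolves to the key "base_packages".
        String value = properties.getProperty("base_packages");
        if (value == null || value.isEmpty()) {
            return new String[0];
        }
        return value.split(",");
    }

    /** Same branch order as determineModuleClass; null means the type cannot be determined. */
    static ModuleType determineModuleType(String executionFramework, boolean hasResourceConfig,
            String propertiesText) {
        if ("spark".equals(executionFramework)) {
            return ModuleType.NON_BINDING_RESOURCE_CONFIGURED;
        }
        else if (hasResourceConfig) {
            return ModuleType.RESOURCE_CONFIGURED;
        }
        else if (basePackages(propertiesText).length > 0) {
            return ModuleType.JAVA_CONFIGURED;
        }
        return null;
    }

    public static void main(String[] args) {
        // JavaConfig module with a properties file declaring its base package.
        System.out.println(determineModuleType(null, false, "base_packages=cn.rongcapital.springxd.job"));
        // JavaConfig module without any properties file: no type can be determined.
        System.out.println(determineModuleType(null, false, null));
    }
}
```

The second call returns null, which corresponds to the case where Assert.notNull in createSimpleModule rejects the module. That is why a pure JavaConfig module still needs the properties file.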
Next comes packaging the Spring XD module.
Packaging the Spring XD Module
pom configuration:
<parent>
    <groupId>org.springframework.xd</groupId>
    <artifactId>spring-xd-module-parent</artifactId>
    <!-- 1.1.x or later -->
    <version>1.3.1.RELEASE</version>
</parent>
The Spring XD parent has to be specified because it configures the plugins that the Maven build needs. Package the module with:
mvn package
Then upload it to Spring XD. Start xd-shell:
bin/xd-shell
xd:>module upload --type job --name helloworld --file /data/dps-springxd-job-helloworld-1.0.0.BUILD-SNAPSHOT.jar
Successfully uploaded module 'job:helloworld'
Running the Job
xd:>job create --name helloworldJob --definition "helloworld" --deploy
Successfully created and deployed job 'helloworldJob'
xd:>job launch helloworldJob
Successfully submitted launch request for job 'helloworldJob'
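Note: in my distributed environment, the module was not automatically distributed to the container nodes after module upload, so as a temporary workaround I copied it over manually. The cause of this problem still needs to be investigated.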