1. 程式人生 > 程式設計 >Spring Batch遠端分割槽的本地Jar包模式的程式碼詳解

Spring Batch遠端分割槽的本地Jar包模式的程式碼詳解

1 前言

Spring Batch遠端分割槽對於大量資料的處理非常擅長,它的實現有多種方式,如本地Jar包模式MQ模式Kubernetes模式。這三種模式的如下:

(1)本地Jar包模式:分割槽處理的worker為一個Java程序,從jar包啟動,通過jvm引數和資料庫傳遞引數;官方提供示例程式碼。

(2)MQ模式worker是一個常駐程序,ManagerWorker通過訊息佇列來傳遞引數;網上有不少相關示例程式碼。

(3)Kubernetes模式workerK8s中的PodManager直接啟動Pod來處理;網上並沒有找到任何示例程式碼。

本文將通過程式碼來講解第一種模式(本地Jar包模式

),其它後續再介紹。

Spring Batch遠端分割槽的本地Jar包模式的程式碼詳解

建議先看下面文章瞭解一下:

Spring Batch入門:Spring Batch入門教程篇

Spring Batch並行處理介紹:詳解SpringBoot和SpringBatch 使用

2 程式碼講解

本文程式碼中,ManagerWorker是放在一起的,在同一個專案裡,也只會打一個jar包而已;我們通過profile來區別是manager還是worker,也就是通過Spring Profile實現一份程式碼,兩份邏輯。實際上也可以拆成兩份程式碼,但放一起更方便測試,而且程式碼量不大,就沒有必要了。

2.1 專案準備

2.1.1 資料庫

首先我們需要準備一個數據庫,因為Manager

Worker都需要同步狀態到DB上,不能直接使用嵌入式的記憶體資料庫了,需要一個外部可共同訪問的資料庫。這裡我使用的是H2 Database,安裝可參考:把H2資料庫從jar包部署到Kubernetes,並解決Ingress不支援TCP的問題。

2.1.2 引入依賴

maven引入依賴如下所示:

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-starter-task</artifactId>
</dependency>
<dependency>
 <groupId>com.h2database</groupId>
 <artifactId>h2</artifactId>
 <scope>runtime</scope>
</dependency>

<dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-deployer-local</artifactId>
 <version>2.4.1</version>
</dependency>

<dependency>
 <groupId>org.springframework.batch</groupId>
 <artifactId>spring-batch-integration</artifactId>
</dependency>

spring-cloud-deployer-local用於部署和啟動worker,非常關鍵;其它就是Spring BatchTask相關的依賴;以及資料庫連線。

2.1.3 主類入口

Springboot的主類入口如下:

@EnableTask
@SpringBootApplication
@EnableBatchProcessing
public class PkslowRemotePartitionJar {
 public static void main(String[] args) {
 SpringApplication.run(PkslowRemotePartitionJar.class,args);
 }
}

Springboot的基礎上,添加了Spring BatchSpring Cloud Task的支援。

2.2 關鍵程式碼編寫

前面的資料庫搭建和其它程式碼沒有太多可講的,接下來就開始關鍵程式碼的編寫。

2.2.1 分割槽管理Partitioner

Partitioner是遠端分割槽中的核心bean,它定義了分成多少個區、怎麼分割槽,要把什麼變數傳遞給worker。它會返回一組<分割槽名,執行上下文>的鍵值對,即返回Map<String,ExecutionContext>。把要傳遞給worker的變數放在ExecutionContext中去,支援多種型別的變數,如Stringintlong等。實際上,我們不建議通過ExecutionContext來傳遞太多資料;可以傳遞一些標識或主鍵,然後worker自己去拿資料即可。

具體程式碼如下:

private static final int GRID_SIZE = 4;
@Bean
public Partitioner partitioner() {
 return new Partitioner() {
 @Override
 public Map<String,ExecutionContext> partition(int gridSize) {

 Map<String,ExecutionContext> partitions = new HashMap<>(gridSize);

 for (int i = 0; i < GRID_SIZE; i++) {
 ExecutionContext executionContext = new ExecutionContext();
 executionContext.put("partitionNumber",i);
 partitions.put("partition" + i,executionContext);
 }

 return partitions;
 }
 };
}

上面分成4個區,程式會啟動4個worker來處理;給worker傳遞的引數是partitionNumber

2.2.2 分割槽處理器PartitionHandler

PartitionHandler也是核心的bean,它決定了怎麼去啟動worker,給它們傳遞什麼jvm引數(跟之前的ExecutionContext傳遞不一樣)。

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,JobExplorer jobExplorer,TaskRepository taskRepository) throws Exception {

 Resource resource = this.resourceLoader.getResource(workerResource);

 DeployerPartitionHandler partitionHandler =
 new DeployerPartitionHandler(taskLauncher,jobExplorer,resource,"workerStep",taskRepository);

 List<String> commandLineArgs = new ArrayList<>(3);
 commandLineArgs.add("--spring.profiles.active=worker");
 commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
 commandLineArgs.add("--spring.batch.initializer.enabled=false");

 partitionHandler
 .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
 partitionHandler
 .setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment));
 partitionHandler.setMaxWorkers(2);
 partitionHandler.setApplicationName("PkslowWorkerJob");

 return partitionHandler;
}

上面程式碼中:

resourceworkerjar包地址,表示將啟動該程式;

workerStepworker將要執行的step

commandLineArgs定義了啟動workerjvm引數,如--spring.profiles.active=worker

environmentmanager的系統環境變數,可以傳遞給worker,當然也可以選擇不傳遞;

MaxWorkers是最多能同時啟動多少個worker,類似於執行緒池大小;設定為2,表示最多同時有2個worker來處理4個分割槽。

2.2.3 Manager和Worker的Batch定義

完成了分割槽相關的程式碼,剩下的就只是如何定義ManagerWorker的業務程式碼了。

Manager作為管理者,不用太多業務邏輯,程式碼如下:

@Bean
@Profile("!worker")
public Job partitionedJob(PartitionHandler partitionHandler) throws Exception {
 Random random = new Random();
 return this.jobBuilderFactory.get("partitionedJob" + random.nextInt())
 .start(step1(partitionHandler))
 .build();
}

@Bean
public Step step1(PartitionHandler partitionHandler) throws Exception {
 return this.stepBuilderFactory.get("step1")
 .partitioner(workerStep().getName(),partitioner())
 .step(workerStep())
 .partitionHandler(partitionHandler)
 .build();
}

Worker主要作用是處理資料,是我們的業務程式碼,這裡就演示一下如何獲取Manager傳遞過來的partitionNumber

@Bean
public Step workerStep() {
 return this.stepBuilderFactory.get("workerStep")
 .tasklet(workerTasklet(null,null))
 .build();
}

@Bean
@StepScope
public Tasklet workerTasklet(final @Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber) {
 return new Tasklet() {
 @Override
 public RepeatStatus execute(StepContribution contribution,ChunkContext chunkContext) throws Exception {
 Thread.sleep(6000); //增加延時,檢視效果,通過jps:在jar情況下會新起java程序
 System.out.println("This tasklet ran partition: " + partitionNumber);
 
 return RepeatStatus.FINISHED;
 }
 };
}

通過表示式@Value("#{stepExecutionContext['partitionNumber']}") 獲取Manager傳遞過來的變數;注意要加註解@StepScope

3 程式執行

因為我們分為ManagerWorker,但都是同一份程式碼,所以我們先打包一個jar出來,不然manager無法啟動。配置資料庫和Workerjar包地址如下:

spring.datasource.url=jdbc:h2:tcp://localhost:9092/test
spring.datasource.username=pkslow
spring.datasource.password=pkslow
spring.datasource.driver-class-name=org.h2.Driver

pkslow.worker.resource=file://pkslow/target/remote-partitioning-jar-1.0-SNAPSHOT.jar

執行程式如下:

Spring Batch遠端分割槽的本地Jar包模式的程式碼詳解

可以看到啟動了4次Java程式,還給出日誌路徑。

通過jps命令檢視,能看到一個Manager程序,還有兩個worker程序:

Spring Batch遠端分割槽的本地Jar包模式的程式碼詳解

4 複雜變數傳遞

前面講了Manager可以通過ExecutionContext傳遞變數,如簡單的Stringlong等。但其實它也是可以傳遞複雜的Java物件的,但對應的類需要可序列化,如:

import java.io.Serializable;

public class Person implements Serializable {
 private Integer age;
 private String name;
 private String webSite;
 //getter and setter
}

Manager傳遞:

executionContext.put("person",new Person(0,"pkslow","www.pkslow.com"));

Worker接收:

@Value("#{stepExecutionContext['person']}") Person person

5 總結

本文介紹了Spring Batch遠端分割槽的本地Jar包模式,只能在一臺機器上執行,所以也是無法真正發揮出遠端分割槽的作用。但它對我們後續理解更復雜的模式是有很大幫助的;同時,我們也可以使用本地模式進行開發測試,畢竟它只需要一個數據庫就行了,依賴很少。