Spring Batch遠端分割槽的本地Jar包模式的程式碼詳解
1 前言
Spring Batch
遠端分割槽對於大量資料的處理非常擅長,它的實現有多種方式,如本地Jar包模式
、MQ模式
、Kubernetes模式
。這三種模式的如下:
(1)本地Jar包模式
:分割槽處理的worker
為一個Java程序
,從jar
包啟動,通過jvm
引數和資料庫傳遞引數;官方提供示例程式碼。
(2)MQ模式
:worker
是一個常駐程序,Manager
和Worker
通過訊息佇列來傳遞引數;網上有不少相關示例程式碼。
(3)Kubernetes模式
:worker
為K8s
中的Pod
,Manager
直接啟動Pod
來處理;網上並沒有找到任何示例程式碼。
本文將通過程式碼來講解第一種模式(本地Jar包模式
建議先看下面文章瞭解一下:
Spring Batch入門:Spring Batch入門教程篇
Spring Batch並行處理介紹:詳解SpringBoot和SpringBatch 使用
2 程式碼講解
本文程式碼中,Manager
和Worker
是放在一起的,在同一個專案裡,也只會打一個jar
包而已;我們通過profile
來區別是manager
還是worker
,也就是通過Spring Profile
實現一份程式碼,兩份邏輯。實際上也可以拆成兩份程式碼,但放一起更方便測試,而且程式碼量不大,就沒有必要了。
2.1 專案準備
2.1.1 資料庫
首先我們需要準備一個數據庫,因為Manager
Worker
都需要同步狀態到DB
上,不能直接使用嵌入式的記憶體資料庫了,需要一個外部可共同訪問的資料庫。這裡我使用的是H2 Database
,安裝可參考:把H2資料庫從jar包部署到Kubernetes,並解決Ingress不支援TCP的問題。
2.1.2 引入依賴
maven
引入依賴如下所示:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-task</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-deployer-local</artifactId> <version>2.4.1</version> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-integration</artifactId> </dependency>
spring-cloud-deployer-local
用於部署和啟動worker
,非常關鍵;其它就是Spring Batch
和Task
相關的依賴;以及資料庫連線。
2.1.3 主類入口
Springboot
的主類入口如下:
@EnableTask @SpringBootApplication @EnableBatchProcessing public class PkslowRemotePartitionJar { public static void main(String[] args) { SpringApplication.run(PkslowRemotePartitionJar.class,args); } }
在Springboot
的基礎上,添加了Spring Batch
和Spring Cloud Task
的支援。
2.2 關鍵程式碼編寫
前面的資料庫搭建和其它程式碼沒有太多可講的,接下來就開始關鍵程式碼的編寫。
2.2.1 分割槽管理Partitioner
Partitioner
是遠端分割槽中的核心bean
,它定義了分成多少個區、怎麼分割槽,要把什麼變數傳遞給worker
。它會返回一組<分割槽名,執行上下文>的鍵值對,即返回Map<String,ExecutionContext>
。把要傳遞給worker
的變數放在ExecutionContext
中去,支援多種型別的變數,如String
、int
、long
等。實際上,我們不建議通過ExecutionContext
來傳遞太多資料;可以傳遞一些標識或主鍵,然後worker
自己去拿資料即可。
具體程式碼如下:
private static final int GRID_SIZE = 4; @Bean public Partitioner partitioner() { return new Partitioner() { @Override public Map<String,ExecutionContext> partition(int gridSize) { Map<String,ExecutionContext> partitions = new HashMap<>(gridSize); for (int i = 0; i < GRID_SIZE; i++) { ExecutionContext executionContext = new ExecutionContext(); executionContext.put("partitionNumber",i); partitions.put("partition" + i,executionContext); } return partitions; } }; }
上面分成4個區,程式會啟動4個worker
來處理;給worker
傳遞的引數是partitionNumber
。
2.2.2 分割槽處理器PartitionHandler
PartitionHandler
也是核心的bean
,它決定了怎麼去啟動worker
,給它們傳遞什麼jvm
引數(跟之前的ExecutionContext
傳遞不一樣)。
@Bean public PartitionHandler partitionHandler(TaskLauncher taskLauncher,JobExplorer jobExplorer,TaskRepository taskRepository) throws Exception { Resource resource = this.resourceLoader.getResource(workerResource); DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher,jobExplorer,resource,"workerStep",taskRepository); List<String> commandLineArgs = new ArrayList<>(3); commandLineArgs.add("--spring.profiles.active=worker"); commandLineArgs.add("--spring.cloud.task.initialize-enabled=false"); commandLineArgs.add("--spring.batch.initializer.enabled=false"); partitionHandler .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs)); partitionHandler .setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment)); partitionHandler.setMaxWorkers(2); partitionHandler.setApplicationName("PkslowWorkerJob"); return partitionHandler; }
上面程式碼中:
resource
是worker
的jar
包地址,表示將啟動該程式;
workerStep
是worker
將要執行的step
;
commandLineArgs
定義了啟動worker
的jvm
引數,如--spring.profiles.active=worker
;
environment
是manager
的系統環境變數,可以傳遞給worker
,當然也可以選擇不傳遞;
MaxWorkers
是最多能同時啟動多少個worker
,類似於執行緒池大小;設定為2,表示最多同時有2個worker
來處理4個分割槽。
2.2.3 Manager和Worker的Batch定義
完成了分割槽相關的程式碼,剩下的就只是如何定義Manager
和Worker
的業務程式碼了。
Manager
作為管理者,不用太多業務邏輯,程式碼如下:
@Bean @Profile("!worker") public Job partitionedJob(PartitionHandler partitionHandler) throws Exception { Random random = new Random(); return this.jobBuilderFactory.get("partitionedJob" + random.nextInt()) .start(step1(partitionHandler)) .build(); } @Bean public Step step1(PartitionHandler partitionHandler) throws Exception { return this.stepBuilderFactory.get("step1") .partitioner(workerStep().getName(),partitioner()) .step(workerStep()) .partitionHandler(partitionHandler) .build(); }
Worker
主要作用是處理資料,是我們的業務程式碼,這裡就演示一下如何獲取Manager
傳遞過來的partitionNumber
:
@Bean public Step workerStep() { return this.stepBuilderFactory.get("workerStep") .tasklet(workerTasklet(null,null)) .build(); } @Bean @StepScope public Tasklet workerTasklet(final @Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber) { return new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution,ChunkContext chunkContext) throws Exception { Thread.sleep(6000); //增加延時,檢視效果,通過jps:在jar情況下會新起java程序 System.out.println("This tasklet ran partition: " + partitionNumber); return RepeatStatus.FINISHED; } }; }
通過表示式@Value("#{stepExecutionContext['partitionNumber']}")
獲取Manager
傳遞過來的變數;注意要加註解@StepScope
。
3 程式執行
因為我們分為Manager
和Worker
,但都是同一份程式碼,所以我們先打包一個jar
出來,不然manager
無法啟動。配置資料庫和Worker
的jar
包地址如下:
spring.datasource.url=jdbc:h2:tcp://localhost:9092/test spring.datasource.username=pkslow spring.datasource.password=pkslow spring.datasource.driver-class-name=org.h2.Driver pkslow.worker.resource=file://pkslow/target/remote-partitioning-jar-1.0-SNAPSHOT.jar
執行程式如下:
可以看到啟動了4次Java
程式,還給出日誌路徑。
通過jps
命令檢視,能看到一個Manager
程序,還有兩個worker
程序:
4 複雜變數傳遞
前面講了Manager
可以通過ExecutionContext
傳遞變數,如簡單的String
、long
等。但其實它也是可以傳遞複雜的Java
物件的,但對應的類需要可序列化,如:
import java.io.Serializable; public class Person implements Serializable { private Integer age; private String name; private String webSite; //getter and setter }
Manager
傳遞:
executionContext.put("person",new Person(0,"pkslow","www.pkslow.com"));
Worker
接收:
@Value("#{stepExecutionContext['person']}") Person person
5 總結
本文介紹了Spring Batch
遠端分割槽的本地Jar包模式
,只能在一臺機器上執行,所以也是無法真正發揮出遠端分割槽的作用。但它對我們後續理解更復雜的模式是有很大幫助的;同時,我們也可以使用本地模式進行開發測試,畢竟它只需要一個數據庫就行了,依賴很少。