一份基於quartz的任務多執行緒處理模板
阿新 • • 發佈:2019-01-02
本任務處理模板使用maven管理具體jar包依賴,使用quartz2.2.2搭建的一個定時任務處理模板,模板提供了一個CommonJob類用於quartz呼叫,此類的作用是處理任務模板類,規定了處理任務的步驟為:①獲取待處理任務列表;②遍歷待處理任務列表,逐一進行處理。然後只需要注入一個具體的任務類,此任務類可注入相應的業務處理service,service需實現CommonJobService中各個方法,service中相應方法可宣告事物,以便模板類呼叫完成具體任務處理流程。
具體XML配置如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"> <context:component-scan base-package="com.defonds.scheduler" /> <!-- 具體的任務action需要實現commonJobService內各方法 --> <bean id="fileReadTask" class="com.action.FileReadTaskAction"> <property name="fileReadService" ref="fileReadService"></property> </bean> <bean id="fileReadService" class="com.service.impl.FileReadServiceImpl"> </bean> <bean id="commonJob" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"> <property name="targetObject"> <!-- 為commonJob模板注入具體的任務action --> <bean class="com.job.CommonJob"> <property name="autoTaskAction" ref="fileReadTask"></property> <property name="threadPoolSize" value="5"></property> </bean> </property> <property name="targetMethod" value="execute"></property> </bean> <!-- Run the job every 5 seconds --> <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean"> <property name="jobDetail" ref="commonJob" /> <property name="cronExpression" value="0/5 * * * * ?" /> </bean> <!-- Scheduler factory bean to glue together jobDetails and triggers to Configure Quartz Scheduler --> <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> <property name="jobDetails"> <list> <ref bean="commonJob" /> </list> </property> <property name="triggers"> <list> <ref bean="cronTrigger" /> </list> </property> </bean> </beans>
commonJob類程式碼如下:
package com.job; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import com.common.AutoTask; import com.common.Transaction; public class CommonJob{ private static Logger logger = LoggerFactory.getLogger(CommonJob.class); private String threadPoolSize; private AutoTask autoTaskAction;// 注入的任務 private Map dupMap = new ConcurrentHashMap(); // 定時任務執行入口方法 public void execute() throws Exception{ List<Object> list = autoTaskAction.fetchData();// 獲取待處理狀態任務資料 ExecutorService threadPool = Executors.newFixedThreadPool(Integer.parseInt(threadPoolSize)); // 遍歷獲取的待處理任務列表 for(Object data:list){ int uniqueId = autoTaskAction.getUniqueId(data); // 對任務做防重處理 if(checkDup(uniqueId)){ logger.debug("This transaction is under processing now : "+uniqueId); continue;// 若任務已在 處理,則暫不處理該任務,開始處理下一個任務 } //檢查任務是否為待處理狀態(任務為狀態驅動,再次確認任務是否為待處理狀態,若是則繼續處理,否則說明已正在被處理或已處理完) if(!checkStatus(uniqueId)){ logger.debug("This transaction is not in wait-process status : "+uniqueId); continue; } // 將任務包裝為Runnable RunnableTask task = new RunnableTask(data); // 提交給執行緒池執行 threadPool.execute(task); } threadPool.shutdown(); } protected boolean checkDup(int uniqueId){ // job類內部維護一個防重表,若無此任務ID,則放置入Map,若已存在,則說明任務已在處理 if(dupMap.get(uniqueId) == null){ dupMap.put(uniqueId, ""); return false; }else{ return true; } } protected boolean checkStatus(int uniqueId){ //根據ID獲取任務詳情 Transaction t = autoTaskAction.loadTransactionById(uniqueId); //若任務狀態仍未待處理則檢查成功,繼續處理任務,否則false,不處理該任務 if("wait_process".equals(t.getStatus())){ return true; }else{ return false; } } public class RunnableTask implements Runnable{ private Object data; // 構造方法需將任務實體傳入 public RunnableTask(Object data){ this.data = data; } public void run() { int uniqueId = autoTaskAction.getUniqueId(data); try{ // 呼叫注入的任務類處理任務方法處理任務 autoTaskAction.deal(data); }catch(Exception e){ e.printStackTrace(); }finally{ // 處理完任務或丟擲異常時均在防重map中將任務ID刪除 dupMap.remove(uniqueId); } } } public void setThreadPoolSize(String threadPoolSize) { threadPoolSize = threadPoolSize; } public void setAutoTaskAction(AutoTask autoTaskAction) { this.autoTaskAction = autoTaskAction; } }
該類提供了任務處理模板,其中execute方法為quartz定時任務入口方法(xml中設定),負責獲取待處理任務列表,初始化執行緒池,遍歷待處理任務列表,依次處理。其中我們將任務處理方法封裝進內部Runnable類,以便利用執行緒池更快處理任務列表。任務處理時注意防重及任務狀態確認,防止重複處理
以下為具體任務處理action及介面方法類:
package com.action; import java.util.List; import com.common.AutoTask; import com.common.Transaction; import com.service.CommonJobService; public class FileReadTaskAction implements AutoTask{ private CommonJobService fileReadService; public List<Object> fetchData() { List<Object> list = fileReadService.fetchData(); return list; } public int getUniqueId(Object data) { Transaction t = (Transaction)data; return t.getTransId(); } public void deal(Object data) { fileReadService.deal(data); } public Transaction loadTransactionById(int transId) { return (Transaction)fileReadService.loadTransactionById(transId); } public void setFileReadService(CommonJobService fileReadService) { this.fileReadService = fileReadService; } }
package com.common;
import java.util.List;
public interface AutoTask {
public List<Object> fetchData();
public int getUniqueId(Object data);
public void deal(Object data);
public Transaction loadTransactionById(int transId);
}
package com.service;
import java.util.List;
import com.common.Transaction;
public interface CommonJobService {
public List<Object> fetchData();
public void deal(Object data);
public Transaction loadTransactionById(int transId);
}
最後為quartz的IOC容器啟動類,通過run as java application即可實現定時處理任務
package com.auto;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class App
{
public static void main( String[] args )
{
ApplicationContext applicationContext = null;
try {
//獲取配置檔案,啟動IOC容器
applicationContext = new ClassPathXmlApplicationContext("classpath*:quartz.xml");
System.out.println("AutoHandler is started successfully!!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
後修正了CommonJob.java的執行緒池生成方式,實現了InitializingBean介面,通過註解@value注入執行緒池PoolSize引數,讓他在bean剛開始例項化後即產生執行緒池,完成任務列表後無需關閉執行緒池,等待下次任務週期再次複用即可。這樣整個bean的生命週期只用建立一次固定大小執行緒池,省去了多次建立、銷燬執行緒池的開銷。程式碼如下:
package com.job;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import com.common.AutoTask;
import com.common.Transaction;
public class CommonJob implements InitializingBean{
private static Logger logger = LoggerFactory.getLogger(CommonJob.class);
@Value("${commonJob.threadPoolSize}")
private String threadPoolSize;
private AutoTask autoTaskAction;// 注入的任務
private Map dupMap = new ConcurrentHashMap();
private ExecutorService threadPool;
// 定時任務執行入口方法
public void execute() throws Exception{
List<Object> list = autoTaskAction.fetchData();// 獲取待處理狀態任務資料
//ExecutorService threadPool = Executors.newFixedThreadPool(Integer.parseInt(threadPoolSize));
// 遍歷獲取的待處理任務列表
for(Object data:list){
int uniqueId = autoTaskAction.getUniqueId(data);
// 對任務做防重處理
if(checkDup(uniqueId)){
logger.debug("This transaction is under processing now : "+uniqueId);
continue;// 若任務已在 處理,則暫不處理該任務,開始處理下一個任務
}
//檢查任務是否為待處理狀態(任務為狀態驅動,再次確認任務是否為待處理狀態,若是則繼續處理,否則說明已正在被處理或已處理完)
if(!checkStatus(uniqueId)){
logger.debug("This transaction is not in wait-process status : "+uniqueId);
continue;
}
// 將任務包裝為Runnable
RunnableTask task = new RunnableTask(data);
// 提交給執行緒池執行
threadPool.execute(task);
}
}
protected boolean checkDup(int uniqueId){
// job類內部維護一個防重表,若無此任務ID,則放置入Map,若已存在,則說明任務已在處理
if(dupMap.get(uniqueId) == null){
dupMap.put(uniqueId, "");
return false;
}else{
return true;
}
}
protected boolean checkStatus(int uniqueId){
//根據ID獲取任務詳情
Transaction t = autoTaskAction.loadTransactionById(uniqueId);
//若任務狀態仍未待處理則檢查成功,繼續處理任務,否則false,不處理該任務
if("wait_process".equals(t.getStatus())){
return true;
}else{
return false;
}
}
public class RunnableTask implements Runnable{
private Object data;
// 構造方法需將任務實體傳入
public RunnableTask(Object data){
this.data = data;
}
public void run() {
int uniqueId = autoTaskAction.getUniqueId(data);
try{
// 呼叫注入的任務類處理任務方法處理任務
autoTaskAction.deal(data);
}catch(Exception e){
e.printStackTrace();
}finally{
// 處理完任務或丟擲異常時均在防重map中將任務ID刪除
dupMap.remove(uniqueId);
}
}
}
/*public void setThreadPoolSize(String threadPoolSize) {
threadPoolSize = threadPoolSize;
}*/
public void setAutoTaskAction(AutoTask autoTaskAction) {
this.autoTaskAction = autoTaskAction;
}
@Override
public void afterPropertiesSet() throws Exception {
threadPool = Executors.newFixedThreadPool(Integer.parseInt(threadPoolSize));
}
}