專案中可配置執行緒池的實現
阿新 • • 發佈:2019-02-06
執行緒池在專案中的使用
一個專案中可能多次使用到執行緒池,比如發郵件的時候需要使用執行緒池,執行訊息入庫的時候可能需要執行緒池,我們可以通過資料庫配置來實現執行緒池使用
1.資料庫表中中配置執行緒池的核心引數
主要包括以下引數:
執行緒池名:excutor_name
核心執行緒數:core_pool_size
最大執行緒數:max_pool_size
任務佇列大小:max_queue_size
佇列的型別:queue_type (LIMITED :有界任務佇列,UNLIMITED:無界任務佇列)
2.提供執行緒倉庫類和執行緒工廠類
執行緒倉庫類的作用:初始化一個Map,key為執行緒池名,name為執行緒池,提供put執行緒池,remove執行緒池,get執行緒池的方法
執行緒工廠類的作用:根據資料庫初始化所有執行緒池
倉庫類具體實現:
package cn.pool.executor; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; public class ExecutorRepository { private Map<String, ExecutorService> executors; private ExecutorRepository() { executors = new HashMap<String, ExecutorService>(); } //private static final ExecutorRepository repository = new ExecutorRepository(); /** * 延遲初始化單例模式 */ private static class SingletonHandler{ final static ExecutorRepository repository = new ExecutorRepository(); } public static ExecutorRepository getInstance(){ return SingletonHandler.repository; } /** * put執行緒池的方法(同步方法) */ public synchronized void put(String executorName,ExecutorService exec){ executors.put(executorName, exec); } /** * remove執行緒池的方法(同步方法) */ public synchronized void remove(String executorName){ executors.remove(executorName); } /** * 查詢執行緒池的方法(普通方法) */ public ExecutorService get(String executorName){ return executors.get(executorName); } }
執行緒工廠的實現:
@Component public class ExecutorFactory { @Autowired private BaseDao baseDao; /** * 類載入的時候初始化所有的執行緒池 */ @PostConstruct public void initExecutors(){ //從資料庫中查詢所有的執行緒池 List<ExecutorDTO> executorsList = baseDao.queryList("query.all.executors",new ExecutorDTO()); if(executorsList == null ){ return; } //建立所有執行緒池並放入Map中待用 ExecutorRepository repository = ExecutorRepository.getInstance(); for (ExecutorDTO executor : executorsList) { repository.put(executor.getExecutorName(), createThreadPoolExecutor(executor)); } } /** * 根據queueType建立執行緒池 */ private ExecutorService createThreadPoolExecutor(ExecutorDTO executor) { ExecutorService exec = null; if("LIMITED".equals(executor.getQueueType())){ //有界的任務佇列 exec= new ThreadPoolExecutor(executor.getCorePoolSize(), executor.getMaxPoolSize(), 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(executor.getMaxQueueSize())); }else{ exec = new ThreadPoolExecutor(executor.getCorePoolSize(), executor.getMaxPoolSize(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } return exec; } /** * 獲取對應的執行緒池 */ public static ExecutorService getExecutorByName(String executorName){ return ExecutorRepository.getInstance().get(executorName); } }
3.提供2種類型的任務
一種為實現Runnable介面的任務,另外一種是實現Callable介面的任務
實現Runnable介面
/**
* 實現Runnable介面的任務介面
* 任務只要實現這個介面即可,即是實現了Runnable介面的任務
*/
public interface IThreadService {
void run(Object o);
}
package cn.pool.executor;
import java.io.Serializable;
/**
* 實現Runnnable介面的任務包裝DTO
*/
public class RunnableTask implements Runnable,Serializable{
private static final long serialVersionUID = -993258256608603676L;
private String executorName;
private IThreadService threadService;
private Object param;
@Override
public void run() {
try {
threadService.run(param);
} catch (Exception e) {
// TODO: handle exception
}
}
public String getExecutorName() {
return executorName;
}
public void setExecutorName(String executorName) {
this.executorName = executorName;
}
public IThreadService getThreadService() {
return threadService;
}
public void setThreadService(IThreadService threadService) {
this.threadService = threadService;
}
public Object getParam() {
return param;
}
public void setParam(Object param) {
this.param = param;
}
@Override
public String toString() {
return "RunnableTask [executorName=" + executorName +"]";
}
}
實現Callable介面的任務
/**
* 實現Callable介面的任務介面
* 任務只要實現這個介面即可,即是實現了Callable介面的任務
*/
public interface ICallableService {
Object call(Object o);
}
package cn.pool.executor;
import java.util.concurrent.Callable;
public class CallableTask implements Callable<Object> {
private String executorName;
private ICallableService callableService;
private Object param;
@Override
public Object call() throws Exception {
try {
return callableService.call(param);
} catch (Exception e) {
// TODO: 記錄日誌等
}
return null;
}
public String getExecutorName() {
return executorName;
}
public void setExecutorName(String executorName) {
this.executorName = executorName;
}
public ICallableService getCallableService() {
return callableService;
}
public void setCallableService(ICallableService callableService) {
this.callableService = callableService;
}
public Object getParam() {
return param;
}
public void setParam(Object param) {
this.param = param;
}
}
4.提供公共的執行緒池工具類供使用
package cn.pool.executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 執行緒池工具類
*/
public class ExecutorUtils {
/**
* 執行runnable介面的任務
*/
public static void addThreadToPool(String executorName,IThreadService threadService,Object param){
RunnableTask task =new RunnableTask();
task.setExecutorName(executorName);
task.setThreadService(threadService);
task.setParam(param);
ExecutorService exec = ExecutorFactory.getExecutorByName(executorName);//獲取對應的執行緒池執行任務
if(exec == null){
throw new RuntimeException("指定的執行緒池不存在");
}
exec.execute(task);
}
/**
* 執行callable介面的任務
*/
public static Future addCallableTaskToPool(String executorName,ICallableService callableService ,Object param){
CallableTask task = new CallableTask();
task.setExecutorName(executorName);
task.setCallableService(callableService);
task.setParam(param);
ExecutorService exec = ExecutorFactory.getExecutorByName(executorName);//獲取對應的執行緒池執行任務
if(exec == null){
throw new RuntimeException("指定的執行緒池不存在");
}
return exec.submit(task);
}
/**
* 2中情況下進行飽和策略(1.執行緒池關閉的過程中提交任務,2.有界任務佇列已經滿了,執行緒數已經達到最大執行緒數),已經有了4種飽和策略實現
* {@link ThreadPoolExecutor.AbortPolicy}
* {@link ThreadPoolExecutor.CallerRunsPolicy}
* {@link ThreadPoolExecutor.DiscardOldestPolicy}
* {@link ThreadPoolExecutor.DiscardPolicy}
* 預設的是AbortPolicy,直接丟擲{@link RejectedExecutionException},以防止提交任務執行緒終止
* 捕獲異常處理,或者實現自己的飽和策略
*
*/
public static void setRejectedExecutionHandler(String executorName,RejectedExecutionHandler handler){
ExecutorService exec = ExecutorFactory.getExecutorByName(executorName);//獲取對應的執行緒池執行任務
if(exec == null){
throw new RuntimeException("指定的執行緒池不存在");
}
if(exec instanceof ThreadPoolExecutor){
((ThreadPoolExecutor) exec).setRejectedExecutionHandler(handler);
}
}
/**
* 獲取任務佇列的大小
*/
public static int getTaskQueueSize(String executorName){
ExecutorService exec = ExecutorFactory.getExecutorByName(executorName);//獲取對應的執行緒池執行任務
if(exec == null){
throw new RuntimeException("指定的執行緒池不存在");
}
return ((ThreadPoolExecutor) exec).getQueue().size();
}
/**
* 關閉執行緒池的方法,此方法等待當前正在執行的任務和佇列中等待的任務全部執行完畢後關閉
*/
public static void shutDownExecutor(String executorName){
ExecutorService exec = ExecutorFactory.getExecutorByName(executorName);//獲取對應的執行緒池執行任務
if(exec == null){
throw new RuntimeException("指定的執行緒池不存在");
}
exec.shutdown();
}
public static int getAliveThread(String executorName){
ExecutorService exec = ExecutorFactory.getExecutorByName(executorName);//獲取對應的執行緒池執行任務
if(exec == null){
throw new RuntimeException("指定的執行緒池不存在");
}
return ((ThreadPoolExecutor) exec).getActiveCount();
}
}
5.執行緒池的使用:
a.資料庫新增需要的執行緒池(執行緒池名字和引數)
b.需要使用執行緒池的service實現IThreadService或者ICallableSercvice,重寫run或者call方法
c.通過utils工具類呼叫即可
public class ExecutorTest {
@Resource(name="cn.paic.mail.MailService"); //MailService實現了IThreadService
private IThreadService mailService;
public void sendMail(){
String param = "1";
ExecutorUtils.addThreadToPool("sendMailExecutorPool", mailService, param);
//執行其他邏輯
}
}