整合jdk的多執行緒框架用spring管理
一,spring配置thread檔案 可以將這塊配置單獨寫成applicationContext-thread.xml引入 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd" default-lazy-init="false">
<description>ThreadPool</description>
<bean id="collectExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="${collectExecutor.corePoolSize}" /> <property name="maxPoolSize" value="${collectExecutor.maxPoolSize}" /> <property name="queueCapacity" value="${collectExecutor.queueCapacity}" /> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> </property> </bean> <!-- 處理任務的優先順序為:核心執行緒corePoolSize、任務佇列workQueue、最大執行緒 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。 四種預定義的處理程式策略: 在預設的 ThreadPoolExecutor.AbortPolicy 中,處理程式遭到拒絕將丟擲執行時RejectedExecutionException。 在 ThreadPoolExecutor.CallerRunsPolicy 中,執行緒呼叫執行該任務的execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。 在 ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。 在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程)。 -->
</beans> 注:這裡面的幾個引數,可以寫一個properties檔案引入,或直接在這寫死。我們這裡的初始引數值如下: collectExecutor.corePoolSize=100 collectExecutor.maxPoolSize=100 collectExecutor.queueCapacity=100
二,jdk執行緒池工具類核心程式碼
/** * 執行緒池管理 * */ public class ThreadPool {
private static ExecutorService pool = null; private static final int radix = 20; private final static Logger log = LoggerFactory .getLogger(ThreadPool.class); static { new ThreadPool(radix); }
/** * 構造是根據cpu數量來構造,預設一個cpu構造20個執行緒 */ private ThreadPool(int num) { if (pool == null) pool = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors() * num, new HandlerThreadFactory());
/* * new ThreadPoolExecutor(4, 20, 20, TimeUnit.SECONDS, new * LinkedBlockingQueue<Runnable>()); */ }
/** * 無返回的執行緒執行 * * @param runable * ,要求實現Runnable的介面 */ public static void execute(Runnable command) { pool.execute(command); }
/** * 有返回值的 * * @param task * 不過要求完成實現Callable介面 * @return */ public static <T extends Object> T execute(Callable<T> task) { Future<T> future = pool.submit(task); try { return future.get(); } catch (Exception e) { return null; } }
/** * 有返回值的 * * @param task * 不過要求完成實現Callable介面 * @param times * 多少時間(秒) * @return */ public static <T extends Object> T execute(Callable<T> task, long times) { Future<T> future = pool.submit(task); try { return future.get(times, TimeUnit.SECONDS); } catch (Exception e) { return null; } }
/** * 批量執行緒執行,統一返回結果,tasks物件是Callable實現,雖然可以批量執行執行緒, 但這結果集合list需要控制好; * * @param tasks * @return */ public static <T extends Object> List<T> executeAll( List<? extends Callable<T>> tasks) { List<Future<T>> futures = null; List<T> result = new ArrayList<T>(); try { futures = pool.invokeAll(tasks); for (Future<T> future : futures) { try { result.add(future.get()); } catch (ExecutionException e) { log.info("執行任務列表出錯:" + e.getMessage()); log.info(e.getMessage()); continue; } catch (java.util.concurrent.CancellationException e) { log.info("計算被取消 :" + e.getMessage()); continue; } } } catch (InterruptedException e) { log.info("執行任務列表出錯:" + e.getMessage()); return result; } return result; }
/** * 批量執行緒執行,統一返回結果,tasks物件是Callable實現,雖然可以批量執行執行緒, 但這結果集合list需要控制好; * * @param tasks * 任務集合 * @param times * 多少時間(秒) * @return */ public static <T extends Object> List<T> executeAll( List<? extends Callable<T>> tasks, long times) { List<Future<T>> futures = null; List<T> result = new ArrayList<T>(); try { futures = pool.invokeAll(tasks, times == 0 ? 30 : times, TimeUnit.SECONDS); for (Future<T> future : futures) { try { result.add(future.get()); } catch (ExecutionException e) { log.info("執行任務列表出錯:" + e.getMessage()); continue; } catch (java.util.concurrent.CancellationException e) { log.info("計算被取消 :" + e.getMessage()); continue; } } } catch (InterruptedException e) { log.info("執行任務列表出錯:" + e.getMessage()); return result; } return result; }
/** * 關閉執行緒池 */ public static void close() { if (!pool.isShutdown()) { pool.shutdown(); } }
public static void main(String[] args) {
for (int i = 0; i < 3; i++) { pool.execute(new Runnable() { public void run() { throw new RuntimeException(); } }); }
}
}
執行緒工廠 /** * 執行緒工廠 * */ public class HandlerThreadFactory implements ThreadFactory { static final AtomicInteger poolNumber = new AtomicInteger(1); final ThreadGroup group; final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix;
public HandlerThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread() .getThreadGroup(); namePrefix = "pcenterutilpool-" + poolNumber.getAndIncrement() + "-thread-"; }
@Override public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); t.setUncaughtExceptionHandler(new UncaughtExceptionHandler()); return t; }
}
執行緒異常處理類 /** * 執行緒異常處理類 * */ public class UncaughtExceptionHandler implements java.lang.Thread.UncaughtExceptionHandler { private final static Logger log = LoggerFactory .getLogger(UncaughtExceptionHandler.class); // 線上快要被滅的時候會呼叫 public void uncaughtException(Thread t, Throwable e) { log.error(e.getMessage(), e); }
} 到此執行緒池就整合完成了,下面看使用例子: ThreadPool.execute(new Runnable() { @Override public void run() { //TODO your code } }); 任何地方要使用,基本就只要使用工具類ThreadPool裡的方法就行了。裡面的方法有返回的/無返回的都有,夠用了。