1. 程式人生 > >整合jdk的多執行緒框架用spring管理

整合jdk的多執行緒框架用spring管理

一,spring配置thread檔案     可以將這塊配置單獨寫成applicationContext-thread.xml引入 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"     xmlns="http://www.springframework.org/schema/beans"     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"     default-lazy-init="false">

    <description>ThreadPool</description>

    <bean id="collectExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">         <property name="corePoolSize" value="${collectExecutor.corePoolSize}" />         <property name="maxPoolSize" value="${collectExecutor.maxPoolSize}" />         <property name="queueCapacity" value="${collectExecutor.queueCapacity}" />         <property name="rejectedExecutionHandler">             <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />           </property>     </bean>     <!--      處理任務的優先順序為:核心執行緒corePoolSize、任務佇列workQueue、最大執行緒 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。     四種預定義的處理程式策略:     在預設的 ThreadPoolExecutor.AbortPolicy 中,處理程式遭到拒絕將丟擲執行時RejectedExecutionException。     在 ThreadPoolExecutor.CallerRunsPolicy 中,執行緒呼叫執行該任務的execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。     在 ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。     在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程)。     -->

</beans> 注:這裡面的幾個引數,可以寫一個properties檔案引入,或直接在這寫死。我們這裡的初始引數值如下: collectExecutor.corePoolSize=100 collectExecutor.maxPoolSize=100 collectExecutor.queueCapacity=100

二,jdk執行緒池工具類核心程式碼

/**  * 執行緒池管理  *   */ public class ThreadPool {

    private static ExecutorService pool = null;     private static final int radix = 20;     private final static Logger log = LoggerFactory             .getLogger(ThreadPool.class);     static {         new ThreadPool(radix);     }

    /**      * 構造是根據cpu數量來構造,預設一個cpu構造20個執行緒      */     private ThreadPool(int num) {         if (pool == null)             pool = Executors.newFixedThreadPool(Runtime.getRuntime()                     .availableProcessors() * num, new HandlerThreadFactory());

        /*          * new ThreadPoolExecutor(4, 20, 20, TimeUnit.SECONDS, new          * LinkedBlockingQueue<Runnable>());          */     }

    /**      * 無返回的執行緒執行      *       * @param runable      *            ,要求實現Runnable的介面      */     public static void execute(Runnable command) {         pool.execute(command);     }

    /**      * 有返回值的      *       * @param task      *            不過要求完成實現Callable介面      * @return      */     public static <T extends Object> T execute(Callable<T> task) {         Future<T> future = pool.submit(task);         try {             return future.get();         } catch (Exception e) {             return null;         }     }

    /**      * 有返回值的      *       * @param task      *            不過要求完成實現Callable介面      * @param times      *            多少時間(秒)      * @return      */     public static <T extends Object> T execute(Callable<T> task, long times) {         Future<T> future = pool.submit(task);         try {             return future.get(times, TimeUnit.SECONDS);         } catch (Exception e) {             return null;         }     }

    /**      * 批量執行緒執行,統一返回結果,tasks物件是Callable實現,雖然可以批量執行執行緒, 但這結果集合list需要控制好;      *       * @param tasks      * @return      */     public static <T extends Object> List<T> executeAll(             List<? extends Callable<T>> tasks) {         List<Future<T>> futures = null;         List<T> result = new ArrayList<T>();         try {             futures = pool.invokeAll(tasks);             for (Future<T> future : futures) {                 try {                     result.add(future.get());                 } catch (ExecutionException e) {                     log.info("執行任務列表出錯:" + e.getMessage());                     log.info(e.getMessage());                     continue;                 } catch (java.util.concurrent.CancellationException e) {                     log.info("計算被取消 :" + e.getMessage());                     continue;                 }             }         } catch (InterruptedException e) {             log.info("執行任務列表出錯:" + e.getMessage());             return result;         }         return result;     }

    /**      * 批量執行緒執行,統一返回結果,tasks物件是Callable實現,雖然可以批量執行執行緒, 但這結果集合list需要控制好;      *       * @param tasks      *            任務集合      * @param times      *            多少時間(秒)      * @return      */     public static <T extends Object> List<T> executeAll(             List<? extends Callable<T>> tasks, long times) {         List<Future<T>> futures = null;         List<T> result = new ArrayList<T>();         try {             futures = pool.invokeAll(tasks, times == 0 ? 30 : times,                     TimeUnit.SECONDS);             for (Future<T> future : futures) {                 try {                     result.add(future.get());                 } catch (ExecutionException e) {                     log.info("執行任務列表出錯:" + e.getMessage());                     continue;                 } catch (java.util.concurrent.CancellationException e) {                     log.info("計算被取消 :" + e.getMessage());                     continue;                 }             }         } catch (InterruptedException e) {             log.info("執行任務列表出錯:" + e.getMessage());             return result;         }         return result;     }

    /**      * 關閉執行緒池      */     public static void close() {         if (!pool.isShutdown()) {             pool.shutdown();         }     }

    public static void main(String[] args) {

        for (int i = 0; i < 3; i++) {             pool.execute(new Runnable() {                 public void run() {                     throw new RuntimeException();                 }             });         }

       }

}

執行緒工廠 /**  * 執行緒工廠  *   */ public class HandlerThreadFactory implements ThreadFactory {     static final AtomicInteger poolNumber = new AtomicInteger(1);     final ThreadGroup group;     final AtomicInteger threadNumber = new AtomicInteger(1);     final String namePrefix;

    public HandlerThreadFactory() {         SecurityManager s = System.getSecurityManager();         group = (s != null) ? s.getThreadGroup() : Thread.currentThread()                 .getThreadGroup();         namePrefix = "pcenterutilpool-" + poolNumber.getAndIncrement()                 + "-thread-";     }

    @Override     public Thread newThread(Runnable r) {

        Thread t = new Thread(group, r, namePrefix                 + threadNumber.getAndIncrement(), 0);         if (t.isDaemon())             t.setDaemon(false);         if (t.getPriority() != Thread.NORM_PRIORITY)             t.setPriority(Thread.NORM_PRIORITY);         t.setUncaughtExceptionHandler(new UncaughtExceptionHandler());         return t;     }

}

執行緒異常處理類 /**  * 執行緒異常處理類  *   */ public class UncaughtExceptionHandler implements         java.lang.Thread.UncaughtExceptionHandler {     private final static Logger log = LoggerFactory             .getLogger(UncaughtExceptionHandler.class);     // 線上快要被滅的時候會呼叫     public void uncaughtException(Thread t, Throwable e) {         log.error(e.getMessage(), e);     }

} 到此執行緒池就整合完成了,下面看使用例子: ThreadPool.execute(new Runnable() {          @Override           public void run() {                   //TODO your code           }  }); 任何地方要使用,基本就只要使用工具類ThreadPool裡的方法就行了。裡面的方法有返回的/無返回的都有,夠用了。