1. 程式人生 > 實用技巧 >自定義執行緒池ThreadPoolExecutor

自定義執行緒池ThreadPoolExecutor

使用自定義的方式建立執行緒池

Java本身提供的獲取執行緒池的方式

使用Executors直接獲取執行緒池,注意,前四個方式的底層都是通過new ThreadPoolExecutor()的方式建立的執行緒池,只是引數不一樣而已,我們也正是利用了這點特性來實現自己的執行緒池

1. newCachedThreadPool

建立一個可快取無限制數量的執行緒池,
如果執行緒池中沒有空閒的執行緒的話,再來任務會新建執行緒,
執行緒60s內沒被使用,則銷燬。
簡單的說,忙不過來的時候就新建執行緒

Executors.newCachedThreadPool()

底層實現
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

常駐核心執行緒數為0
執行緒池最大執行緒數Integer.MAX_VALUE
執行緒空閒60S回收
使用工作佇列來存放任務,這個佇列有好幾種,各有各的特性。
**風險:**由於執行緒池最大執行緒池為Integer.MAX_VALUE,所以有OOM的風險

2. newFixedThreadPool

建立一直指定大小的執行緒池,如果執行緒池滿了,後面的任務會在佇列中等待,等拿到空閒的執行緒才能執行

Executors.newFixedThreadPool(nThreads)

底層實現
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

常駐核心執行緒數自定義
執行緒池最大執行緒數等於常駐核心執行緒數
因為最大執行緒數等於常駐核心執行緒,而常駐核心執行緒不會被回收,所以時間引數為0
使用工作佇列來存放任務,這個佇列有好幾種,各有各的特性。
**風險:**由於佇列沒有指明長度,預設為Integer.MAX_VALUE,所以有OOM的風險

3. newSingleThreadExecutor

建立一個大小為1的執行緒池,用唯一的執行緒來執行任務,保證任務有序進行

Executors.newSingleThreadExecutor()

底層實現
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

常駐核心執行緒數為1
執行緒池最大執行緒數等於常駐核心執行緒數1
執行緒池裡一共就一個常駐核心執行緒,所以不會被回收,所以時間引數為0
使用工作佇列來存放任務,這個佇列有好幾種,各有各的特性。
**風險:**由於佇列沒有指明長度,預設為Integer.MAX_VALUE,所以有OOM的風險

4. newScheduledThreadPool

建立指定大小的執行緒池,支援定時及週期性的執行任務

Executors.newScheduledThreadPool(corePoolSize)

底層實現
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,一下為ScheduledThreadPoolExecutor的構造方法

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

常駐核心執行緒數自定義
執行緒池最大執行緒數等於Integer.MAX_VALUE
不回收執行緒
使用工作佇列來存放任務,這個佇列有好幾種,各有各的特性。
**風險:**由於執行緒池最大執行緒池為Integer.MAX_VALUE,所以有OOM的風險

5. newWorkStealingPool

JDK1.8 引入
建立持有足夠執行緒的執行緒池來支援給定的並行級別,
並通過使用多個佇列減少競爭,並行級別的引數,如果不傳,預設為cpu的數量,
返回的不再是 ThreadPoolExecutor 而是 ForkJoinPool

Executors.newWorkStealingPool()

底層實現為**ForkJoinPool**,和上面的四個不同
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

正在研究ing

在阿里巴巴Java開發手冊中明確指出,不允許使用jdk自帶的方式獲取執行緒池。就是上面的前四個方法,所以,我們自己建立即可

建立自定義的執行緒工廠

public class ThreadFactoryImpl implements ThreadFactory {
    /**
     * 執行緒池號
     */
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    /**
     * 執行緒字首名稱
     */
    private final String namePrefix;
    /**
     * 建立初始值為1且執行緒安全的執行緒號
     */
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    public ThreadFactoryImpl(String whatFeatureOfGroup) {
        namePrefix = "ThreadFactoryImpl's " + whatFeatureOfGroup + "-work-";
    }

    @Override
    public Thread newThread(Runnable r) {
        int threadNextId = threadNumber.getAndIncrement();
        String name = namePrefix + threadNextId;
        Thread thread = new Thread(null, r, name, 0);
        System.out.println("建立的第"+threadNextId+"個執行緒");
        return thread;
    }
}

AtomicInteger 實現了原子性,保證了高併發下的執行緒安全,該系類還有很多。
我們可以在自定義的執行緒工廠裡面新增我們需要的內容
不指定的話,會是預設的執行緒工廠
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

建立自定義執行緒池拒絕策略

public class ThreadPoolRejectHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("task rejected. "+executor.toString());
    }
}

在ThreadPoolExecutor中提供了四個公開的內部靜態類
1. AbortPolicy(預設):丟棄任務並丟擲RejectedExecutionException異常
2. DiscardPloicy:丟棄任務,不丟擲異常(不推薦使用)
3. DiscardOldestPolicy:丟棄佇列中等待最久的任務,把當前任務加入到佇列中
4. CallerRunsPolicy:繞過執行緒池,直接呼叫任務的run()方法。
**根據需求選中合適的策略才是正確的**

實現我們的執行緒池

public class ThreadPoolUtil {

    /**
     * @param corePoolSize    常駐核心執行緒,執行緒池初始化的時候池裡是沒有執行緒的,前面 corePoolSize 個任務是會建立執行緒,
     *                        當前執行緒池中的數量大於常駐核心執行緒數的時候,如果有空閒的執行緒則使用,沒有的話就把任務放到
     *                        工作佇列中
     * @param maximumPoolSize 執行緒池允許建立的最大執行緒數,如果佇列滿了,且執行緒數小於最大執行緒數,則新建臨時執行緒(空閒超過時間會被銷燬的),
     *                        如果佇列為無界佇列,則該引數無用
     * @param workQueueSize   工作佇列,請求執行緒數大於常駐核心執行緒數的時候,將多餘的任務放到工作佇列
     * @param threadName      執行緒名稱
     * @param handler         執行緒池拒絕策略,當執行緒池和佇列都滿了,則呼叫該策略,執行具體的邏輯
     * @author: taoym
     * @date: 2020/9/9 11:35
     * @desc: 自定義執行緒池的實現 總體邏輯就是 前corePoolSize個任務時,來一個任務就建立一個執行緒
     * 如果當前執行緒池的執行緒數大於了corePoolSize那麼接下來再來的任務就會放入到我們上面設定的workQueue佇列中
     * 如果此時workQueue也滿了,那麼再來任務時,就會新建臨時執行緒,那麼此時如果我們設定了keepAliveTime或者設定了allowCoreThreadTimeOut,那麼系統就會進行執行緒的活性檢查,一旦超時便銷燬執行緒
     * 如果此時執行緒池中的當前執行緒大於了maximumPoolSize最大執行緒數,那麼就會執行我們剛才設定的handler拒絕策略
     */
    public static ExecutorService createThreadPool(int corePoolSize,
                                                   int maximumPoolSize,
                                                   int workQueueSize,
                                                   String threadName,
                                                   RejectedExecutionHandler handler) {
        BlockingQueue workQueue = new LinkedBlockingDeque(workQueueSize);
        ThreadFactoryImpl threadFactory = new ThreadFactoryImpl(threadName);

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, workQueue, handler);
        // 提前建立好核心執行緒
        //threadPoolExecutor.prestartAllCoreThreads();
        // 常駐核心執行緒的空閒時間超過 keepAliveTime 的時候要被回收
        //threadPoolExecutor.allowCoreThreadTimeOut(true);

        return threadPoolExecutor;
    }
}

註釋寫的很明白了,desc下的註釋來自springForAll的文章。別的都是自己找的加上自己所理解的編寫而成。

文章是剛研究了《碼出高效》的執行緒池篇加上對原始碼文件的理解,趁熱打鐵寫出來的,寫的不好,多多見諒。