1. 程式人生 > 其它 >功能:Java執行緒池

功能:Java執行緒池

Java執行緒池

一、介紹

執行緒池,顧名思義,這是管理一堆執行緒而出現的物件。與資料庫的連線池一致,它的出現解決了執行緒的頻繁建立和銷燬,從而浪費大量資源的問題。

所以,執行緒池中有提前建立好的執行緒,使用時直接分配獲取,使用完再由執行緒池管理是否銷燬。

優點

  • 降低資源消耗,也就是不需要重複多次的建立執行緒
  • 更好的管理執行緒
    • 比如可以獲取當前執行的執行緒是什麼
    • 還在等待執行的任務有什麼

二、使用執行緒池

在JDK5起提供了執行緒池的物件,ExecutorServiceExecutors

其中,ExecutorService和它的子類ThreadPoolExecutor是執行緒池的關鍵

Executors是對應的工具類,裡面有些工廠方法可以快速建立執行緒池

檢視ThreadPoolExecutor的構造方法

public class ThreadPoolExecutor extends AbstractExecutorService {
    
	public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {}
}
引數 說明
corePoolSize 核心執行緒數。就算目前空閒,也不會回收這幾個執行緒
maximumPoolSize 最大執行緒數。當前執行緒池可以容納的最大執行緒數量
keepAliveTime 執行緒保持存活時間。如果執行緒空間時間到達,將會進行銷燬(除了核心執行緒)
unit keepAliveTime一起使用,僅是個時間單位
workQueue 工作等待佇列。當執行緒池所有的執行緒都繁忙執行時,新新增的執行任務會暫時保留至此佇列
threadFactory 建立執行緒的執行緒工廠
handler 拒絕策略。當佇列滿了後,還有執行任務進入時的策略

workQueue引數需要傳入一個BlockingQueue,這是個阻塞佇列。BlockingQueue內部使用兩條佇列,允許兩個執行緒同時向佇列一個儲存,一個取出操作。在保證併發安全的同時,提高了佇列的存取效率,不能傳入空物件,可設定容量大小,也可以不設定容量大小,那麼它的容量就是Integer.MAX_VALUE。常用的幾種實現類

說明
ArrayBlockingQueue 規定容量大小的阻塞佇列
LinkedBlockingQueue 既可以規定容量大小,也可以不規定的阻塞佇列
SynchronizedQueue 一個特殊的佇列,生產消費必須交替完成的佇列
生產一個元素後,必須要有進行消費後,才能繼續往佇列內生產元素

handler拒絕策略

當執行緒池指定的佇列容量滿了時,將執行哪種拒絕任務的策略

策略類 說明
AbortPolicy 預設,不執行新任務,直接丟擲異常,提示執行緒池已滿
DiscardPolicy 不執行新任務,也不丟擲異常
DiscardOldestPolicy 它丟棄最老的未處理請求,然後重試執行,除非執行程式被關閉,在這種情況下任務被丟棄。
CallerRunsPolicy 直接在外層呼叫者的執行緒中呼叫新任務

1)小試牛刀

package com.banmoon.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo1 {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        executorService.execute(new MyRunnable());
        executorService.execute(new MyRunnable());
        executorService.execute(new MyRunnable());
        executorService.execute(new MyRunnable());
        executorService.execute(new MyRunnable());
        // lambda表示式
        executorService.execute(() -> {
            System.out.println(Thread.currentThread().getName());
        });
        // 關閉執行緒池,如果不關閉,執行緒池將一直存在,池子內保留著核心執行緒,等待著呼叫
        executorService.shutdown();
    }

}

class MyRunnable implements Runnable{

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

2)Executors工具類

關於此的三個相關方法原始碼,其中還有一些他們的過載,這邊就不細細講了。

這些工具類方法,主要是快速建立ThreadPoolExecutor物件的方法,只是它們的引數各有所不同

public class Executors {
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
}
方法 引數說明 效果
newCachedThreadPool 核心執行緒數為0
最大執行緒數已調到Integer.MAX_VALUE
每提交一個執行緒任務,都將新建立一個新的執行緒來執行
如果需要執行的任務很多,這有可能會導致CPU100%的問題
newFixedThreadPool 核心執行緒數和最大執行緒數一致
但佇列長度為Integer.MAX_VALUE
提交的任務將正常交給池子中的執行緒執行,執行完成也不會銷燬,等待執行新的任務
如果執行的任務很多,佇列會一直新增任務等待執行,可能會造成記憶體溢位的問題
newSingleThreadExecutor 核心執行緒數和最大執行緒數都為1
但佇列長度為Integer.MAX_VALUE
newFixedThreadPool類似,但池子中只有一個執行緒

根據需要來進行使用合適的執行緒池,測試下他們的執行方式和快慢

package com.banmoon.pool;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo2 {

    public static void main(String[] args) {
        ExecutorService executorService1 = Executors.newCachedThreadPool();
        ExecutorService executorService2 = Executors.newFixedThreadPool(10);
        ExecutorService executorService3 = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 100; i++) {
            executorService1.execute(new MyDemo2(i));
//            executorService2.execute(new MyDemo2(i));
//            executorService3.execute(new MyDemo2(i));
        }

        executorService1.shutdown();
        executorService2.shutdown();
        executorService3.shutdown();

    }

}

class MyDemo2 implements Runnable {

    private Integer i;

    public MyDemo2(Integer i) {
        this.i = i;
    }

    @Override
    public void run() {
        System.out.println(StrUtil.format("{}:{},時間:{}", Thread.currentThread().getName(), i, DateUtil.now()));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

newCachedThreadPool執行結果可以看到,一共有100個執行緒被創建出來

newFixedThreadPool執行結果,執行的永遠都是那幾個固定的執行緒,這裡我們指定了10個執行緒,所以列印也是10個為一批來進行的。

newSingleThreadExecutor執行結果,從頭到尾就只有一個執行緒在執行

3)執行緒工廠

雖然有預設的執行緒工廠,但如果有需要進行處理的話,還是得記錄一下

package com.banmoon.pool;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo3 {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), new MyThreadFactory("BANMOON-TEST"));
        for (int i = 0; i < 100; i++) {
            executor.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }

}


class MyThreadFactory implements ThreadFactory{
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private String poolName;

    MyThreadFactory(String poolName) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        String threadName = poolName + "-" + threadNumber.getAndIncrement();
        Thread t = new Thread(group, r, threadName, 0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }

}

執行結果

4)拒絕策略

拒絕策略沒什麼好講的,平常在使用時,注意下容量的大小,以及使用的策略。自己需要執行的任務數量多少,會不會照成記憶體溢位等,從這幾個方面入手,選擇最適合業務的佇列容量和拒絕策略。

策略類 說明
AbortPolicy 預設,不執行新任務,直接丟擲異常,提示執行緒池已滿
DiscardPolicy 不執行新任務,也不丟擲異常
DiscardOldestPolicy 它丟棄最老的未處理請求,然後重試執行,除非執行程式被關閉,在這種情況下任務被丟棄。
CallerRunsPolicy 直接在外層呼叫者的執行緒中呼叫新任務

演示CallerRunsPolicy,會在呼叫者的執行緒中,執行超出容量的任務

package com.banmoon.pool;

import java.util.concurrent.*;

public class Demo4 {

    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(10, 20, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(20), new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 1; i <= 100; i++) {
            executorService.execute(new MyDemo4(i));
        }
        executorService.shutdown();
    }

}

class MyDemo4 implements Runnable{

    private Integer i;

    public MyDemo4(Integer i) {
        this.i = i;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + ":" + i);
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

執行結果,上述執行緒池指定了最大執行緒數為20,佇列容量為20。所以當執行第41個任務時,佇列滿了,將由呼叫者的執行緒來執行這個任務,此處是主執行緒

三、其他

1)執行任務的優先順序

public class ThreadPoolExecutor extends AbstractExecutorService {
    
    public void execute(Runnable command) {
        // 判斷是否為空
        if (command == null)
            throw new NullPointerException();
        
        // 判斷當前正在執行的執行緒數是否小於核心執行緒數
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            // 新增任務至執行緒執行,成功新增則結束
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果核心執行緒都有在執行,將任務放至佇列中
        if (isRunning(c) && workQueue.offer(command)) {
            // 如果成功推入佇列,將再次檢查執行緒狀態,有執行緒死亡則將當前任務新增至執行緒執行
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果佇列推入任務失敗了,那將直接新增至執行緒執行
        else if (!addWorker(command, false))
            // 如果任務新增至執行緒失敗,則將進行拒絕策略
            reject(command);
    }
    
    /**
     * 會從執行緒工廠獲取執行緒,並新增執行任務
     * @param firstTask 執行的任務
     * @param core 是否可以新增至核心執行緒
     * @return true:成功新增至執行緒執行
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // ...
    }
}

四、最後

執行緒池這東西乾貨還是挺多的,還有挺多沒有整理完。比如說addWorker方法,執行緒池的執行排程等

後續有什麼新的理解繼續補上,未完待續

歡迎來登入我的個人部落格

入我相思門,知我相思苦, 長相思兮長相憶,短相思兮無窮極, 早知如此絆人心,何如當初莫相識。