1. 程式人生 > 實用技巧 >關於執行緒池的幾個問題

關於執行緒池的幾個問題

這裡以java中的 ThreadPoolExecutor 執行緒池來說明:

1:執行緒池的幾個重要的引數,這幾個引數有什麼作用?

線上程中有 核心池大小,最大池大小,任務佇列 這幾個比較重要的引數。

以下是幾個重要引數在原始碼中的體現

//  核心池大小
private volatile int corePoolSize;
//  最大池大小
private volatile int maximumPoolSize;
//  任務佇列
private final BlockingQueue<Runnable> workQueue;

作用:

corePoolSize:當需要執行的任務數量小於corePoolSize則新建立一個執行緒,並且執行任務,如果執行任務數大於corePoolSize則將任務,則將需要執行的任務放入到workQueue佇列中,以下是程式碼:

//  小於核心池大小,則新增一個執行緒然後執行
if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
//  當任務數大於等於核心池大小,則放到佇列中
if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            
if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }

maximumPoolSize:當佇列中的任務已經新增滿了,再往池中新增任務,如果 池中執行緒數<maximumPoolSize則會新建執行緒,反之則廢棄該任務

// 新增到佇列中失敗,則會新建執行緒去執行任務
else if
(!addWorker(command, false)) reject(command);

workQueue:當池中的執行緒數大於等於核心池的大小,則將需要執行的任務放入到佇列workQueue中。

2:執行緒池的工作流程是什麼?

分為3中場景的流程:1:池中執行緒數<核心池數 2:池中執行緒數大於等於核心池數但小於最大池數 3:池中執行緒=最大池數

場景1:新建一個執行緒並且執行任務,進入到 addWorker 的方法中:

// 新增一個任務到workSet中,core  為true
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 獲取執行緒池的執行狀態
            int rs = runStateOf(c);
            ……
            for (;;) {
                // 執行緒池中執行的執行緒數
                int wc = workerCountOf(c);
                
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //  原子增加 執行執行緒的數量 
                if (compareAndIncrementWorkerCount(c))
                    // 跳出迴圈 進入迴圈體外的邏輯
                    break retry;
                 ……
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 將任務封裝成Worker物件
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //  這裡通過加鎖,保證執行緒安全 保證新增任務安全
                mainLock.lock();
                try {
                    ……
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //  將任務放到 workers 集合中 在鎖中新增
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 新增成功後執行任務
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

下面看看具體的執行方法 即 t.start(); 方法的具體邏輯,程式碼如下:

//執行 start方法  會執行run 方法
public void run() {
            runWorker(this);
        }

下面是runWorker 的方法:task != null表示需要執行的任務不為空,在池中執行緒<核心池大小為true getTask():從佇列中獲取任務並且執行。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
       try {
            // 進入while迴圈,使該執行緒可以一直處理任務,執行緒不會執行完一次任務就退出,線上程池退出前,執行緒一直工作,以免執行緒資源的浪費
            while (task != null || (task = getTask()) != null) {
                w.lock();
                
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 具體執行任務
                        task.run();
                    } 
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

當這個執行緒執行完任務之後,並不會銷燬執行緒,而是不斷遍歷佇列中的任務,如果佇列中有任務,則執行相應的任務。這個是執行緒池的主要實現邏輯,即達到了執行緒複用的目的。

場景2:池中執行緒數大於等於核心池數但小於最大池數

//  這裡將任務放入到佇列中
if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

放入到佇列中,具體的執行時刻是在,場景1中的 while迴圈中的getTask() 獲取到任務,然後執行

場景3:

// 將任務加到workerSet中,並且新建一個執行緒去執行,這裡的core標識為false, 其餘邏輯和場景1中是一致的
else if (!addWorker(command, false))
            reject(command);

3:如何設定核心池的大小?如何計算?

如果是計算密集型,這時是不需要切換執行緒來計算的,開啟的執行緒數為 CPU個數

如果是IO阻塞密集,則根據實際的阻塞時間來進行計算,如:執行計算時間 為 a 阻塞時間為 b 這時執行緒池的大小應該設定為 (b/a+1)*cpu 核數 這個只是理論指導值