1. 程式人生 > >Java執行緒池(2)——執行緒池中的幾個重要方法詳解

Java執行緒池(2)——執行緒池中的幾個重要方法詳解

【內容摘要】

在java中,如果需要進行多執行緒程式設計,可以採用java自帶的執行緒池來實現,執行緒池對於我們新手來說是一個非常好的選擇,因為我們可以不用關心執行緒池中執行緒是如何排程的,避免在多執行緒程式設計過程產生死鎖等問題。在瞭解執行緒池的使用前,本文首先介紹一下java執行緒池的內部原理。

【正文】

上一篇文件http://blog.csdn.net/zly_ir/article/details/78785237 在介紹了執行緒池中的幾個重要的類過程中,提到了幾個重要的方法,如execute()方法等,在本篇文章中,我們將詳細介紹execute()方法是如何實現的,在介紹方法之前,我們首先回顧一下ThreadPoolExecutor中的幾個重要引數,這些引數將在後續方法中經常出現:

//任務快取佇列,用來存放等待執行的任務
private final BlockingQueue<Runnable> workQueue;              

//執行緒池的主要狀態鎖,對執行緒池狀態(比如執行緒池大小                                                              //runState等)的改變都要使用這個鎖
private final ReentrantLock mainLock = new ReentrantLock();   

//用來存放工作集
private final HashSet<Worker> workers = new
HashSet<Worker>(); //執行緒存貨時間 private volatile long keepAliveTime; //是否允許為核心執行緒設定存活時間 private volatile boolean allowCoreThreadTimeOut; //核心池的大小(即執行緒池中的執行緒數目大於這個引數時,提交的任務會被放進任務快取佇列) private volatile int corePoolSize; //執行緒池最大能容忍的執行緒數 private volatile int maximumPoolSize; //執行緒池中當前的執行緒數
private volatile int poolSize; //任務拒絕策略 private volatile RejectedExecutionHandler handler; //執行緒工廠,用來建立執行緒 private volatile ThreadFactory threadFactory; //用來記錄執行緒池中曾經出現過的最大執行緒數 private int largestPoolSize; //用來記錄已經執行完畢的任務個數 private long completedTaskCount;

1.execute()方法

在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法裡面最終呼叫的還是execute()方法,所以我們以execute()方法的實現原理為例進行說明:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //如果執行緒池中當前執行緒數不小於核心池大小
    //或者addIfUnderCorePoolSize這個方法返回false
    //則進入if語句執行接下去的程式碼
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        //如果當前執行緒池處於RUNNING狀態,則將任務放入任務快取佇列;
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        //如果當前執行緒池不處於RUNNING狀態或者任務放入快取佇列失敗
        //則執addIfUnderMaximumPoolSize(command)
        else if (!addIfUnderMaximumPoolSize(command))
            //如果執行addIfUnderMaximumPoolSize方法失敗
            //則執行reject()方法進行任務拒絕處理。
            reject(command); // is shutdown or saturated
    }
}

2.在execute()方法中,有兩個重要的功能模組,addIfUnderCorePoolSize和addIfUnderMaximumPoolSize,下面介紹這兩個功能的具體實現,並附上註釋
(1)addIfUnderCorePoolSize

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //判斷當前執行緒池中的執行緒數目是否小於核心池大小;
        //判斷前首先獲取鎖,目的是為了防止在在execute方法判斷的時候
        //poolSize小於corePoolSize,而判斷完之後,在其他執行緒中又向
        //執行緒池提交了任務,就可能導致poolSize不小於corePoolSize了
        if (poolSize < corePoolSize && runState == RUNNING)
            //建立執行緒去執行firstTask任務
            //傳進去的引數為提交的任務,返回值為Thread型別
            //具體實現下面會介紹   
            t = addThread(firstTask);   
        } finally {
        mainLock.unlock();
    }
    //為空則表明建立執行緒失敗
    //(即poolSize>=corePoolSize或者runState不等於RUNNING)
    if (t == null)
        return false;
    t.start();
    return true;
}

在addIfUnderCorePoolSize方法中,有一個任務新增函式addThread,它的具體實現如下:

private Thread addThread(Runnable firstTask) {
    //用提交的任務建立了一個Worker物件
    Worker w = new Worker(firstTask);
    //呼叫執行緒工廠threadFactory建立了一個新的執行緒t
    Thread t = threadFactory.newThread(w); 
    if (t != null) {
        //將執行緒t的引用賦值給了Worker物件的成員變數thread
        w.thread = t;  
        //通過workers.add(w)將Worker物件新增到工作集當中
        workers.add(w);
        //當前執行緒數加1
        int nt = ++poolSize;       
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}

addThread中的Worker類是一個實現了Runnable 的類,具體實現如下

private final class Worker implements Runnable {
    private final ReentrantLock runLock = new ReentrantLock();
    private Runnable firstTask;
    volatile long completedTasks;
    Thread thread;
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
    }
    boolean isActive() {
        return runLock.isLocked();
    }
    void interruptIfIdle() {
        final ReentrantLock runLock = this.runLock;
        if (runLock.tryLock()) {
            try {
        if (thread != Thread.currentThread())
        thread.interrupt();
            } finally {
                runLock.unlock();
            }
        }
    }
    void interruptNow() {
        thread.interrupt();
    }

    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
            boolean ran = false;
             //beforeExecute方法是ThreadPoolExecutor類的一個方法,
             //沒有具體實現,使用者可以根據自己需要過載這個方法和後面的
             //afterExecute方法進行統計資訊,比如某個任務的執行時間等   
            beforeExecute(thread, task);          
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }

    public void run() {
        try {
            //首先執行的是通過構造器傳進來的任務firstTask
            Runnable task = firstTask;
            firstTask = null;
            //不斷通過getTask()去取新的任務來執行
            //getTask是ThreadPoolExecutor類中的方法,並不是Worker類中的方法
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);   //當任務佇列中沒有任務時,進行清理工作       
        }
    }
}

上述run()方法中的getTask是ThreadPoolExecutor類中的方法,他的具體實現如下:

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            //判斷當前執行緒池狀態,如果runState大於SHUTDOWN
            //(即為STOP或者TERMINATED),則直接返回null。
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            //如果runState為SHUTDOWN或者RUNNING,則從任務快取佇列取任務。
            if (state == SHUTDOWN)
                r = workQueue.poll();
            //如果執行緒數大於核心池大小或者允許為核心池執行緒設定空閒時間,
            //則通過poll取任務,若等待一定的時間取不到任務,則返回null
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) 
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出  
            //workerCanExit() 方法將在之後介紹
            if (workerCanExit()) {    
                if (runState >= SHUTDOWN) 
                    interruptIdleWorkers();   //中斷處於空閒狀態的worker,該方法將在之後介紹
                return null;
            }
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

在getTask()中,如果執行緒池處於STOP狀態、或者任務佇列已為空或者允許為核心池執行緒設定空閒存活時間並且執行緒數大於1時,允許worker退出。如果允許worker退出,則呼叫interruptIdleWorkers()中斷處於空閒狀態的worker,這裡有兩個重要函式,一個是workerCanExit(),另一個是interruptIdleWorkers(),下面分別介紹這兩個函式的具體實現

private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    //如果runState大於等於STOP,或者任務快取佇列為空了
    //或者  允許為核心池執行緒設定空閒存活時間並且執行緒池中的執行緒數目大於1
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize));
    } finally {
        mainLock.unlock();
    }
    return canExit;
}

這裡有一個非常巧妙的設計方式,假如我們來設計執行緒池,可能會有一個任務分派執行緒,當發現有執行緒空閒時,就從任務快取佇列中取一個任務交給空閒執行緒執行。但是在這裡,並沒有採用這樣的方式,因為這樣會要額外地對任務分派執行緒進行管理,無形地會增加難度和複雜度,這裡直接讓執行完任務的執行緒去任務快取佇列裡面取任務來執行。

(2)addIfUnderMaximumPoolSize,這個方法的實現思想和addIfUnderCorePoolSize方法的實現思想非常相似,唯一的區別在於addIfUnderMaximumPoolSize方法是線上程池中的執行緒數達到了核心池大小並且往任務佇列中新增任務失敗的情況下執行的:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

其實它和addIfUnderCorePoolSize方法的實現基本一模一樣,只是if語句判斷條件中的poolSize < maximumPoolSize不同而已。

【附錄】

執行緒池大小配置參考值

  • 如果是CPU密集型任務,就需要儘量壓榨CPU,參考值可以設為 NCPU+1
  • 如果是IO密集型任務,參考值可以設定為2*NCPU

【參考文獻】