1. 程式人生 > >執行緒池深入解析筆記

執行緒池深入解析筆記

概述

這篇筆記也是整理於寒假實習期間,由於美顏相機demo中的縮圖渲染功能涉及到多執行緒,使用了執行緒池減少資源開銷,所以整理一下執行緒池筆記,以便以後查詢(待完善)。
使用執行緒池的主要目的在於:
1. 降低資源消耗
2. 提高響應速度
3. 提高執行緒的可管理性

幾個引數

執行緒池的構造器中主要有以下幾個引數
1. corePoolSize:核心執行緒的數量。核心執行緒是在執行完任務之後也不會被銷燬的執行緒。
2. maximumPoolSize:執行緒池中的最大執行緒數。表示執行緒池中最多能同時存在多少個執行緒。
3. keepAliveTime:表示執行緒沒有任務執行時最多能存活多久。預設情況下只在執行緒數大於corePoolSize時起作用。但是通過呼叫allowCoreThreadTimeOut(boolean)方法可以線上程池中數量不超過corePoolSize時也會起作用,直到執行緒池中的執行緒數為0。
4. unit:keepAliveTime的時間單位,具體取值看原始碼。
5. workQueue:一個阻塞佇列,用來儲存等待執行的任務。一般情況下有以下幾種佇列:
* ArrayBlockingQueue:基於陣列的有界

阻塞佇列,按照FIFO原則對元素排序。
* LinkedBlockingQueue:基於連結串列的無界阻塞佇列,按照FIFO排序。Executors.newFixedThreadPool()使用了這個佇列。
* SynchronizedQueue:一個不儲存元素的阻塞佇列,每個插入操作都必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態。Executors.newCachedThreadPool使用了這個佇列。
* PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列。
6. threadFactory:執行緒工廠,用於建立執行緒。
7. handler:飽和策略,即當佇列和執行緒池都滿了,應該採取什麼策略來處理新提交的任務。通常有以下四種策略:
* ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException。
* ThreadPoolExecutor.DiscardPolicy:丟棄任務,但是不拋異常。
* ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務。

* ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒執行任務。

執行緒池的原理

執行緒的狀態

  public enum State {
        /**
         * Thread state for a thread which has not yet started.
         */
        NEW,

        /**
         * Thread state for a runnable thread.  A thread in the runnable
         * state is executing in the Java virtual machine but it may
         * be waiting for other resources from the operating system
         * such as processor.
         */
RUNNABLE, /** * Thread state for a thread blocked waiting for a monitor lock. * A thread in the blocked state is waiting for a monitor lock * to enter a synchronized block/method or * reenter a synchronized block/method after calling * {@link Object#wait() Object.wait}. */ BLOCKED, /** * Thread state for a waiting thread. * A thread is in the waiting state due to calling one of the * following methods: * <ul> * <li>{@link Object#wait() Object.wait} with no timeout</li> * <li>{@link #join() Thread.join} with no timeout</li> * <li>{@link LockSupport#park() LockSupport.park}</li> * </ul> * * <p>A thread in the waiting state is waiting for another thread to * perform a particular action. * * For example, a thread that has called <tt>Object.wait()</tt> * on an object is waiting for another thread to call * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on * that object. A thread that has called <tt>Thread.join()</tt> * is waiting for a specified thread to terminate. */ WAITING, /** * Thread state for a waiting thread with a specified waiting time. * A thread is in the timed waiting state due to calling one of * the following methods with a specified positive waiting time: * <ul> * <li>{@link #sleep Thread.sleep}</li> * <li>{@link Object#wait(long) Object.wait} with timeout</li> * <li>{@link #join(long) Thread.join} with timeout</li> * <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li> * <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li> * </ul> */ TIMED_WAITING, /** * Thread state for a terminated thread. * The thread has completed execution. */ TERMINATED; }

由Thread內部定義的列舉型別State可以看出執行緒共有六種狀態:
1. NEW:新建執行緒,但是還沒呼叫start()方法。處於NEW狀態的執行緒有自己的記憶體空間。
2. RUNNABLE:可執行狀態,執行緒物件建立且呼叫了start()方法後進入該狀態,該狀態的執行緒位於可執行執行緒池中,等待被執行緒排程器選中,獲取CPU的使用權。
3. BLOCKED:等待獲取鎖時進入的狀態,執行緒被掛起,通常是因為它在等待一個鎖。當某個synchronized正好有執行緒在使用時,一個執行緒嘗試進入這個臨界區,就會被阻塞。當它搶到鎖之後,才會從BLOCKED狀態轉換為RUNNABLE狀態。
4. WAITING:當呼叫wait()、join()、park()方法時,進入WAITING狀態。前提是這個執行緒已經擁有鎖。

WAITING狀態與BLOCKED狀態的區別在於:①、BLOCKED是虛擬機器認為程式還不能進入某個臨界區。 ②、WAITING狀態的先決條件是當前執行緒已經在臨界區中,但是它自己通過判定業務上的引數,發現還有一些其他配合的資源沒有準備充分而選擇等待再做其他事情。

可以通過notify/notifyAll動作,從等待池中喚醒執行緒重新恢復到RUNNABLE狀態。
5. TIMED_WAITING:通過wait(t)、sleep(t)、join(t)等方法進入此狀態。當時間達到時觸發執行緒回到工作狀態RUNNABLE。interrupt只對WAITING或者TIMED_WAITING狀態的執行緒起作用。
6. TERMINATED:執行緒結束後就是這種狀態。也就是run方法執行完畢。

執行緒池的狀態

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

由ThreadPoolExecutor中定義的常量可以知道執行緒池由五種狀態:
1. RUNNING:能接受新提交的任務,並且也能處理阻塞佇列的任務。
2. SHUTDOWN:關閉狀態,不再接受新的任務,但可以繼續處理阻塞佇列中一儲存的任務。呼叫shutdown()方法會進入此狀態。
3. STOP:不能接受新任務,也不能處理佇列中的任務。呼叫shutdownNow()方法會進入此狀態。
4. TIDYING:如果所有的任務都已終止,workerCount(有效執行緒數)為0,執行緒池會進入這個狀態。只有會呼叫terminated()方法進入TERMINATED狀態。
5. TERMINATED:終止狀態。
狀態轉換圖如下:
![enter description here][1]

執行過程

我們通過submit(Runnable)方法來提交任務時,執行程式碼如下:

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

它會把Runnable物件封裝成一個RunnableFuture物件。然後呼叫execute方法。因此真正的執行邏輯在execute方法裡:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
         //通過ctl.get()方法獲取int值,記錄了執行緒池當前的runState與workerCount。
        int c = ctl.get(); 
        //然後通過workerCountOf方法取出低29位的值,表示當前活動的執行緒數。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
        //如果新增失敗,重新獲取ctl值。
            c = ctl.get();
        }
        //如果當前執行緒池是Running狀態,且成功將任務新增進任務佇列中
        if (isRunning(c) && workQueue.offer(command)) {
        //再次獲取ctl值,如果不是執行狀態,把剛新增進去的任務移除。
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
            //呼叫reject方法,根據設定的拒絕策略執行不同的操作。
                reject(command);
                //如果當前有效執行緒數是0,則執行addWorker操作。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //執行到這裡,有兩種情況:①、執行緒池已經不是RUNNING狀態。②、執行緒池是RUNNING狀態,但workerCount >= corePoolSize且workQueue已滿。則再次執行addWorker方法。
        else if (!addWorker(command, false))
        //如果失敗則拒絕執行任務。
            reject(command);
    }

相關解析看程式碼中的註釋。

關於addWorker方法,有兩個引數 Runnable firstTask 與 boolean core。第一個引數為null,表示線上程池中建立一個執行緒但不會啟動。第二個引數為true,表示將執行緒池的執行緒數量上限設定為corePoolSize,為false表示將上限設定為maximumPoolSize。

執行緒池的排程方式

怎樣開始執行

執行緒池通過addWorker方法成功建立一個新執行緒之後,會呼叫新執行緒的start方法。

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

t.start()句程式碼中,t實際是Worker中的Thread型別常量:

/** Thread this worker is running in.  Null if factory fails. */
final Thread thread;

thread是在Worker的構造方法中被初始化的:

/**
 * Creates with given first task and thread from ThreadFactory.
 * @param firstTask the first task (null if none)
 */
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

在Worker的構造方法中,可以看到通過ThreadFactory建立了一個新的執行緒,同時傳入了this,即是當前的worker物件。Woker類實現了Runnable介面,因此可以作為引數傳入。這樣,當在addWorker()方法中呼叫t.start()方法時,實際呼叫的就是Worker物件中的run()方法。那麼接著看Worker中的run()方法:

/** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}

裡面只有一句,呼叫runWorker(this)。進入runWorker(this)瞧瞧:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

基本原理就是通過迴圈,不斷地從任務佇列中獲取任務,並執行。在上面的原始碼中可以看到,beforeExecute()/afterExecute()方法也是在這裡面被呼叫的。

怎樣結束執行

AQS

執行緒池中的異常捕獲

合理配置執行緒池(待整理):

http://gityuan.com/2016/01/16/thread-pool/
4.1 合理地配置執行緒池
需要針對具體情況而具體處理,不同的任務類別應採用不同規模的執行緒池,任務類別可劃分為CPU密集型任務、IO密集型任務和混合型任務。

對於CPU密集型任務:執行緒池中執行緒個數應儘量少,不應大於CPU核心數;
對於IO密集型任務:由於IO操作速度遠低於CPU速度,那麼在執行這類任務時,CPU絕大多數時間處於空閒狀態,那麼執行緒池可以配置儘量多些的執行緒,以提高CPU利用率;
對於混合型任務:可以拆分為CPU密集型任務和IO密集型任務,當這兩類任務執行時間相差無幾時,通過拆分再執行的吞吐率高於序列執行的吞吐率,但若這兩類任務執行時間有資料級的差距,那麼沒有拆分的意義。

相關推薦

執行深入解析筆記

概述 這篇筆記也是整理於寒假實習期間,由於美顏相機demo中的縮圖渲染功能涉及到多執行緒,使用了執行緒池減少資源開銷,所以整理一下執行緒池筆記,以便以後查詢(待完善)。 使用執行緒池的主要目的在於: 1. 降低資源消耗 2. 提高響應速度 3.

死磕 java執行系列之執行深入解析——體系結構

(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 簡介 Java的執行緒池是塊硬骨頭,對執行緒池的原始碼做深入研究不僅能提高對Java整個併發程式設計的理解,也能提高自己在面試中的表現,增加被錄取的可能性。 本系列將分成很多個章節,本章作為執行緒池的第一章將對

死磕 java執行系列之執行深入解析——生命週期

(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 注:執行緒池原始碼部分如無特殊說明均指ThreadPoolExecutor類。 簡介 上一章我們一起重溫了下執行緒的生命週期(六種狀態還記得不?),但是你知不知道其實執行緒池也是有生命週期的呢?! 問題 (1)

死磕 java執行系列之執行深入解析——普通任務執行流程

(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 注:執行緒池原始碼部分如無特殊說明均指ThreadPoolExecutor類。 簡介 前面我們一起學習了Java中執行緒池的體系結構、構造方法和生命週期,本章我們一起來學習執行緒池中普通任務到底是怎麼執行的。

死磕 java執行系列之執行深入解析——未來任務執行流程

(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 注:執行緒池原始碼部分如無特殊說明均指ThreadPoolExecutor類。 簡介 前面我們一起學習了執行緒池中普通任務的執行流程,但其實執行緒池中還有一種任務,叫作未來任務(future task),使用它

死磕 java執行系列之執行深入解析——定時任務執行流程

(手機橫屏看原始碼更方便) 注:java原始碼分析部分如無特殊說明均基於 java8 版本。 注:本文基於ScheduledThreadPoolExecutor定時執行緒池類。 簡介 前面我們一起學習了普通任務、未來任務的執行流程,今天我們再來學習一種新的任務——定時任務。 定時任務是我們經常會用到的一

Java高階應用:執行全面解析

  什麼是執行緒池?   很簡單,簡單看名字就知道是裝有執行緒的池子,我們可以把要執行的多執行緒交給執行緒池來處理,和連線池的概念一樣,通過維護一定數量的執行緒池來達到多個執行緒的複用。   執行緒池的好處 &n

四種Java執行用法解析

本文為大家分析四種Java執行緒池用法,供大家參考,具體內容如下 http://www.jb51.net/article/81843.htm 1、new Thread的弊端 執行一個非同步任務你還只是如下new Thread嗎? new Thread(new Runn

併發程式設計的藝術-執行原始碼解析

執行緒池的作用: 1,降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。 2,提搞響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。 3,提高系統的客觀理性。執行緒是稀缺資源,如果無限制地建立,不僅會消耗系統資源,還會降低系統的穩定

017.多執行-執行原理解析以及合理配置

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveT

Java多執行執行深入分析

執行緒池是併發包裡面很重要的一部分,在實際情況中也是使用很多的一個重要元件。 下圖描述的是執行緒池API的一部分。廣義上的完整執行緒池可能還包括Thread/Runnable、Timer/TimerTask等部分。這裡只介紹主要的和高階的API以及架構和原理。 大

Java 多執行ThreadPoolExecutor解析及Executors類中提供的靜態方法來建立執行

上面的程式碼可能看起來不是那麼容易理解,下面我們一句一句解釋:   首先,判斷提交的任務command是否為null,若是null,則丟擲空指標異常;   接著是這句,這句要好好理解一下: if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(c

執行原理解析

執行緒池的原始碼及原理[JDK1.6實現] 1.執行緒池的包含的內容 2.執行緒池的資料結構【核心類ThreadPoolExecutor】: worker:工作類,一個worker代表啟動了一個執行緒,它啟動後會迴圈執行workQueue裡面的所有任務 workQueu

Java執行原始碼解析及高質量程式碼案例

引言 本文為Java高階程式設計中的一些知識總結,其中第一章對Jdk 1.7.0_25中的多執行緒架構中的執行緒池ThreadPoolExecutor原始碼進行架構原理介紹以及原始碼解析。第二章則分析了幾個違反Java高質量程式碼案例以及相應解決辦法。如有總結

Android執行原始碼解析

     上一篇部落格大概瞭解了下執行緒池是什麼,這篇部落格將在原始碼的基礎上去驗證上一篇部落格中提到的 Thread執行流程。我的部落格保證是一個字一個字敲出來的    1.執行緒池原始碼解析     在ThreadPoolExecutor類中,最核心的任務提交方法是e

執行原始碼解析

<span style="font-family: "microsoft yahei"; font-size: 12px; background-color: rgb(255, 255, 255);">轉載:</span> http://bl

ThreadPoolExecutor執行原始碼解析

關鍵構造屬性:    volatile int runState;                  保證了多執行緒的共享可見性    static final int RUNNING    = 0;          static final int SHUTDOWN   

【Java進階】執行深入理解

Java併發程式設計:執行緒池的使用在前面的文章中,我們使用執行緒的時候就去建立一個執行緒,這樣實現起來非常簡便,但是就會有一個問題:如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。那

java併發程式設計之執行,執行,ansync執行原始碼解析

前言 java開源長了, 程式碼久了,網上對於執行緒那是眾說紛紜,一直縈繞我心頭的,jdk執行緒池好還是spring執行緒池好

執行之ThreadPoolExecutor執行原始碼分析筆記

1.執行緒池的作用 一方面當執行大量非同步任務時候執行緒池能夠提供較好的效能,在不使用執行緒池的時候,每當需要執行非同步任務時候是直接 new 一執行緒進行執行,而執行緒的建立和銷燬是需要開銷的。使用執行緒池時候,執行緒池裡面的執行緒是可複用的,不會每次執行非同步任務時候都重新建立和銷燬執行緒。 另一方面