1. 程式人生 > >雜談——Java執行緒池詳解

雜談——Java執行緒池詳解

在一個應用程式中,我們無可避免地需要多次使用執行緒,也就意味著,我們需要多次建立並銷燬執行緒。而建立並銷燬執行緒的過程勢必會消耗記憶體。

舉個例子,假設我們要去爬三百主流媒體網站,每天要抓一次資料,平均每次開銷50毫秒,處理開銷2毫秒,則可以開二十五個執行緒,假設每個網站有十個請求,那麼三百個網站就有3000個請求。

從上面我們可以知道,CPU等待時間/CPU執行時間=25:1,這樣的開銷太大了,而這二十多個執行緒又都必須保持存活,那執行緒銷燬的將超級大 。

1.執行緒池

而在Java中,記憶體資源是及其寶貴的,出現上文描述的情況絕對是不明智的。那麼如何節省開銷呢?

這時候就需要引入執行緒池了。執行緒池是什麼呢?

執行緒池是Java中開闢出的一種管理執行緒的概念

從概念以及應用場景,還是引入這個概念的目的,我們都可以斷定,執行緒池可以更加方便的管理執行緒,也可以減少記憶體的開銷。

那麼,我們應該如何建立一個執行緒池呢?執行緒池是Java自帶的,Java中已經提供了建立執行緒池的一個類:Executor。

而我們建立時,一般使用它的子類:ThreadPoolExecutor,其構造器原始碼如下:

public ThreadPoolExecutor(int paramInt1, int paramInt2, long paramLong, TimeUnit paramTimeUnit,
            BlockingQueue<Runnable> paramBlockingQueue, ThreadFactory paramThreadFactory,
            RejectedExecutionHandler paramRejectedExecutionHandler) {
        this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
        this.mainLock = new ReentrantLock();
        this.workers = new HashSet();
        this.termination = this.mainLock.newCondition();
        if ((paramInt1 < 0) || (paramInt2 <= 0) || (paramInt2 < paramInt1) || (paramLong < 0L))
            throw new IllegalArgumentException();
        if ((paramBlockingQueue == null) || (paramThreadFactory == null) || (paramRejectedExecutionHandler == null))
            throw new NullPointerException();
        this.corePoolSize = paramInt1;
        this.maximumPoolSize = paramInt2;
        this.workQueue = paramBlockingQueue;
        this.keepAliveTime = paramTimeUnit.toNanos(paramLong);
        this.threadFactory = paramThreadFactory;
        this.handler = paramRejectedExecutionHandler;
    }

也許看起來麻煩,我們就看一個簡略的就好:

public ThreadPoolExecutor(int corePoolSize,  
                              int maximumPoolSize,  
                              long keepAliveTime,  
                              TimeUnit unit,  
                              BlockingQueue<Runnable> workQueue,  
                              ThreadFactory threadFactory,  
                              RejectedExecutionHandler handler)

 從上邊的程式碼我們可以看到該構造器有好些個引數,下面就對這些引數進行簡單的解釋:

  • corePoolSize :執行緒池的核心池大小,在建立執行緒池之後,執行緒池預設沒有任何執行緒。當有任務過來的時候才會去建立建立執行緒執行任務。換個說法,執行緒池建立之後,執行緒池中的執行緒數為0,當任務過來就會建立一個執行緒去執行,直到執行緒數達到corePoolSize 之後,就會被到達的任務放在佇列中。換句更精煉的話:corePoolSize 表示允許執行緒池中允許同時執行的最大執行緒數。如果執行了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前建立並啟動所有核心執行緒。
  • maximumPoolSize :執行緒池允許的最大執行緒數,他表示最大能建立多少個執行緒。maximumPoolSize肯定是大於等於corePoolSize。
  • keepAliveTime :表示非核心執行緒沒有任務時最多保持多久然後停止。預設情況下,只有執行緒池中執行緒數大於corePoolSize 時,keepAliveTime 才會起作用。換句話說,當執行緒池中的執行緒數大於corePoolSize,並且一個執行緒空閒時間達到了keepAliveTime,那麼就是shutdown。
  • Unit:keepAliveTime 的單位。
  • workQueue :一個阻塞佇列,用來儲存等待執行的任務,當執行緒池中的執行緒數超過它的corePoolSize的時候,執行緒會進入阻塞佇列進行阻塞等待。通過workQueue,執行緒池實現了阻塞功能
  • threadFactory :執行緒工廠,用來建立執行緒。
  • handler :表示當拒絕處理任務時的策略,我們可以在任務滿了之後拒絕一些任務。

如果不好理解這些引數,可以參考下圖:

下面我們就來了解一下這些引數的細節。

任務快取佇列

任務快取佇列,即workQueue,它用來存放等待執行的任務。

workQueue的型別為BlockingQueue<Runnable>,通常可以取下面三種類型:

  • 有界任務佇列ArrayBlockingQueue:基於陣列的先進先出佇列,此佇列建立時必須指定大小;
  • 無界任務佇列LinkedBlockingQueue:基於連結串列的先進先出佇列,如果建立時沒有指定此佇列大小,則預設為Integer.MAX_VALUE;
  • 直接提交佇列synchronousQueue:這個佇列比較特殊,它不會儲存提交的任務,而是將直接新建一個執行緒來執行新來的任務。

拒絕策略

handler的拒絕策略有四種:

  • AbortPolicy:不執行新任務,直接丟擲異常RejectedExecutionException,提示執行緒池已滿
  • DisCardPolicy:不執行新任務,也不丟擲異常(即不做任何處理)
  •  DisCardOldSetPolicy:將訊息佇列中的第一個任務替換為當前新進來的任務執行,丟棄佇列中最老的一個請求,也就是即將被執行的一個任務並嘗試再次提交當前任務。
  •  CallerRunsPolicy:只要執行緒池未關閉,該策略直接在呼叫者執行緒中,運行當前被丟棄的任務。顯然這樣做不會真的丟棄任務,但是,任務提交執行緒的效能極有可能會急劇下降。

執行緒池的任務處理策略:

如果當前執行緒池中的執行緒數目小於corePoolSize,則每來一個任務,就會建立一個執行緒去執行這個任務;

如果當前執行緒池中的執行緒數目>=corePoolSize,則每來一個任務,會嘗試將其新增到任務快取隊列當中,若新增成功,則該任務會等待空閒執行緒將其取出去執行;若新增失敗(一般來說是任務快取佇列已滿),則會嘗試建立新的執行緒去執行這個任務;如果當前執行緒池中的執行緒數目達到maximumPoolSize,則會採取任務拒絕策略進行處理;

如果執行緒池中的執行緒數量大於 corePoolSize時,如果某執行緒空閒時間超過keepAliveTime,執行緒將被終止,直至執行緒池中的執行緒數目不大於corePoolSize;如果允許為核心池中的執行緒設定存活時間,那麼核心池中的執行緒空閒時間超過keepAliveTime,執行緒也會被終止。

執行緒池的關閉

ThreadPoolExecutor提供了兩個方法,用於執行緒池的關閉,分別是shutdown()和shutdownNow(),其中:

shutdown():不會立即終止執行緒池,而是要等所有任務快取佇列中的任務都執行完後才終止,但再也不會接受新的任務

shutdownNow():立即終止執行緒池,並嘗試打斷正在執行的任務,並且清空任務快取佇列,返回尚未執行的任務

原始碼分析

首先來看最核心的execute方法,這個方法在AbstractExecutorService中並沒有實現,從Executor介面,直到ThreadPoolExecutor才實現了改方法,

ExecutorService中的submit(),invokeAll(),invokeAny()都是呼叫的execute方法,所以execute是核心中的核心,原始碼分析將圍繞它逐步展開。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         * 如果正在執行的執行緒數小於corePoolSize,那麼將呼叫addWorker 方法來建立一個新的執行緒,並將該任務作為新執行緒的第一個任務來執行。
       當然,在建立執行緒之前會做原子性質的檢查,如果條件不允許,則不建立執行緒來執行任務,並返回false.  

         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         * 如果一個任務成功進入阻塞佇列,那麼我們需要進行一個雙重檢查來確保是我們已經新增一個執行緒(因為存在著一些執行緒在上次檢查後他已經死亡)或者
       當我們進入該方法時,該執行緒池已經關閉。所以,我們將重新檢查狀態,執行緒池關閉的情況下則回滾入佇列,執行緒池沒有執行緒的情況則建立一個新的執行緒。
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
       如果任務無法入佇列(佇列滿了),那麼我們將嘗試新開啟一個執行緒(從corepoolsize到擴充到maximum),如果失敗了,那麼可以確定原因,要麼是
       執行緒池關閉了或者飽和了(達到maximum),所以我們執行拒絕策略。

         */
    
    // 1.當前執行緒數量小於corePoolSize,則建立並啟動執行緒。
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
        // 成功,則返回
return;
            c = ctl.get();
        }
    // 2.步驟1失敗,則嘗試進入阻塞佇列,
        if (isRunning(c) && workQueue.offer(command)) {
       // 入佇列成功,檢查執行緒池狀態,如果狀態部署RUNNING而且remove成功,則拒絕任務
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
       // 
如果當前worker數量為0,通過addWorker(null, false)建立一個執行緒,其任務為null

            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
    // 3. 步驟1和2失敗,則嘗試將執行緒池的數量有corePoolSize擴充至
maxPoolSize,如果失敗,則拒絕任務

        else if (!addWorker(command, false))
            reject(command);
    }
當然啦,也許有很多人對程式碼不敏感,看了程式碼或許也是一臉懵,接下來用一個流程圖來講一講,他究竟幹了什麼事:

結合上面的流程圖來逐行解析,首先前面進行空指標檢查,workerCountOf()方法能夠取得當前執行緒池中的執行緒的總數,取得當前執行緒數與核心池大小比較:

  • 如果小於,將通過addWorker()方法排程執行。
  • 如果大於核心池大小,那麼就提交到等待佇列。
  • 如果進入等待佇列失敗,則會將任務直接提交給執行緒池。
  • 如果執行緒數達到最大執行緒數,那麼就提交失敗,執行拒絕策略。

用大白話說,就是:任務進來時,首先執行判斷,判斷核心執行緒是否處於空閒狀態,如果不是,核心執行緒就先就執行任務,如果核心執行緒已滿,則判斷任務佇列是否有地方存放該任務,若果有,就將任務儲存在任務佇列中,等待執行,如果滿了,在判斷最大可容納的執行緒數,如果沒有超出這個數量,就開創非核心執行緒執行任務,如果超出了,就呼叫handler實現拒絕策略。
2.addWorker()方法

上文中我們收到excute()方法中新增任務的方式是使用addWorker()方法,下面我們來看一下它的原始碼。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
     // 外層迴圈,用於判斷執行緒池狀態
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
       // 內層的迴圈,任務是將worker數量加1
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    // worker加1後,接下來將woker新增到HashSet<Worker>中,並啟動worker
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
         // 如果往HashSet<Worker>新增成功,則啟動該執行緒
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker(Runnable firstTask, boolean core)的主要任務是建立並啟動執行緒。

他會根據當前執行緒的狀態和給定的值(core or maximum)來判斷是否可以建立一個執行緒。

addWorker共有四種傳參方式。execute使用了其中三種,分別為:

  • addWorker(paramRunnable, true),執行緒數小於corePoolSize時,放一個需要處理的task進Workers Set。如果Workers Set長度超過corePoolSize,就返回false.
  • addWorker(null, false),放入一個空的task進workers Set,長度限制是maximumPoolSize。這樣一個task為空的worker線上程執行的時候會去任務佇列裡拿任務,這樣就相當於建立了一個新的執行緒,只是沒有馬上分配任務。
  • addWorker(paramRunnable, false),當佇列被放滿時,就嘗試將這個新來的task直接放入Workers Set,而此時Workers Set的長度限制是maximumPoolSize。如果執行緒池也滿了的話就返回false.

還有一種情況是execute()方法沒有使用的

addWorker(null, true)

這個方法就是放一個null的task進Workers Set,而且是在小於corePoolSize時,如果此時Set中的數量已經達到corePoolSize那就返回false,什麼也不幹。實際使用中是在prestartAllCoreThreads()方法,這個方法用來為執行緒池預先啟動corePoolSize個worker等待從workQueue中獲取任務執行。

它的執行流程如下:

1、判斷執行緒池當前是否為可以新增worker執行緒的狀態,可以則繼續下一步,不可以return false:

  •     A、執行緒池狀態>shutdown,可能為stop、tidying、terminated,不能新增worker執行緒
  •     B、執行緒池狀態==shutdown,firstTask不為空,不能新增worker執行緒,因為shutdown狀態的執行緒池不接收新任務
  •     C、執行緒池狀態==shutdown,firstTask==null,workQueue為空,不能新增worker執行緒,因為firstTask為空是為了新增一個沒有任務的執行緒再從workQueue獲取task,而workQueue為空,說明新增無任務執行緒已經沒有意義

2、執行緒池當前執行緒數量是否超過上限(corePoolSize 或 maximumPoolSize),超過了就return false,沒超過則對workerCount+1,繼續下一步

3、線上程池的ReentrantLock保證下,向Workers Set中新增新建立的worker例項,新增完成後解鎖,並啟動worker執行緒,如果這一切都成功了,return true,如果新增worker入Set失敗或啟動失敗,呼叫addWorkerFailed()邏輯。

 

3.常見的四種執行緒池

(1)newFixedThreadPool(固定大小的執行緒池

 

public static ExecutorService newFixedThreadPool(int var0) {
        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
    }
public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
    return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1);
}

 

固定大小的執行緒池,可以指定執行緒池的大小,該執行緒池corePoolSize和maximumPoolSize相等,阻塞佇列使用的是LinkedBlockingQueue,大小為整數最大值。

該執行緒池中的執行緒數量始終不變,當有新任務提交時,執行緒池中有空閒執行緒則會立即執行,如果沒有,則會暫存到阻塞佇列。對於固定大小的執行緒池,不存線上程數量的變化。同時使用無界的LinkedBlockingQueue來存放執行的任務。當任務提交十分頻繁的時候,LinkedBlockingQueue

迅速增大,存在著耗盡系統資源的問題。而且線上程池空閒時,即執行緒池中沒有可執行任務時,它也不會釋放工作執行緒,還會佔用一定的系統資源,需要shutdown。

(2)newSingleThreadExecutor(單個執行緒執行緒池

 

public static ExecutorService newSingleThreadExecutor() {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
    }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
    }

 

單個執行緒執行緒池,只有一個執行緒的執行緒池,阻塞佇列使用的是LinkedBlockingQueue,若有多餘的任務提交到執行緒池中,則會被暫存到阻塞佇列,待空閒時再去執行。按照先入先出的順序執行任務。

(3)newCachedThreadPool(快取執行緒池

 

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), var0);
    }

 

快取執行緒池,快取的執行緒預設存活60秒。執行緒的核心池corePoolSize大小為0,核心池最大為Integer.MAX_VALUE,阻塞佇列使用的是SynchronousQueue。是一個直接提交的阻塞佇列,    他總會迫使執行緒池增加新的執行緒去執行新的任務。在沒有任務執行時,當執行緒的空閒時間超過keepAliveTime(60秒),則工作執行緒將會終止被回收,當提交新任務時,如果沒有空閒執行緒,則建立新執行緒執行任務,會導致一定的系統開銷。如果同時又大量任務被提交,而且任務執行的時間不是特別快,那麼執行緒池便會新增出等量的執行緒池處理任務,這很可能會很快耗盡系統的資源。

(4)newScheduledThreadPool(定時執行緒池

 

public static ScheduledExecutorService newScheduledThreadPool(int var0) {
        return new ScheduledThreadPoolExecutor(var0);
    }

    public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
        return new ScheduledThreadPoolExecutor(var0, var1);
    }

該執行緒池可用於週期性地去執行任務,通常用於週期性的同步資料。

  • scheduleAtFixedRate:是以固定的頻率去執行任務,週期是指每次執行任務成功執行之間的間隔。
  • schedultWithFixedDelay:是以固定的延時去執行任務,延時是指上一次執行成功之後和下一次開始執行的之前的時間。

總結一下這四種執行緒池:

  • newCachedThreadPool:可快取的執行緒池,該執行緒池中沒有核心執行緒,非核心執行緒的數量為Integer.max_value,就是無限大,當有需要時建立執行緒來執行任務,沒有需要時回收執行緒,適用於耗時少,任務量大的情況。
  • newSecudleThreadPool:週期性執行任務的執行緒池,按照某種特定的計劃執行執行緒中的任務,有核心執行緒,但也有非核心執行緒,非核心執行緒的大小也為無限大。適用於執行週期性的任務。
  • newSingleThreadPool:只有一條執行緒來執行任務,適用於有順序的任務的應用場景。
  • newFixedThreadPool:定長的執行緒池,有核心執行緒,核心執行緒的即為最大的執行緒數量,沒有非核心執行緒

 

4.引申(一):如何選擇執行緒池數量

執行緒池的大小決定著系統的效能,過大或者過小的執行緒池數量都無法發揮最優的系統性能。當然執行緒池的大小也不需要做的太過於精確,只需要避免過大和過小的情況。一般來說,確定執行緒池的大小需要考慮CPU的數量,記憶體大小,任務是計算密集型還是IO密集型等因素。

執行緒池最優大小=NCPU *UCPU(1+W/C)

其中,NCPU = CPU的數量,UCPU = 期望對CPU的使用率 0 ≤ UCPU ≤ 1,W/C = 等待時間與計算時間的比率

 

注:在Java中使用以下方法可以獲取到cpu的數量

int ncpus = Runtime.getRuntime().availableProcessors();

 5.引申(二):執行緒池的正確使用

以下阿里編碼規範裡面說的一段話:

執行緒池不允許使用Executors去建立,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確執行緒池的執行規則,規避資源耗盡的風險。 說明:Executors各個方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
  主要問題是堆積的請求處理佇列可能會耗費非常大的記憶體,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
  主要問題是執行緒數最大數是Integer.MAX_VALUE,可能會建立數量非常多的執行緒,甚至OOM。

6.引申(三) :手動建立執行緒池的注意點

  • 1.任務獨立。如何任務依賴於其他任務,那麼可能產生死鎖。例如某個任務等待另一個任務的返回值或執行結果,那麼除非執行緒池足夠大,否則將發生執行緒飢餓死鎖。
  • 2.合理配置阻塞時間過長的任務。如果任務阻塞時間過長,那麼即使不出現死鎖,執行緒池的效能也會變得很糟糕。在Java併發包裡可阻塞方法都同時定義了限時方式和不限時方式。例如Thread.join,BlockingQueue.put,CountDownLatch.await等,如果任務超時,則標識任務失敗,然後中止任務或者將任務放回佇列以便隨後執行,這樣,無論任務的最終結果是否成功,這種辦法都能夠保證任務總能繼續執行下去。
  • 3.設定合理的執行緒池大小。只需要避免過大或者過小的情況即可,上文的公式執行緒池大小=NCPU *UCPU(1+W/C)
  • 4.選擇合適的阻塞佇列。newFixedThreadPool和newSingleThreadExecutor都使用了無界的阻塞佇列,無界阻塞佇列會有消耗很大的記憶體,如果使用了有界阻塞佇列,它會規避記憶體佔用過大的問題,但是當任務填滿有界阻塞佇列,新的任務該怎麼辦?在使用有界佇列是,需要選擇合適的拒絕策略,佇列的大小和執行緒池的大小必須一起調節。對於非常大的或者無界的執行緒池,可以使用SynchronousQueue來避免任務排隊,以直接將任務從生產者提交到工作者執行緒。

 

好啦,以上就是關於Java執行緒池的相關知識總結,如果大家有什麼不明白的地方或者發現文中有描述不好的地方,歡迎大家留言評論,我們一起學習呀。

 

Biu~~~~~~~~~~~~~~~~~~~~宫å´éªé¾ç«è¡¨æå|é¾ç«gifå¾è¡¨æåä¸è½½å¾ç~~~~~~~~~~~~~~~~~~~~~~pia!

參考部落格:http://www.cnblogs.com/superfj/p/7544971.html

https://blog.csdn.net/weixin_40271838/article/details/79998327