1. 程式人生 > >執行緒基礎:執行緒池(5)——基本使用(上)

執行緒基礎:執行緒池(5)——基本使用(上)

執行緒基礎:執行緒池(5)——基本使用(上)

:http://blog.csdn.net/yinwenjie(未經允許嚴禁用於商業用途!) https://blog.csdn.net/yinwenjie/article/details/50522458

1、概述

從本文開始,我將用兩篇文章的篇幅,為各位讀者呈現JAVA中原生的執行緒池技術。第一篇文章,我將講解JAVA原生執行緒池的基本使用,並由此延伸出JAVA中和執行緒管理相關的類結構體系,然後我們詳細描述JAVA原生執行緒池的結構和工作方式;第二篇文章,我們將繼續深入,講解JAVA原生執行緒池的高階特性,包括Thread工廠、佇列、拒絕原則、鉤子和相關工具類。

如果您是JAVA語言的初學者,請從本篇文章看起;如果您對執行緒池技術已有一定的瞭解,那麼可以只看下一篇文章;如果您是高手,請繞行;如果您對我的觀點有任何意見和建議,請留言,謝謝。^-^

2、為什麼要使用執行緒池

這裡寫圖片描述

前文我們已經講到,執行緒是一個作業系統概念。作業系統負責這個執行緒的建立、掛起、執行、阻塞和終結操作。而作業系統建立執行緒、切換執行緒狀態、終結執行緒都要進行CPU排程——這是一個耗費時間和系統資源的事情(《作業系統知識回顧—程序執行緒模型》

另一方面,目前大多數生產環境我們所面臨問題的技術背景一般是:處理某一次請求的時間是非常短暫的,但是請求數量是巨大的。這種技術背景下,如果我們為每一個請求都單獨建立一個執行緒,那麼物理機的所有資源基本上都被作業系統建立執行緒、切換執行緒狀態、銷燬執行緒這些操作所佔用,用於業務請求處理的資源反而減少了。所以最理想的處理方式是,將處理請求的執行緒數量控制在一個範圍,既保證後續的請求不會等待太長時間,又保證物理機將足夠的資源用於請求處理本身

另外,一些作業系統是有最大執行緒數量限制的。當執行的執行緒數量逼近這個值的時候,作業系統會變得不穩定。這也是我們要限制執行緒數量的原因。

3、執行緒池的基本使用方式

JAVA語言為我們提供了兩種基礎執行緒池的選擇:ScheduledThreadPoolExecutor和ThreadPoolExecutor。它們都實現了ExecutorService介面(注意,ExecutorService介面本身和“執行緒池”並沒有直接關係,它的定義更接近“執行器”,而“使用執行緒管理的方式進行實現”只是其中的一種實現方式)。這篇文章中,我們主要圍繞ThreadPoolExecutor類進行講解。

3-1、簡單使用

首先我們來看看ThreadPoolExecutor類的最簡單使用方式:

package test.thread.pool;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class PoolThreadSimple {

    static {
        BasicConfigurator.configure();
    }

    public static void main(String[] args) throws Throwable {

        /*
         * corePoolSize:核心大小,執行緒池初始化的時候,就會有這麼大
         * maximumPoolSize:執行緒池最大執行緒數
         * keepAliveTime:如果當前執行緒池中執行緒數大於corePoolSize。
         * 多餘的執行緒,在等待keepAliveTime時間後如果還沒有新的執行緒任務指派給它,它就會被回收
         * 
         * unit:等待時間keepAliveTime的單位
         * 
         * workQueue:等待佇列。這個物件的設定是本文將重點介紹的內容
         * */
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>());
        for(int index = 0 ; index < 10 ; index ++) {
            poolExecutor.submit(new PoolThreadSimple.TestRunnable(index));
        }

        // 沒有特殊含義,只是為了保證main執行緒不會退出
        synchronized (poolExecutor) {
            poolExecutor.wait();
        }
    }

    /**
     * 這個就是測試用的執行緒
     * @author yinwenjie
     */
    private static class TestRunnable implements Runnable {

        /**
         * 日誌
         */
        private static Log LOGGER = LogFactory.getLog(TestRunnable.class);

        /**
         * 記錄任務的唯一編號,這樣在日誌中好做識別
         */
        private Integer index;

        public TestRunnable(int index) {
            this.index = index;
        }

        /**
         * @return the index
         */
        public Integer getIndex() {
            return index;
        }

        @Override
        public void run() {
            /*
             * 執行緒中,就只做一件事情:
             * 等待60秒鐘的事件,以便模擬業務操作過程
             * */
            Thread currentThread  = Thread.currentThread();
            TestRunnable.LOGGER.info("執行緒:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")開始執行===");
            synchronized (currentThread) {
                try {
                    currentThread.wait(60000);
                } catch (InterruptedException e) {
                    TestRunnable.LOGGER.error(e.getMessage(), e);
                }
            }

            TestRunnable.LOGGER.info("執行緒:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")執行完成");
        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87

隨後的文章中我們將對執行緒池中的corePoolSize、maximumPoolSize、keepAliveTime、timeUnit、workQueue、threadFactory、handler引數和一些常用/不常用的設定項進行逐一講解。

3-2、ThreadPoolExecutor邏輯結構和工作方式

在上面的程式碼中,我們建立執行緒池的時候使用了ThreadPoolExecutor中最簡單的一個建構函式:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue)
  • 1
  • 2
  • 3
  • 4
  • 5

建構函式中需要傳入的引數包括corePoolSize、maximumPoolSize、keepAliveTime、timeUnit和workQueue。要明確理解這些引數(和後續將要介紹的引數)的含義,就首先要搞清楚ThreadPoolExecutor執行緒池的邏輯結構。

這裡寫圖片描述

一定要注意一個概念,即存在於執行緒池中容器的一定是Thread物件,而不是您要求執行的任務(所以叫執行緒池而不叫任務池也不叫物件池,更不叫游泳池);您要求執行的任務將被執行緒池分配給某一個空閒的Thread執行。

從上圖中,我們可以看到構成執行緒池的幾個重要元素:

  • 等待佇列:顧名思義,就是您呼叫執行緒池物件的submit()方法或者execute()方法,要求執行緒池執行的任務(這些任務必須實現Runnable介面或者Callable介面)。但是出於某些原因執行緒池並沒有馬上執行這些任務,而是送入一個佇列等待執行(這些原因後文馬上講解)。

  • 核心執行緒:執行緒池主要用於執行任務的是“核心執行緒”,“核心執行緒”的數量是您建立執行緒時所設定的corePoolSize引數決定的。如果不進行特別的設定,執行緒池中始終會保持corePoolSize數量的執行緒數(不包括建立階段)。

  • 非核心執行緒:一旦任務數量過多(由等待佇列的特性決定),執行緒池將建立“非核心執行緒”臨時幫助執行任務。您設定的大於corePoolSize引數小於maximumPoolSize引數的部分,就是執行緒池可以臨時建立的“非核心執行緒”的最大數量。這種情況下如果某個執行緒沒有執行任何任務,在等待keepAliveTime時間後,這個執行緒將會被銷燬,直到執行緒池的執行緒數量重新達到corePoolSize

  • 要重點理解上一條描述中黑體字部分的內容。也就是說,並不是所謂的“非核心執行緒”才會被回收;而是誰的空閒時間達到keepAliveTime這個閥值,就會被回收。直到執行緒池中執行緒數量等於corePoolSize為止。

  • maximumPoolSize引數也是當前執行緒池允許建立的最大執行緒數量。那麼如果您設定的corePoolSize引數和您設定的maximumPoolSize引數一致時,執行緒池在任何情況下都不會回收空閒執行緒。keepAliveTime和timeUnit也就失去了意義。

  • keepAliveTime引數和timeUnit引數也是配合使用的。keepAliveTime引數指明等待時間的量化值,timeUnit指明量化值單位。例如keepAliveTime=1,timeUnit為TimeUnit.MINUTES,代表空閒執行緒的回收閥值為1分鐘。

說完了執行緒池的邏輯結構,下面我們討論一下執行緒池是怎樣處理某一個執行任務的。下圖描述了一個完整的任務處理過程:

這裡寫圖片描述

1、首先您可以通過執行緒池提供的submit()方法或者execute()方法,要求執行緒池執行某個任務。執行緒池收到這個要求執行的任務後,會有幾種處理情況:

1.1、如果當前執行緒池中執行的執行緒數量還沒有達到corePoolSize大小時,執行緒池會建立一個新的執行緒執行您的任務,無論之前已經建立的執行緒是否處於空閒狀態

1.2、如果當前執行緒池中執行的執行緒數量已經達到設定的corePoolSize大小,執行緒池會把您的這個任務加入到等待佇列中。直到某一個的執行緒空閒了,執行緒池會根據您設定的等待佇列規則,從佇列中取出一個新的任務執行。

1.3、如果根據佇列規則,這個任務無法加入等待佇列。這時執行緒池就會建立一個“非核心執行緒”直接執行這個任務。注意,如果這種情況下任務執行成功,那麼當前執行緒池中的執行緒數量一定大於corePoolSize。

1.4、如果這個任務,無法被“核心執行緒”直接執行,又無法加入等待佇列,又無法建立“非核心執行緒”直接執行,且您沒有為執行緒池設定RejectedExecutionHandler。這時執行緒池會丟擲RejectedExecutionException異常,即執行緒池拒絕接受這個任務。(實際上丟擲RejectedExecutionException異常的操作,是ThreadPoolExecutor執行緒池中一個預設的RejectedExecutionHandler實現:AbortPolicy,這在後文會提到

2、一旦執行緒池中某個執行緒完成了任務的執行,它就會試圖到任務等待佇列中拿去下一個等待任務(所有的等待任務都實現了BlockingQueue介面,按照介面字面上的理解,這是一個可阻塞的佇列介面),它會呼叫等待佇列的poll()方法,並停留在哪裡。

3、當執行緒池中的執行緒超過您設定的corePoolSize引數,說明當前執行緒池中有所謂的“非核心執行緒”。那麼當某個執行緒處理完任務後,如果等待keepAliveTime時間後仍然沒有新的任務分配給它,那麼這個執行緒將會被回收。執行緒池回收執行緒時,對所謂的“核心執行緒”和“非核心執行緒”是一視同仁的,直到執行緒池中執行緒的數量等於您設定的corePoolSize引數時,回收過程才會停止。

3-3、不常用的設定

在ThreadPoolExecutor執行緒池中,有一些不常用的設定。我建議如果您在應用場景中沒有特殊的要求,就不需要使用這些設定:

3-3-1、 allowCoreThreadTimeOut:

前文我們討論到,執行緒池回收執行緒只會發生在當前執行緒池中執行緒數量大於corePoolSize引數的時候;當執行緒池中執行緒數量小於等於corePoolSize引數的時候,回收過程就會停止。

allowCoreThreadTimeOut設定項可以要求執行緒池:將包括“核心執行緒”在內的,沒有任務分配的任何執行緒,在等待keepAliveTime時間後全部進行回收:

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1));

poolExecutor.allowCoreThreadTimeOut(true);
  • 1
  • 2
  • 3

以下是設定前的效果:

這裡寫圖片描述

以下是設定後的效果:

這裡寫圖片描述

3-3-2 prestartAllCoreThreads

前文我們還討論到,當執行緒池中的執行緒還沒有達到您設定的corePoolSize引數值的時候,如果有新的任務到來,執行緒池將建立新的執行緒執行這個任務,無論之前已經建立的執行緒是否處於空閒狀態。這個描述可以用下面的示意圖表示出來:

這裡寫圖片描述

prestartAllCoreThreads設定項,可以線上程池建立,但還沒有接收到任何任務的情況下,先行建立符合corePoolSize引數值的執行緒數:

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1));

poolExecutor.prestartAllCoreThreads();
  • 1
  • 2
  • 3

================================

(後文預告:執行緒基礎:執行緒池(6)——高階特性(下))

ThreadPoolExecutor類結構體系

使用ThreadFactory

執行緒池任務佇列(重點講解)

拒絕任務

擴充套件ThreadPoolExecutor執行緒池

    Hook methods 

    Queue maintenance 

工具類和後記

    Executors

    Apache中的擴充套件

    與spring結合