執行緒基礎:執行緒池(5)——基本使用(上)
執行緒基礎:執行緒池(5)——基本使用(上)
:http://blog.csdn.net/yinwenjie(未經允許嚴禁用於商業用途!) https://blog.csdn.net/yinwenjie/article/details/50522458
1、概述
從本文開始,我將用兩篇文章的篇幅,為各位讀者呈現JAVA中原生的執行緒池技術。第一篇文章,我將講解JAVA原生執行緒池的基本使用,並由此延伸出JAVA中和執行緒管理相關的類結構體系,然後我們詳細描述JAVA原生執行緒池的結構和工作方式;第二篇文章,我們將繼續深入,講解JAVA原生執行緒池的高階特性,包括Thread工廠、佇列、拒絕原則、鉤子和相關工具類。
如果您是JAVA語言的初學者,請從本篇文章看起;如果您對執行緒池技術已有一定的瞭解,那麼可以只看下一篇文章;如果您是高手,請繞行;如果您對我的觀點有任何意見和建議,請留言,謝謝。^-^
2、為什麼要使用執行緒池
前文我們已經講到,執行緒是一個作業系統概念。作業系統負責這個執行緒的建立、掛起、執行、阻塞和終結操作。而作業系統建立執行緒、切換執行緒狀態、終結執行緒都要進行CPU排程——這是一個耗費時間和系統資源的事情(《作業系統知識回顧—程序執行緒模型》)
另一方面,目前大多數生產環境我們所面臨問題的技術背景一般是:處理某一次請求的時間是非常短暫的,但是請求數量是巨大的。這種技術背景下,如果我們為每一個請求都單獨建立一個執行緒,那麼物理機的所有資源基本上都被作業系統建立執行緒、切換執行緒狀態、銷燬執行緒這些操作所佔用,用於業務請求處理的資源反而減少了。所以最理想的處理方式是,將處理請求的執行緒數量控制在一個範圍,既保證後續的請求不會等待太長時間,又保證物理機將足夠的資源用於請求處理本身
另外,一些作業系統是有最大執行緒數量限制的。當執行的執行緒數量逼近這個值的時候,作業系統會變得不穩定。這也是我們要限制執行緒數量的原因。
3、執行緒池的基本使用方式
JAVA語言為我們提供了兩種基礎執行緒池的選擇:ScheduledThreadPoolExecutor和ThreadPoolExecutor。它們都實現了ExecutorService介面(注意,ExecutorService介面本身和“執行緒池”並沒有直接關係,它的定義更接近“執行器”,而“使用執行緒管理的方式進行實現”只是其中的一種實現方式)。這篇文章中,我們主要圍繞ThreadPoolExecutor類進行講解。
3-1、簡單使用
首先我們來看看ThreadPoolExecutor類的最簡單使用方式:
package test.thread.pool;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
public class PoolThreadSimple {
static {
BasicConfigurator.configure();
}
public static void main(String[] args) throws Throwable {
/*
* corePoolSize:核心大小,執行緒池初始化的時候,就會有這麼大
* maximumPoolSize:執行緒池最大執行緒數
* keepAliveTime:如果當前執行緒池中執行緒數大於corePoolSize。
* 多餘的執行緒,在等待keepAliveTime時間後如果還沒有新的執行緒任務指派給它,它就會被回收
*
* unit:等待時間keepAliveTime的單位
*
* workQueue:等待佇列。這個物件的設定是本文將重點介紹的內容
* */
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>());
for(int index = 0 ; index < 10 ; index ++) {
poolExecutor.submit(new PoolThreadSimple.TestRunnable(index));
}
// 沒有特殊含義,只是為了保證main執行緒不會退出
synchronized (poolExecutor) {
poolExecutor.wait();
}
}
/**
* 這個就是測試用的執行緒
* @author yinwenjie
*/
private static class TestRunnable implements Runnable {
/**
* 日誌
*/
private static Log LOGGER = LogFactory.getLog(TestRunnable.class);
/**
* 記錄任務的唯一編號,這樣在日誌中好做識別
*/
private Integer index;
public TestRunnable(int index) {
this.index = index;
}
/**
* @return the index
*/
public Integer getIndex() {
return index;
}
@Override
public void run() {
/*
* 執行緒中,就只做一件事情:
* 等待60秒鐘的事件,以便模擬業務操作過程
* */
Thread currentThread = Thread.currentThread();
TestRunnable.LOGGER.info("執行緒:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")開始執行===");
synchronized (currentThread) {
try {
currentThread.wait(60000);
} catch (InterruptedException e) {
TestRunnable.LOGGER.error(e.getMessage(), e);
}
}
TestRunnable.LOGGER.info("執行緒:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")執行完成");
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
隨後的文章中我們將對執行緒池中的corePoolSize、maximumPoolSize、keepAliveTime、timeUnit、workQueue、threadFactory、handler引數和一些常用/不常用的設定項進行逐一講解。
3-2、ThreadPoolExecutor邏輯結構和工作方式
在上面的程式碼中,我們建立執行緒池的時候使用了ThreadPoolExecutor中最簡單的一個建構函式:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
- 1
- 2
- 3
- 4
- 5
建構函式中需要傳入的引數包括corePoolSize、maximumPoolSize、keepAliveTime、timeUnit和workQueue。要明確理解這些引數(和後續將要介紹的引數)的含義,就首先要搞清楚ThreadPoolExecutor執行緒池的邏輯結構。
一定要注意一個概念,即存在於執行緒池中容器的一定是Thread物件,而不是您要求執行的任務(所以叫執行緒池而不叫任務池也不叫物件池,更不叫游泳池);您要求執行的任務將被執行緒池分配給某一個空閒的Thread執行。
從上圖中,我們可以看到構成執行緒池的幾個重要元素:
-
等待佇列:顧名思義,就是您呼叫執行緒池物件的submit()方法或者execute()方法,要求執行緒池執行的任務(這些任務必須實現Runnable介面或者Callable介面)。但是出於某些原因執行緒池並沒有馬上執行這些任務,而是送入一個佇列等待執行(這些原因後文馬上講解)。
-
核心執行緒:執行緒池主要用於執行任務的是“核心執行緒”,“核心執行緒”的數量是您建立執行緒時所設定的corePoolSize引數決定的。如果不進行特別的設定,執行緒池中始終會保持corePoolSize數量的執行緒數(不包括建立階段)。
-
非核心執行緒:一旦任務數量過多(由等待佇列的特性決定),執行緒池將建立“非核心執行緒”臨時幫助執行任務。您設定的大於corePoolSize引數小於maximumPoolSize引數的部分,就是執行緒池可以臨時建立的“非核心執行緒”的最大數量。這種情況下如果某個執行緒沒有執行任何任務,在等待keepAliveTime時間後,這個執行緒將會被銷燬,直到執行緒池的執行緒數量重新達到corePoolSize。
-
要重點理解上一條描述中黑體字部分的內容。也就是說,並不是所謂的“非核心執行緒”才會被回收;而是誰的空閒時間達到keepAliveTime這個閥值,就會被回收。直到執行緒池中執行緒數量等於corePoolSize為止。
-
maximumPoolSize引數也是當前執行緒池允許建立的最大執行緒數量。那麼如果您設定的corePoolSize引數和您設定的maximumPoolSize引數一致時,執行緒池在任何情況下都不會回收空閒執行緒。keepAliveTime和timeUnit也就失去了意義。
-
keepAliveTime引數和timeUnit引數也是配合使用的。keepAliveTime引數指明等待時間的量化值,timeUnit指明量化值單位。例如keepAliveTime=1,timeUnit為TimeUnit.MINUTES,代表空閒執行緒的回收閥值為1分鐘。
說完了執行緒池的邏輯結構,下面我們討論一下執行緒池是怎樣處理某一個執行任務的。下圖描述了一個完整的任務處理過程:
1、首先您可以通過執行緒池提供的submit()方法或者execute()方法,要求執行緒池執行某個任務。執行緒池收到這個要求執行的任務後,會有幾種處理情況:
1.1、如果當前執行緒池中執行的執行緒數量還沒有達到corePoolSize大小時,執行緒池會建立一個新的執行緒執行您的任務,無論之前已經建立的執行緒是否處於空閒狀態。
1.2、如果當前執行緒池中執行的執行緒數量已經達到設定的corePoolSize大小,執行緒池會把您的這個任務加入到等待佇列中。直到某一個的執行緒空閒了,執行緒池會根據您設定的等待佇列規則,從佇列中取出一個新的任務執行。
1.3、如果根據佇列規則,這個任務無法加入等待佇列。這時執行緒池就會建立一個“非核心執行緒”直接執行這個任務。注意,如果這種情況下任務執行成功,那麼當前執行緒池中的執行緒數量一定大於corePoolSize。
1.4、如果這個任務,無法被“核心執行緒”直接執行,又無法加入等待佇列,又無法建立“非核心執行緒”直接執行,且您沒有為執行緒池設定RejectedExecutionHandler。這時執行緒池會丟擲RejectedExecutionException異常,即執行緒池拒絕接受這個任務。(實際上丟擲RejectedExecutionException異常的操作,是ThreadPoolExecutor執行緒池中一個預設的RejectedExecutionHandler實現:AbortPolicy,這在後文會提到)
2、一旦執行緒池中某個執行緒完成了任務的執行,它就會試圖到任務等待佇列中拿去下一個等待任務(所有的等待任務都實現了BlockingQueue介面,按照介面字面上的理解,這是一個可阻塞的佇列介面),它會呼叫等待佇列的poll()方法,並停留在哪裡。
3、當執行緒池中的執行緒超過您設定的corePoolSize引數,說明當前執行緒池中有所謂的“非核心執行緒”。那麼當某個執行緒處理完任務後,如果等待keepAliveTime時間後仍然沒有新的任務分配給它,那麼這個執行緒將會被回收。執行緒池回收執行緒時,對所謂的“核心執行緒”和“非核心執行緒”是一視同仁的,直到執行緒池中執行緒的數量等於您設定的corePoolSize引數時,回收過程才會停止。
3-3、不常用的設定
在ThreadPoolExecutor執行緒池中,有一些不常用的設定。我建議如果您在應用場景中沒有特殊的要求,就不需要使用這些設定:
3-3-1、 allowCoreThreadTimeOut:
前文我們討論到,執行緒池回收執行緒只會發生在當前執行緒池中執行緒數量大於corePoolSize引數的時候;當執行緒池中執行緒數量小於等於corePoolSize引數的時候,回收過程就會停止。
allowCoreThreadTimeOut設定項可以要求執行緒池:將包括“核心執行緒”在內的,沒有任務分配的任何執行緒,在等待keepAliveTime時間後全部進行回收:
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1));
poolExecutor.allowCoreThreadTimeOut(true);
- 1
- 2
- 3
以下是設定前的效果:
以下是設定後的效果:
3-3-2 prestartAllCoreThreads
前文我們還討論到,當執行緒池中的執行緒還沒有達到您設定的corePoolSize引數值的時候,如果有新的任務到來,執行緒池將建立新的執行緒執行這個任務,無論之前已經建立的執行緒是否處於空閒狀態。這個描述可以用下面的示意圖表示出來:
prestartAllCoreThreads設定項,可以線上程池建立,但還沒有接收到任何任務的情況下,先行建立符合corePoolSize引數值的執行緒數:
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1));
poolExecutor.prestartAllCoreThreads();
- 1
- 2
- 3
================================
(後文預告:執行緒基礎:執行緒池(6)——高階特性(下))
ThreadPoolExecutor類結構體系
使用ThreadFactory
執行緒池任務佇列(重點講解)
拒絕任務
擴充套件ThreadPoolExecutor執行緒池
Hook methods
Queue maintenance
工具類和後記
Executors
Apache中的擴充套件
與spring結合