Java併發程式設計系列之二執行緒基礎
上篇文章對併發的理論基礎進行了回顧,主要是為什麼使用多執行緒、多執行緒會引發什麼問題及引發的原因,和怎麼使用Java中的多執行緒去解決這些問題。
正所謂,知其然知其所以然,這是學習一個知識遵循的原則。
推薦讀者先行檢視併發程式設計的理論知識,以便可以絲滑入戲。
本篇文章重點在於Java中怎麼去使用多執行緒,和多執行緒的一些相關概念和操作,及怎麼優化多執行緒。
在Java中每個物件都有其生命週期,執行緒同樣不例外,也有其生命週期。
一、執行緒生命週期
執行緒的幾種狀態轉換
1、新建(New)
新建立了一個執行緒物件,但還沒有呼叫start()方法。
2、就緒
當執行緒物件呼叫了start()方法後,該執行緒就進入就緒狀態。處於就緒狀態的執行緒位於執行緒佇列中,此時它只是具備了執行的條件,能否獲得CPU的使用權並開始執行,還需要等待系統的排程。
3、執行(Runnable)
如果處於就緒狀態的執行緒獲得了CPU的使用權,並開始執行run()方法中的執行緒執行體,則該執行緒處於執行狀態。
一個執行緒啟動後,它可能不會一直處於執行狀態,當執行狀態的執行緒使用完系統分配的時間後,系統就會剝奪該執行緒佔用的CPU資源,讓其他執行緒獲得執行的機會。需要注意的是,
只有處於就緒狀態的執行緒才可能轉換到執行狀態。
4、阻塞(Blocking)
等待獲取一個排它鎖,如果其執行緒釋放了鎖就會結束此狀態。
①無限期等待(Waiting)
等待其它執行緒顯式地喚醒,否則不會被分配 CPU 時間片。
進入方法 | 退出方法 |
---|---|
沒有設定 Timeout 引數的 Object.wait() 方法 | Object.notify() / Object.notifyAll() |
沒有設定 Timeout 引數的 Thread.join() 方法 | 被呼叫的執行緒執行完畢 |
LockSupport.park() 方法 | - |
②限期等待(Timed Waiting)
無需等待其它執行緒顯式地喚醒,在一定時間之後會被系統自動喚醒。
呼叫 Thread.sleep() 方法使執行緒進入限期等待狀態時,常常用“使一個執行緒睡眠”進行描述。
呼叫 Object.wait() 方法使執行緒進入限期等待或者無限期等待時,常常用“掛起一個執行緒”進行描述。
睡眠和掛起是用來描述行為,而阻塞和等待用來描述狀態。
阻塞和等待的區別在於,阻塞是被動的,它是在等待獲取一個排它鎖。而等待是主動的,通過呼叫 Thread.sleep() 和 Object.wait() 等方法進入。
進入方法 | 退出方法 |
---|---|
Thread.sleep() 方法 | 時間結束 |
設定了 Timeout 引數的 Object.wait() 方法 | 時間結束 / Object.notify() / Object.notifyAll() |
設定了 Timeout 引數的 Thread.join() 方法 | 時間結束 / 被呼叫的執行緒執行完畢 |
LockSupport.parkNanos() 方法 | - |
LockSupport.parkUntil() 方法 | - |
5、死亡(Terminated)
如果執行緒呼叫stop()方法或nun()方法正常執行完畢,或者執行緒丟擲一個未捕獲的異常(Exception)錯誤(Error),執行緒就進入死亡狀態。一旦進入死亡狀態,執行緒將不再擁有執行的資格,也不能再轉換到其他狀態。
理解執行緒的五種狀態,在呼叫多執行緒的方法時,能清楚的知道當前處於哪個狀態。
我們舉一個簡單的例項來說明每個狀態。
public class MyThread extends Thread {
//執行狀態
public void run() {
// ...
}
public static void main(String[] args) {
MyThread mt = new MyThread(); //1、新建狀態
mt.start(); //就緒狀態
}
}
線上程控制章節有一些方法,如sleep()\join()方法,這些方法會讓執行緒處於阻塞狀態。
瞭解了執行緒的生成周期以後,接下來我們就需要掌握在Java中怎麼使用多執行緒。
在Java中有三種方式實現多執行緒。
二、建立執行緒的三種方式
有三種使用執行緒的方法:
- 實現 Runnable 介面;
- 實現 Callable 介面;
- 繼承 Thread 類。
實現 Runnable 和 Callable 介面的類只能當做一個可以線上程中執行的任務,不是真正意義上的執行緒,因此最後還需要通過 Thread 來呼叫。可以說任務是通過執行緒驅動從而執行的。
1、實現 Runnable 介面
需要實現 run() 方法。
通過 Thread 呼叫 start() 方法來啟動執行緒。
public class MyRunnable implements Runnable {
public void run() {
// 需要執行多執行緒的業務邏輯
}
}
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
Thread thread = new Thread(myRunnable);
thread.start();
}
2、 實現 Callable 介面
與 Runnable 相比,Callable 可以有返回值,返回值通過 FutureTask 進行封裝。
public class MyCallable implements Callable<Integer> {
public Integer call() {
return 123;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyCallable mc = new MyCallable();
FutureTask<Integer> ft = new FutureTask<>(mc);
Thread thread = new Thread(ft);
thread.start();
System.out.println(ft.get());
}
3、繼承 Thread 類
同樣也是需要實現 run() 方法,因為 Thread 類也實現了 Runable 介面。
當呼叫 start() 方法啟動一個執行緒時,虛擬機器會將該執行緒放入就緒佇列中等待被排程,當一個執行緒被排程時會執行該執行緒的 run() 方法。
public class MyThread extends Thread {
public void run() {
// ...
}
}
public static void main(String[] args) {
MyThread mt = new MyThread();
mt.start();
}
4、實現介面 VS 繼承 Thread
實現介面會更好一些,因為:
- Java 不支援多重繼承,因此繼承了 Thread 類就無法繼承其它類,但是可以實現多個介面;
- 類可能只要求可執行就行,繼承整個 Thread 類開銷過大。
三、執行緒控制
執行緒在使用過程中能對其靈活的控制,包含執行緒睡眠和執行緒讓步等。
在學習執行緒的一些控制方法前,有一個必須要了解的前置知識,線上程中分為守護程序和非守護程序。
1、Daemon
守護執行緒是程式執行時在後臺提供服務的執行緒,不屬於程式中不可或缺的部分。
當所有非守護執行緒結束時,程式也就終止,同時會殺死所有守護執行緒。
垃圾回收執行緒就是一個經典的守護執行緒,當我們的程式中不再有任何執行的Thread,程式就不會再產生垃圾,垃圾回收器也就無事可做,所以當垃圾回收執行緒是JVM上僅剩的執行緒時,垃圾回收執行緒會自動離開。它始終在低級別的狀態中執行,用於實時監控和管理系統中的可回收資源。
main() 屬於非守護執行緒。
非守護執行緒可以轉換為守護程序。
使用 setDaemon() 方法將一個執行緒設定為守護執行緒。
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.setDaemon(true);
}
2、sleep()
Thread.sleep(millisec) 方法會休眠當前正在執行的執行緒,millisec 單位為毫秒。
sleep() 可能會丟擲 InterruptedException,因為異常不能跨執行緒傳播回 main() 中,因此必須在本地進行處理。執行緒中丟擲的其它異常也同樣需要在本地進行處理。
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
3、yield()
對靜態方法 Thread.yield() 的呼叫聲明瞭當前執行緒已經完成了生命週期中最重要的部分,可以切換給其它執行緒來執行。該方法只是對執行緒排程器的一個建議,而且也只是建議具有相同優先順序的其它執行緒可以執行。
public void run() {
Thread.yield();
}
4、join()
一旦這個執行緒執行了這個方法,只有這個執行緒處於死亡狀態其他執行緒才能執行。
public class MyThread extends Thread {
11
12 public MyThread() {
13 }
14
15 public MyThread(String name) {
16 super(name);
17 }
18
19 @Override
20 public void run() {
21 for (int i = 0; i < 10; i++) {
22 System.out.println(getName() + ":" + i);
23 }
24 }
25
26 public static void main(String[] args) {
27 // 1.建立MyThread類的物件
28 MyThread myThread1 = new MyThread("執行緒1");
29 MyThread myThread2 = new MyThread("執行緒2");
30 MyThread myThread3 = new MyThread("執行緒3");
31
32 // 2.啟動執行緒
33 myThread1.start();
34 try {
35 // 等待myThread1執行緒死亡,只有當該執行緒死亡之後才能繼續執行其它執行緒
36 myThread1.join();
37 } catch (InterruptedException e) {
38 e.printStackTrace();
39 }
40 myThread2.start();
41 myThread3.start();
42
43 }
44 }
5、wait()\notify()
wait\notify\notifyAll操作都是屬於Object類提供的方法,即所有的物件都具有該方法,他們是的一對的,呼叫的時候不能分開呦。
wait():呼叫wait方法的執行緒,當前持有鎖的該執行緒等待,直至該物件的另一個持鎖執行緒呼叫notify/notifyAll操作。
wait(long timeOut)、wait(long timeOut,int nanos)
執行緒狀態轉換是,當wait被喚醒或超時,並不是直接進入到執行或者就緒狀態,而是先進入到Block狀態,搶鎖成功後,才能進入到可執行狀態。
wait方法在呼叫進入阻塞之前會釋放鎖,而sleep或join是不會釋放鎖的
notify():通知持有該物件鎖的所有執行緒中的的隨意一個執行緒被喚醒
notifyAll():通知持有該物件鎖的所有執行緒被同時喚醒
我們形象的做一個比喻:
如果把多執行緒比喻成一個運動員,跑道就是CPU每次只能允許一個運動員進入跑道,運動員的後勤保障就是守護程序,通過setDaemon()方法,運動員就轉業為了後勤人員。
執行sleep()就是提前設定一個時間,讓運動員休息會。wait()方法是運動員無限期的睡著,直到教練殺出來一腳踹醒(執行notify方法)運動員才會喚醒。
yield()會把跑道讓給別的運動員。
join()方法會讓運動員擁有最高的跑道許可權,我不跑完,誰都不能進來。
四、執行緒同步
Java允許併發控制,當多個執行緒同時操作一個可共享的資源變數時(如資料的增刪改查), 將會導致資料不準確,相互之間產生衝突,因此加入同步鎖以避免在該執行緒沒有完成操作之前,被其他執行緒的呼叫, 從而保證了該變數的唯一性和準確性。
Java 提供了兩種鎖機制來控制多個執行緒對共享資源的互斥訪問,第一個是 JVM 實現的 synchronized,而另一個是 JDK 實現的 ReentrantLock。
1、synchronized
①. 同步一個程式碼塊
public void func() {
synchronized (this) {
// ...
}
}
它只作用於同一個物件,如果呼叫兩個物件上的同步程式碼塊,就不會進行同步。
對於以下程式碼,使用 ExecutorService 執行了兩個執行緒,由於呼叫的是同一個物件的同步程式碼塊,因此這兩個執行緒會進行同步,當一個執行緒進入同步語句塊時,另一個執行緒就必須等待。
public class SynchronizedExample {
public void func1() {
synchronized (this) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
}
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func1());
executorService.execute(() -> e1.func1());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
對於以下程式碼,兩個執行緒呼叫了不同物件的同步程式碼塊,因此這兩個執行緒就不需要同步。從輸出結果可以看出,兩個執行緒交叉執行。
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func1());
executorService.execute(() -> e2.func1());
}
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
②. 同步一個方法
public synchronized void func () {
// ...
}
它和同步程式碼塊一樣,作用於同一個物件。
③. 同步一個類
public void func() {
synchronized (SynchronizedExample.class) {
// ...
}
}
作用於整個類,也就是說兩個執行緒呼叫同一個類的不同物件上的這種同步語句,也會進行同步。
public class SynchronizedExample {
public void func2() {
synchronized (SynchronizedExample.class) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
}
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func2());
executorService.execute(() -> e2.func2());
}
④. 同步一個靜態方法
public synchronized static void fun() {
// ...
}
作用於整個類。
2、ReentrantLock
ReentrantLock 是 java.util.concurrent(J.U.C)包中的鎖。
public class LockExample {
private Lock lock = new ReentrantLock();
public void func() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
} finally {
lock.unlock(); // 確保釋放鎖,從而避免發生死鎖。
}
}
}
public static void main(String[] args) {
LockExample lockExample = new LockExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> lockExample.func());
executorService.execute(() -> lockExample.func());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
3、比較
①. 鎖的實現**
synchronized 是 JVM 實現的,而 ReentrantLock 是 JDK 實現的。
②. 效能
新版本 Java 對 synchronized 進行了很多優化,例如自旋鎖等,synchronized 與 ReentrantLock 大致相同。
③. 等待可中斷
當持有鎖的執行緒長期不釋放鎖的時候,正在等待的執行緒可以選擇放棄等待,改為處理其他事情。
ReentrantLock 可中斷,而 synchronized 不行。
④. 公平鎖
公平鎖是指多個執行緒在等待同一個鎖時,必須按照申請鎖的時間順序來依次獲得鎖。
synchronized 中的鎖是非公平的,ReentrantLock 預設情況下也是非公平的,但是也可以是公平的。
⑤. 鎖繫結多個條件
一個 ReentrantLock 可以同時繫結多個 Condition 物件。
4、使用選擇
除非需要使用 ReentrantLock 的高階功能,否則優先使用 synchronized。
這是因為 synchronized 是 JVM 實現的一種鎖機制,JVM 原生地支援它,而 ReentrantLock 不是所有的 JDK 版本都支援。
並且使用 synchronized 不用擔心沒有釋放鎖而導致死鎖問題,因為 JVM 會確保鎖的釋放。
如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。
執行緒池就應用而生。
五、執行緒池
執行緒池圍繞著一個核心的類 java.uitl.concurrent.ThreadPoolExecutor,我們將它作為一個切入點揭開執行緒池的面紗。
1、核心執行緒類
java.uitl.concurrent.ThreadPoolExecutor類是執行緒池中最核心的一個類,因此如果要透徹地瞭解Java中的執行緒池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現原始碼。
在ThreadPoolExecutor類中有四個構造方法。
其中三個最終都是呼叫了下面這個構造方法,限於篇幅就不在貼其他三個原始碼了,讀者可以進行求證。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
下面解釋下一下構造器中各個引數的含義:
-
corePoolSize:核心池的大小,這個引數跟後面講述的執行緒池的實現原理有非常大的關係。在建立了執行緒池後,預設情況下,執行緒池中並沒有任何執行緒,而是等待有任務到來才建立執行緒去執行任務,除非呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預建立執行緒的意思,即在沒有任務到來之前就建立corePoolSize個執行緒或者一個執行緒。預設情況下,在建立了執行緒池後,執行緒池中的執行緒數為0,當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中;
-
maximumPoolSize:執行緒池最大執行緒數,這個引數也是一個非常重要的引數,它表示線上程池中最多能建立多少個執行緒;
-
keepAliveTime:表示執行緒沒有任務執行時最多保持多久時間會終止。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用,直到執行緒池中的執行緒數不大於corePoolSize,即當執行緒池中的執行緒數大於corePoolSize時,如果一個執行緒空閒的時間達到keepAliveTime,則會終止,直到執行緒池中的執行緒數不超過corePoolSize。但是如果呼叫了allowCoreThreadTimeOut(boolean)方法,線上程池中的執行緒數不大於corePoolSize時,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0;
-
unit:引數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小時 TimeUnit.MINUTES; //分鐘 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒
-
workQueue:一個阻塞佇列,用來儲存等待執行的任務,這個引數的選擇也很重要,會對執行緒池的執行過程產生重大影響,一般來說,觀察傳入的workQueue 都是預設,即最大可新增Integer.MAX_VALUE個任務,所有在使用過程中要避免使用預設執行緒池。這裡的阻塞佇列有以下幾種選擇:
ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue; ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。執行緒池的排隊策略與BlockingQueue有關。
-
threadFactory:執行緒工廠,主要用來建立執行緒;
-
handler:表示當拒絕處理任務時的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程) ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
以上對構造的七個引數進行了介紹,那麼這些引數是怎麼起作用的呢,我們接著看執行緒池的執行流程。
2、執行緒執行流程
- 當執行緒池小於corePoolSize時,新提交任務將建立一個新執行緒執行任務,即使此時執行緒池中存在空閒執行緒。
- 當執行緒池達到corePoolSize時,新提交任務將被放入workQueue中,等待執行緒池中任務排程執行
- 當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會建立新執行緒執行任務
- 當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理
- 當執行緒池中超過corePoolSize執行緒,空閒時間達到keepAliveTime時,釋放空閒執行緒
- 當設定allowCoreThreadTimeOut(true)時,該引數預設false,執行緒池中corePoolSize執行緒空閒時間達到keepAliveTime也將關閉
3、四種執行緒池及使用場景
Java通過Executors提供四種執行緒池,分別為
- newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
- newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
- newScheduledThreadPool 建立一個可定期或者延時執行任務的定長執行緒池,支援定時及週期性任務執行。
- newCachedThreadPool 建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
newCachedThreadPool:
-
底層:返回ThreadPoolExecutor例項,corePoolSize為0;maximumPoolSize為Integer.MAX_VALUE;keepAliveTime為60L;時間單位TimeUnit.SECONDS;workQueue為SynchronousQueue(同步佇列)
-
通俗:當有新任務到來,則插入到SynchronousQueue中,由於SynchronousQueue是同步佇列,因此會在池中尋找可用執行緒來執行,若有可以執行緒則執行,若沒有可用執行緒則建立一個執行緒來執行該任務;若池中執行緒空閒時間超過指定時間,則該執行緒會被銷燬。
-
適用:執行很多短期的非同步任務
/** * 1.建立一個可快取的執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒<br> * 2.當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務<br> * 3.此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小<br> */ public static void cacheThreadPool() { ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 1; i <= 10; i++) { final int ii = i; try { Thread.sleep(ii * 1); } catch (InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(()->out.println("執行緒名稱:" + Thread.currentThread().getName() + ",執行" + ii)); } } -----output------ 執行緒名稱:pool-1-thread-1,執行1 執行緒名稱:pool-1-thread-1,執行2 執行緒名稱:pool-1-thread-1,執行3 執行緒名稱:pool-1-thread-1,執行4 執行緒名稱:pool-1-thread-1,執行5 執行緒名稱:pool-1-thread-1,執行6 執行緒名稱:pool-1-thread-1,執行7 執行緒名稱:pool-1-thread-1,執行8 執行緒名稱:pool-1-thread-1,執行9 執行緒名稱:pool-1-thread-1,執行10
newFixedThreadPool:
-
底層:返回ThreadPoolExecutor例項,接收引數為所設定執行緒數量n,corePoolSize和maximumPoolSize均為n;keepAliveTime為0L;時間單位TimeUnit.MILLISECONDS;WorkQueue為:new LinkedBlockingQueue
() 無界阻塞佇列 -
通俗:建立可容納固定數量執行緒的池子,每個執行緒的存活時間是無限的,當池子滿了就不再新增執行緒了;如果池中的所有執行緒均在繁忙狀態,對於新任務會進入阻塞佇列中(無界的阻塞佇列)
-
適用:執行長期任務
/** * 1.建立固定大小的執行緒池。每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小<br> * 2.執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒<br> * 3.因為執行緒池大小為3,每個任務輸出index後sleep 2秒,所以每兩秒列印3個數字,和執行緒名稱<br> */ public static void fixTheadPoolTest() { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { final int ii = i; fixedThreadPool.execute(() -> { out.println("執行緒名稱:" + Thread.currentThread().getName() + ",執行" + ii); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }); } } ------output------- 執行緒名稱:pool-1-thread-3,執行2 執行緒名稱:pool-1-thread-1,執行0 執行緒名稱:pool-1-thread-2,執行3 執行緒名稱:pool-1-thread-3,執行4 執行緒名稱:pool-1-thread-1,執行5 執行緒名稱:pool-1-thread-2,執行6 執行緒名稱:pool-1-thread-3,執行7 執行緒名稱:pool-1-thread-1,執行8 執行緒名稱:pool-1-thread-3,執行9
newSingleThreadExecutor:
-
底層:FinalizableDelegatedExecutorService包裝的ThreadPoolExecutor例項,corePoolSize為1;maximumPoolSize為1;keepAliveTime為0L;時間單位TimeUnit.MILLISECONDS;workQueue為:new LinkedBlockingQueue
() 無解阻塞佇列 -
通俗:建立只有一個執行緒的執行緒池,當該執行緒正繁忙時,對於新任務會進入阻塞佇列中(無界的阻塞佇列)
-
適用:按順序執行任務的場景
/** *建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行 */ public static void singleTheadPoolTest() { ExecutorService pool = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int ii = i; pool.execute(() -> out.println(Thread.currentThread().getName() + "=>" + ii)); } } -----output-------
執行緒名稱:pool-1-thread-1,執行0
執行緒名稱:pool-1-thread-1,執行1
執行緒名稱:pool-1-thread-1,執行2
執行緒名稱:pool-1-thread-1,執行3
執行緒名稱:pool-1-thread-1,執行4
執行緒名稱:pool-1-thread-1,執行5
執行緒名稱:pool-1-thread-1,執行6
執行緒名稱:pool-1-thread-1,執行7
執行緒名稱:pool-1-thread-1,執行8
執行緒名稱:pool-1-thread-1,執行9
NewScheduledThreadPool:
-
底層:建立ScheduledThreadPoolExecutor例項,該物件繼承了ThreadPoolExecutor,corePoolSize為傳遞來的引數,maximumPoolSize為Integer.MAX_VALUE;keepAliveTime為0;時間單位TimeUnit.NANOSECONDS;workQueue為:new DelayedWorkQueue() 一個按超時時間升序排序的佇列
-
通俗:建立一個固定大小的執行緒池,執行緒池內執行緒存活時間無限制,執行緒池可以支援定時及週期性任務執行,如果所有執行緒均處於繁忙狀態,對於新任務會進入DelayedWorkQueue佇列中,這是一種按照超時時間排序的佇列結構
-
適用:執行週期性任務
/** * 建立一個定長執行緒池,支援定時及週期性任務執行。延遲執行 */ public static void sceduleThreadPool() { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); Runnable r1 = () -> out.println("執行緒名稱:" + Thread.currentThread().getName() + ",執行:3秒後執行"); scheduledThreadPool.schedule(r1, 3, TimeUnit.SECONDS); Runnable r2 = () -> out.println("執行緒名稱:" + Thread.currentThread().getName() + ",執行:延遲2秒後每3秒執行一次"); scheduledThreadPool.scheduleAtFixedRate(r2, 2, 3, TimeUnit.SECONDS); Runnable r3 = () -> out.println("執行緒名稱:" + Thread.currentThread().getName() + ",執行:普通任務"); for (int i = 0; i < 5; i++) { scheduledThreadPool.execute(r3); } } ----output------ 執行緒名稱:pool-1-thread-1,執行:普通任務 執行緒名稱:pool-1-thread-5,執行:普通任務 執行緒名稱:pool-1-thread-4,執行:普通任務 執行緒名稱:pool-1-thread-3,執行:普通任務 執行緒名稱:pool-1-thread-2,執行:普通任務 執行緒名稱:pool-1-thread-1,執行:延遲2秒後每3秒執行一次 執行緒名稱:pool-1-thread-5,執行:3秒後執行 執行緒名稱:pool-1-thread-4,執行:延遲2秒後每3秒執行一次 執行緒名稱:pool-1-thread-4,執行:延遲2秒後每3秒執行一次 執行緒名稱:pool-1-thread-4,執行:延遲2秒後每3秒執行一次 執行緒名稱:pool-1-thread-4,執行:延遲2秒後每3秒執行一次
5、使用例項
在ThreadPoolTaskExecutor的原理章節中,有一系列的方法,如果我們手動呼叫這些執行緒池方法實現方法是極其複雜的。
①、在java中的使用
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
for(int i=0;i<15;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("執行緒池中執行緒數目:"+executor.getPoolSize()+",佇列中等待執行的任務數目:"+
executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在執行task "+taskNum);
try {
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"執行完畢");
}
}
從執行結果可以看出,當執行緒池中執行緒的數目大於5時,便將任務放入任務快取佇列裡面,當任務快取佇列滿了之後,便建立新的執行緒。如果上面程式中,將for迴圈中改成執行20個任務,就會丟擲任務拒絕異常了。
不過在java doc中,並不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來建立執行緒池:
Executors.newCachedThreadPool(); //建立一個緩衝池,緩衝池容量大小為Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //建立容量為1的緩衝池
Executors.newFixedThreadPool(int); //建立固定容量大小的緩衝池
從它們的具體實現來看,它們實際上也是呼叫了ThreadPoolExecutor,只不過引數都已配置好了。
newFixedThreadPool建立的執行緒池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設定為1,也使用的LinkedBlockingQueue;
newCachedThreadPool將corePoolSize設定為0,將maximumPoolSize設定為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就建立執行緒執行,當執行緒空閒超過60秒,就銷燬執行緒。
實際中,如果Executors提供的三個靜態方法能滿足要求,就儘量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的引數有點麻煩,要根據實際任務的型別和數量來進行配置。
另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。
②、在Spring中使用
以下為Java執行緒池在Spring中的使用,ThreadPoolTaskExecutor一個物件注入到Spring的容器中。
/**
* 執行緒池配置
*
* @author tcy
**/
@Configuration
public class ThreadPoolConfig {
// 核心執行緒池大小
private final int corePoolSize = 50;
// 最大可建立的執行緒數
private final int maxPoolSize = 200;
// 佇列最大長度
private final int queueCapacity = 1000;
// 執行緒池維護執行緒所允許的空閒時間
private final int keepAliveSeconds = 300;
@Bean(name = "threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(maxPoolSize);
executor.setCorePoolSize(corePoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
// 執行緒池對拒絕任務(無執行緒可用)的處理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
在方法或者類上加 @Async註解,標明該方法或類為多執行緒方法,Spirng內部會自動呼叫多執行緒的拒絕策略、執行緒初始化等方法。