阻塞隊列和線程池
一、阻塞隊列
1.介紹
阻塞隊列會對當前線程產生阻塞,比如一個線程從一個空的阻塞隊列中取元素,此時線程會被阻塞直到阻塞隊列中有了元素。當隊列中有元素後,被阻塞的線程會自動被喚醒(不需要我們編寫代碼去喚醒)。
2.實現
ArrayBlockingQueue:基於數組實現的一個阻塞隊列,在創建ArrayBlockingQueue對象時必須制定容量大小。並且可以指定公平性與非公平性,默認情況下為非公平的,即不保證等待時間最長的隊列最優先能夠訪問隊列。
LinkedBlockingQueue:基於鏈表實現的一個阻塞隊列,在創建LinkedBlockingQueue對象時如果不指定容量大小,則默認大小為Integer.MAX_VALUE。
DelayQueue:基於PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。
二、線程池
1.介紹
使用線程池的好處有:1.創建/銷毀線程伴隨著系統開銷,過於頻繁的創建/銷毀線程,會很大程度上影響處理效率 2.線程並發數量過多,搶占系統資源從而導致阻塞 3.線程池可以對線程進行一些簡單的管理
ThreadPoolExecutor類是線程池中最核心的一個類, 在ThreadPoolExecutor類中提供了四個構造方法,其中 三個構造器都是調用的第四個構造器進行的初始化工作。
2.構造方法為:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
* corePoolSize:核心池的大小,這個參數跟後面講述的線程池的實現原理有非常大的關系。在創建了線程池後,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池後,線程池中的線程數為0,當有任務來之後,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;
* maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程;
* keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
* unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
|
* workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這裏的阻塞隊列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
|
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。
* threadFactory:線程工廠,主要用來創建線程;
* handler:表示當拒絕處理任務時的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
|
3.線程池的執行策略
* 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
* 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
* 如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
* 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;如果允許為核心池中的線程設置存活時間,那麽核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。
4..測試
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor( 5 , 10 , 200 , TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>( 5 ));
for ( int i= 0 ;i< 15 ;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println( "線程池中線程數目:" +executor.getPoolSize()+ ",隊列中等待執行的任務數目:" +
executor.getQueue().size()+ ",已執行玩別的任務數目:" +executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
class MyTask implements Runnable {
private int taskNum;
public MyTask( int num) {
this .taskNum = num;
}
@Override
public void run() {
System.out.println( "正在執行task " +taskNum);
try {
Thread.currentThread().sleep( 4000 );
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println( "task " +taskNum+ "執行完畢" );
}
}
|
這個程序中當線程池中線程的數目大於5時,便將任務放入任務緩存隊列裏面,當任務緩存隊列滿了之後,便創建新的線程。
5.使用Executors類中提供的幾個靜態方法來創建線程池
Executors.newCachedThreadPool(); //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //創建容量為1的緩沖池
Executors.newFixedThreadPool( int ); //創建固定容量大小的緩沖池(定長)
|
newFixedThreadPool創建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置為1,也使用的LinkedBlockingQueue;1. 有且僅有一個工作線程執行任務2. 所有任務按照指定順序執行,即遵循隊列的入隊出隊規則
newCachedThreadPool將corePoolSize設置為0,將maximumPoolSize設置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閑超過60秒,就銷毀線程。
實際中,如果Executors提供的三個靜態方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。
另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。
三、Callable
創建線程在上一篇wiki中說明了兩種方法,分別是繼承Thread,重寫run方法和實現Runnable接口,重新run方法。現在介紹第三種實現Callable接口,重寫call方法。
區別:
-
Callable可以在任務結束的時候提供一個返回值Future對象,Runnable無法提供這個功能
-
Callable的call方法分可以拋出異常,而Runnable的run方法不能拋出異常。
-
Callable規定的方法是call(),而Runnable規定的方法是run().
測試:
/*
* FileName: TestCallable
* Author: aiguo.sun
* Date: 2019/3/28 20:06
* Description: 測試callable方法
*/
package JavaTest;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* 一、創建執行線程的方式三:實現 Callable 接口。 相較於實現 Runnable 接口的方式,方法可以有返回值,並且可以拋出異常。
*
* 二、執行 Callable 方式,需要 FutureTask 實現類的支持,用於接收運算結果。 FutureTask 是 Future 接口的實現類
*/
public class TestCallable {
public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
//1.執行 Callable 方式,需要 FutureTask 實現類的支持,用於接收運算結果。
FutureTask<Integer> result = new FutureTask<>(td);
new Thread(result).start();
//2.接收線程運算後的結果
try {
//判斷是否完成
if (!result.isDone())
{
System.out.println( "-sorry-----------------------" );;
}
//FutureTask 可用於 閉鎖 類似於CountDownLatch的作用,在所有的線程沒有執行完成之後這裏是不會執行的
Integer sum = result.get();
System.out.println(sum);
System.out.println( "------------------------------------" );
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
class ThreadDemo implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0 ;
for ( int i = 0 ; i <= 100000000 ; i++) {
sum += i;
}
return sum;
}
}
|
阻塞隊列和線程池