1. 程式人生 > >阻塞隊列和線程池

阻塞隊列和線程池

auth sys tle repo click pro factory nano put

一、阻塞隊列

1.介紹
阻塞隊列會對當前線程產生阻塞,比如一個線程從一個空的阻塞隊列中取元素,此時線程會被阻塞直到阻塞隊列中有了元素。當隊列中有元素後,被阻塞的線程會自動被喚醒(不需要我們編寫代碼去喚醒)。

2.實現
ArrayBlockingQueue:基於數組實現的一個阻塞隊列,在創建ArrayBlockingQueue對象時必須制定容量大小。並且可以指定公平性與非公平性,默認情況下為非公平的,即不保證等待時間最長的隊列最優先能夠訪問隊列。
LinkedBlockingQueue:基於鏈表實現的一個阻塞隊列,在創建LinkedBlockingQueue對象時如果不指定容量大小,則默認大小為Integer.MAX_VALUE。

PriorityBlockingQueue:以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優先級對元素進行排序,按照優先級順序出隊,每次出隊的元素都是優先級最高的元素。註意,此阻塞隊列為無界阻塞隊列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信號標誌),前面2種都是有界隊列。
DelayQueue:基於PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。

二、線程池

1.介紹
使用線程池的好處有:1.創建/銷毀線程伴隨著系統開銷,過於頻繁的創建/銷毀線程,會很大程度上影響處理效率 2.線程並發數量過多,搶占系統資源從而導致阻塞 3.線程池可以對線程進行一些簡單的管理
ThreadPoolExecutor類是線程池中最核心的一個類, 在ThreadPoolExecutor類中提供了四個構造方法,其中 三個構造器都是調用的第四個構造器進行的初始化工作。

2.構造方法為:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

七個參數的含義:
* corePoolSize:核心池的大小,這個參數跟後面講述的線程池的實現原理有非常大的關系。在創建了線程池後,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池後,線程池中的線程數為0,當有任務來之後,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;

* maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程;

* keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;

* unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:

TimeUnit.DAYS; //天 TimeUnit.HOURS; //小時 TimeUnit.MINUTES; //分鐘 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒


* workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這裏的阻塞隊列有以下幾種選擇:

ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue;


ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

* threadFactory:線程工廠,主要用來創建線程;

* handler:表示當拒絕處理任務時的策略,有以下四種取值:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重復此過程) ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

3.線程池的執行策略
* 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
* 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
* 如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
* 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;如果允許為核心池中的線程設置存活時間,那麽核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。

4..測試

public class Test { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5)); for(int i=0;i<15;i++){ MyTask myTask = new MyTask(i); executor.execute(myTask); System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+ executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount()); } executor.shutdown(); } } class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this.taskNum = num; } @Override public void run() { System.out.println("正在執行task "+taskNum); try { Thread.currentThread().sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task "+taskNum+"執行完畢"); } }


這個程序中當線程池中線程的數目大於5時,便將任務放入任務緩存隊列裏面,當任務緩存隊列滿了之後,便創建新的線程。

5.使用Executors類中提供的幾個靜態方法來創建線程池

Executors.newCachedThreadPool(); //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE Executors.newSingleThreadExecutor(); //創建容量為1的緩沖池 Executors.newFixedThreadPool(int); //創建固定容量大小的緩沖池(定長)


  newFixedThreadPool創建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
  newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置為1,也使用的LinkedBlockingQueue;1. 有且僅有一個工作線程執行任務2. 所有任務按照指定順序執行,即遵循隊列的入隊出隊規則
  newCachedThreadPool將corePoolSize設置為0,將maximumPoolSize設置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閑超過60秒,就銷毀線程。
  實際中,如果Executors提供的三個靜態方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。
  另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。

三、Callable

創建線程在上一篇wiki中說明了兩種方法,分別是繼承Thread,重寫run方法和實現Runnable接口,重新run方法。現在介紹第三種實現Callable接口,重寫call方法。
區別:

  • Callable可以在任務結束的時候提供一個返回值Future對象,Runnable無法提供這個功能

  • Callable的call方法分可以拋出異常,而Runnable的run方法不能拋出異常。

  • Callable規定的方法是call(),而Runnable規定的方法是run().


測試:

/* * FileName: TestCallable * Author: aiguo.sun * Date: 2019/3/28 20:06 * Description: 測試callable方法 */ package JavaTest; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /** * 一、創建執行線程的方式三:實現 Callable 接口。 相較於實現 Runnable 接口的方式,方法可以有返回值,並且可以拋出異常。 * * 二、執行 Callable 方式,需要 FutureTask 實現類的支持,用於接收運算結果。 FutureTask 是 Future 接口的實現類 */ public class TestCallable { public static void main(String[] args) { ThreadDemo td = new ThreadDemo(); //1.執行 Callable 方式,需要 FutureTask 實現類的支持,用於接收運算結果。 FutureTask<Integer> result = new FutureTask<>(td); new Thread(result).start(); //2.接收線程運算後的結果 try { //判斷是否完成 if(!result.isDone()) { System.out.println("-sorry-----------------------");; } //FutureTask 可用於 閉鎖 類似於CountDownLatch的作用,在所有的線程沒有執行完成之後這裏是不會執行的 Integer sum = result.get(); System.out.println(sum); System.out.println("------------------------------------"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } class ThreadDemo implements Callable<Integer> { @Override public Integer call() throws Exception { int sum = 0; for (int i = 0; i <= 100000000; i++) { sum += i; } return sum; } }

阻塞隊列和線程池