1. 程式人生 > >java 執行緒池 使用例項 03 建立執行緒的第3式 02 如何建立執行緒 執行緒併發與synchornized

java 執行緒池 使用例項 03 建立執行緒的第3式 02 如何建立執行緒 執行緒併發與synchornized

在前面的文章中,我們使用執行緒的時候就去建立一個執行緒,這樣實現起來非常簡便,但是就會有一個問題:

如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。

那麼有沒有一種辦法使得執行緒可以複用,就是執行完一個任務,並不被銷燬,而是可以繼續執行其他的任務?

在Java中可以通過執行緒池來達到這樣的效果。

 

1 執行緒池做什麼

 

網路請求通常有兩種形式:

第一種,請求不是很頻繁,而且每次連線後會保持相當一段時間來讀資料或者寫資料,最後斷開,如檔案下載,網路流媒體等。

另一種形式是請求頻繁,但是連線上以後讀/寫很少量的資料就斷開連線。考慮到服務的併發問題,如果每個請求來到以後服務都為它啟動一個執行緒,那麼這對服務的資源可能會造成很大的浪費,特別是第二種情況。

因為通常情況下,建立執行緒是需要一定的耗時的,設這個時間為T1,而連線後讀/寫服務的時間為T2,當T1>>T2時,我們就應當考慮一種策略或者機制來控制,使得服務對於第二種請求方式也能在較低的功耗下完成。

通常,我們可以用執行緒池來解決這個問題,首先,在服務啟動的時候,我們可以啟動好幾個執行緒,並用一個容器(如執行緒池)來管理這些執行緒。

當請求到來時,可以從池中取一個執行緒出來,執行任務(通常是對請求的響應),當任務結束後,再將這個執行緒放入池中備用;

如果請求到來而池中沒有空閒的執行緒,該請求需要排隊等候。最後,當服務關閉時銷燬該池即可。

 

多執行緒技術主要解決處理器單元內多個執行緒執行的問題,它可以顯著減少處理器單元的閒置時間,增加處理器單元的吞吐能力。
假設一個伺服器完成一項任務所需時間為:T1 建立執行緒時間,T2 線上程中執行任務的時間,T3 銷燬執行緒時間。
如果:T1 + T3 遠大於 T2,則可以採用執行緒池,以提高伺服器性

執行緒池技術正是關注如何縮短或調整T1,T3時間的技術,從而提高伺服器程式效能的

它把T1,T3分別安排在伺服器程式的啟動和結束的時間段或者一些空閒的時間段,這樣在伺服器程式處理客戶請求時,不會有T1,T3的開銷了。

執行緒池不僅調整T1,T3產生的時間段,而且它還顯著減少了建立執行緒的數目,看一個例子:

假設一個伺服器一天要處理50000個請求,並且每個請求需要一個單獨的執行緒完成。線上程池中,執行緒數一般是固定的,所以產生執行緒總數不會超過執行緒池中執行緒的數目,

而如果伺服器不利用執行緒池來處理這些請求則執行緒總數為50000。一般執行緒池大小是遠小於50000。

所以利用執行緒池的伺服器程式不會為了建立50000而在處理請求時浪費時間,從而提高效率。

 

合理利用執行緒池能夠帶來三個好處:

第一:降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。

第二:提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。

第三:提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。

        但是要做到合理的利用執行緒池,必須對其原理了如指掌。

 

2 執行緒池的繼承架構

程式啟動一個新執行緒成本是比較高的,因為它涉及到要與作業系統進行互動。而使用執行緒池可以很好的提高效能,尤其是當程式中要建立大量生存期很短的執行緒時,更應該考慮使用執行緒池。

執行緒池裡的每一個執行緒程式碼結束後,並不會死亡,而是再次回到執行緒池中成為空閒狀態,等待下一個物件來使用。

在JDK5之前,我們必須手動實現自己的執行緒池,從JDK5開始,Java內建支援執行緒池

 

Java裡面執行緒池的頂級介面是Executor,但是嚴格意義上講Executor並不是一個執行緒池,而只是一個執行執行緒的工具。

真正的執行緒池介面是ExecutorService。下面這張圖完整描述了執行緒池的類體系結構。

Executor是一個頂層介面,在它裡面只聲明瞭一個方法execute(Runnable),返回值為void,引數為Runnable型別,從字面意思可以理解,就是用來執行傳進去的任務的;

 

然後ExecutorService介面繼承了Executor介面,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象類AbstractExecutorService實現了ExecutorService介面,基本實現了ExecutorService中宣告的所有方法;

然後ThreadPoolExecutor繼承了類AbstractExecutorService。

 

標記一下比較重要的類:

ExecutorService:

真正的執行緒池介面。

ScheduledExecutorService

能和Timer/TimerTask類似,解決那些需要任務重複執行的問題。

ThreadPoolExecutor

ExecutorService的預設實現。

ScheduledThreadPoolExecutor

繼承ThreadPoolExecutor的ScheduledExecutorService介面實現,週期性任務排程的類實現。

 

要配置一個執行緒池是比較複雜的,尤其是對於執行緒池的原理不是很清楚的情況下,很有可能配置的執行緒池不是較優的,

因此在Executors類裡面提供了一些靜態工廠,生成一些常用的執行緒池。

 

newSingleThreadExecutor:建立一個單執行緒的執行緒池。這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒序列執行所有任務。

                                           如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。此執行緒池保證所有任務的執行順序按照任務的提交順序執行。

newFixedThreadPool:建立固定大小的執行緒池。每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小。

                                   執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。

newCachedThreadPool:建立一個可快取的執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,

                                       當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。

                                       此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。

newScheduledThreadPool:建立一個大小無限的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。

newSingleThreadExecutor:建立一個單執行緒的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。

 

這些方法的返回值是ExecutorService物件,該物件表示一個執行緒池,可以執行Runnable物件或者Callable物件代表的執行緒。

它提供瞭如下方法來提交一個任務:

Future<?> submit(Runnable task)

<T> Future<T> submit(Callable<T> task)

 Callable 與 Runable的相關內容參見:

03 建立執行緒的第3式 
02 如何建立執行緒 執行緒併發與synchornized
 
3 使用執行緒池步驟及案例

執行緒池的好處:執行緒池裡的每一個執行緒程式碼結束後,並不會死亡,而是再次回到執行緒池中成為空閒狀態,等待下一個物件來使用。

如何實現執行緒的程式碼呢?

A:建立一個執行緒池物件,控制要建立幾個執行緒物件。

    public static ExecutorService newFixedThreadPool(int nThreads)

B:這種執行緒池的執行緒可以執行:

  可以執行Runnable物件或者Callable物件代表的執行緒

  做一個類實現Runnable介面。

C:呼叫如下方法即可

    Future<?> submit(Runnable task)

    <T> Future<T> submit(Callable<T> task)

D:我就要結束,可以嗎? 可以。

複製程式碼
 1 package com.jt.thread.demo05;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 
 6 class MyRunnable implements Runnable {
 7     @Override
 8     public void run() {
 9         for (int x = 0; x < 100; x++) {
10             System.out.println(Thread.currentThread().getName() + ":" + x);
11        }
12     }
13 }
14 
15 public class ExecutorServiceDemo {
16     public static void main(String[] args) {
17      // 建立一個執行緒池物件,控制要建立幾個執行緒物件。
18      // public static ExecutorService newFixedThreadPool(int nThreads)
19      ExecutorService pool = Executors.newFixedThreadPool(2);
20 
21      // 可以執行Runnable物件或者Callable物件代表的執行緒
22      pool.submit(new MyRunnable());
23      pool.submit(new MyRunnable());
24 
25     //結束執行緒池
26     pool.shutdown();
27    }
28 } 
複製程式碼

執行結果:

pool-1-thread-1:0

pool-1-thread-1:1

pool-1-thread-1:2

pool-1-thread-2:0

pool-1-thread-2:1

pool-1-thread-2:2

pool-1-thread-2:3

。。。

 

說明:

 

(1 newFixedThreadPool

是固定大小的執行緒池 有結果可見 我們指定2 在執行時就只有2個執行緒工作

若其中有一個執行緒異常  會有新的執行緒替代他

 

(2 shutdown方法有2個過載:

void shutdown() 啟動一次順序關閉,等待執行以前提交的任務完成,但不接受新任務。

List<Runnable> shutdownNow() 試圖立即停止所有正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表。

 

(3 submit 與 execute

3.1 submit是ExecutorService中的方法 用以提交一個任務

他的返回值是future物件  可以獲取執行結果

<T> Future<T> submit(Callable<T> task) 提交一個返回值的任務用於執行,返回一個表示任務的未決結果的 Future。

Future<?> submit(Runnable task) 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。

<T> Future<T> submit(Runnable task, T result) 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。

 

3.2 execute是Executor介面的方法

他雖然也可以像submit那樣讓一個任務執行  但並不能有返回值

 

void execute(Runnable command)

在未來某個時間執行給定的命令。該命令可能在新的執行緒、已入池的執行緒或者正呼叫的執行緒中執行,這由 Executor 實現決定。

 

(4 Future

Future 表示非同步計算的結果。

它提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。

計算完成後只能使用 get 方法來獲取結果,如有必要,計算完成前可以阻塞此方法。

取消則由 cancel 方法來執行。還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。

如果為了可取消性而使用 Future 但又不提供可用的結果,則可以宣告 Future<?> 形式型別、並返回 null 作為底層任務的結果。

 

Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。

必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果。 

也就是說Future提供了三種功能:

--判斷任務是否完成;

--能夠中斷任務;

--能夠獲取任務執行結果。

 

boolean cancel(boolean mayInterruptIfRunning) 試圖取消對此任務的執行。

V get() 如有必要,等待計算完成,然後獲取其結果。

V get(long timeout, TimeUnit unit) 如有必要,最多等待為使計算完成所給定的時間之後,獲取其結果(如果結果可用)。

boolean isCancelled() 如果在任務正常完成前將其取消,則返回 true。

boolean isDone() 如果任務已完成,則返回 true。

 

4 執行緒池簡單使用案例2

java.util.concurrent.Executors類的API提供大量建立連線池的靜態方法:

 

複製程式碼
 1 import java.util.concurrent.Executors;
 2 import java.util.concurrent.ExecutorService;
 3 public class JavaThreadPool {
 4   public static void main(String[] args) {
 5     // 建立一個可重用固定執行緒數的執行緒池
 6     ExecutorService pool = Executors.newFixedThreadPool(2);
 7     // 建立實現了Runnable介面物件,Thread物件當然也實現了Runnable介面
 8     Thread t1 = new MyThread();
 9     Thread t2 = new MyThread();
10     Thread t3 = new MyThread();
11     Thread t4 = new MyThread();
12     Thread t5 = new MyThread();
13     // 將執行緒放入池中進行執行
14     pool.execute(t1);
15     pool.execute(t2);
16     pool.execute(t3);
17     pool.execute(t4);
18     pool.execute(t5);
19     // 關閉執行緒池
20     pool.shutdown();
21    }
22 }
23 class MyThread extends Thread {
24    @Override
25    public void run() {
26     System.out.println(Thread.currentThread().getName() + "正在執行… …");
27    }
28 }
複製程式碼

執行效果示例:

 

pool-1-thread-2正在執行… …

pool-1-thread-2正在執行… …

pool-1-thread-2正在執行… …

pool-1-thread-2正在執行… …

pool-1-thread-1正在執行… …

 

 

pool-1-thread-1正在執行… …

pool-1-thread-1正在執行… …

pool-1-thread-1正在執行… …

pool-1-thread-1正在執行… …

pool-1-thread-2正在執行… …

 

可見執行緒池中有2個執行緒在工作,可見 newFixedThreadPool 是固定大小的執行緒池

 

5 單任務執行緒池:

//建立一個使用單個 worker 執行緒的 Executor,以無界佇列方式來執行該執行緒。

ExecutorService pool = Executors.newSingleThreadExecutor(); 

案例:

複製程式碼
 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 
 4 public class SingleThreadPollDemo {
 5 
 6 public static void main(String[] args) {
 7    // 建立一個使用單個 worker 執行緒的 Executor,以無界佇列方式來執行該執行緒。
 8    ExecutorService pool = Executors.newSingleThreadExecutor();
 9 
10    Runnable task1 = new SingelTask();
11    Runnable task2 = new SingelTask();
12    Runnable task3 = new SingelTask();
13 
14    pool.execute(task3);
15    pool.execute(task2);
16    pool.execute(task1);
17 
18    // 等待已提交的任務全部結束 不再接受新的任務
19    pool.shutdown();
20   }
21 }
22 
23 class SingelTask implements Runnable{
24 
25 @Override
26 public void run() {
27   System.out.println(Thread.currentThread().getName() + "正在執行… …");
28   try {
29    Thread.sleep(3000);
30   } catch (InterruptedException e) {
31    e.printStackTrace();
32   }
33   System.out.println(Thread.currentThread().getName() + "執行完畢");
34  }
35 
36 }
複製程式碼

執行結果:

 

pool-1-thread-1正在執行… …

pool-1-thread-1執行完畢

pool-1-thread-1正在執行… …

pool-1-thread-1執行完畢

pool-1-thread-1正在執行… …

pool-1-thread-1執行完畢

 

可見執行緒池中只有一個執行緒在執行任務

6 小結:

對於以上兩種連線池,大小都是固定的,當要加入的池的執行緒(或者任務)超過池最大尺寸時候,則入此執行緒池需要排隊等待。

一旦池中有執行緒完畢,則排隊等待的某個執行緒會入池執行。

 

其他執行緒池示例:

 

固定大小執行緒池

import java.util.concurrent.Executors; 

import java.util.concurrent.ExecutorService;

ExecutorService pool = Executors.newFixedThreadPool(2);

pool.execute(t1);

pool.shutdown();

 

單任務執行緒池

ExecutorService pool = Executors.newSingleThreadExecutor();

 

可變尺寸執行緒池

ExecutorService pool = Executors.newCachedThreadPool();

 

延遲連線池

import java.util.concurrent.Executors; 

import java.util.concurrent.ScheduledExecutorService; 

import java.util.concurrent.TimeUnit;

ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

pool.schedule(t4, 10, TimeUnit.MILLISECONDS);

 

單任務延遲連線池

ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

 

7 使用示例

複製程式碼
 1 public class Test {
 2      public static void main(String[] args) {   
 3          ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
 4                  new ArrayBlockingQueue<Runnable>(5));
 5           
 6          for(int i=0;i<15;i++){
 7              MyTask myTask = new MyTask(i);
 8              executor.execute(myTask);
 9              System.out.println("執行緒池中執行緒數目:"+executor.getPoolSize()+",佇列中等待執行的任務數目:"+
10              executor.getQueue().size()+",已執行完別的任務數目:"+executor.getCompletedTaskCount());
11          }
12          executor.shutdown();
13      }
14 }
15  
16  
17 class MyTask implements Runnable {
18     private int taskNum;
19      
20     public MyTask(int num) {
21         this.taskNum = num;
22     }
23      
24     @Override
25     public void run() {
26         System.out.println("正在執行task "+taskNum);
27         try {
28             Thread.currentThread().sleep(4000);
29         } catch (InterruptedException e) {
30             e.printStackTrace();
31         }
32         System.out.println("task "+taskNum+"執行完畢");
33     }
34 }
複製程式碼

執行結果:

  View Code

從執行結果可以看出,當執行緒池中執行緒的數目大於5時,便將任務放入任務快取佇列裡面,當任務快取佇列滿了之後,便建立新的執行緒。

如果上面程式中,將for迴圈中改成執行20個任務,就會丟擲任務拒絕異常了。

不過在java doc中,並不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來建立執行緒池:

1 Executors.newCachedThreadPool();        //建立一個緩衝池,緩衝池容量大小為Integer.MAX_VALUE
2 Executors.newSingleThreadExecutor();    //建立容量為1的緩衝池
3 Executors.newFixedThreadPool(int);      //建立固定容量大小的緩衝池

下面是這三個靜態方法的具體實現:

複製程式碼
 1 public static ExecutorService newFixedThreadPool(int nThreads) {
 2     return new ThreadPoolExecutor(nThreads, nThreads,
 3                                   0L, TimeUnit.MILLISECONDS,
 4                                   new LinkedBlockingQueue<Runnable>());
 5 }
 6 public static ExecutorService newSingleThreadExecutor() {
 7     return new FinalizableDelegatedExecutorService
 8         (new ThreadPoolExecutor(1, 1,
 9                                 0L, TimeUnit.MILLISECONDS,
10                                 new LinkedBlockingQueue<Runnable>()));
11 }
12 public static ExecutorService newCachedThreadPool() {
13     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
14                                   60L, TimeUnit.SECONDS,
15                                   new SynchronousQueue<Runnable>());
16 }
複製程式碼

     從它們的具體實現來看,它們實際上也是呼叫了ThreadPoolExecutor,只不過引數都已配置好了。

  newFixedThreadPool建立的執行緒池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

  newSingleThreadExecutor將corePoolSize和maximumPoolSize都設定為1,也使用的LinkedBlockingQueue;

  newCachedThreadPool將corePoolSize設定為0,將maximumPoolSize設定為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就建立執行緒執行,當執行緒空閒超過60秒,就銷燬執行緒。

  實際中,如果Executors提供的三個靜態方法能滿足要求,就儘量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的引數有點麻煩,要根據實際任務的型別和數量來進行配置。

  另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。

 

from: https://www.cnblogs.com/wihainan/p/4765862.html