1. 程式人生 > >執行緒池----ThreadPoolExecutor機制

執行緒池----ThreadPoolExecutor機制

ThreadPoolExecutor機制 
一、概述
1、ThreadPoolExecutor作為java.util.concurrent包對外提供基礎實現,以內部執行緒池的形式對外提供管理任務執行,執行緒排程,執行緒池管理等等服務; 
2、Executors方法提供的執行緒服務,都是通過引數設定來實現不同的執行緒池機制。 
3、先來了解其執行緒池管理的機制,有助於正確使用,避免錯誤使用導致嚴重故障。同時可以根據自己的需求實現自己的執行緒池


二、核心構造方法講解
下面是ThreadPoolExecutor最核心的構造方法
Java程式碼  收藏程式碼
  1. public
     ThreadPoolExecutor(int corePoolSize,  
  2.                               int maximumPoolSize,  
  3.                               long keepAliveTime,  
  4.                               TimeUnit unit,  
  5.                               BlockingQueue<Runnable> workQueue,  
  6.                               ThreadFactory threadFactory,  
  7.                               RejectedExecutionHandler handler) {  
  8.         if (corePoolSize < 0 ||  
  9.             maximumPoolSize <= 0 ||  
  10.             maximumPoolSize < corePoolSize ||  
  11.             keepAliveTime < 0)  
  12.             throw new IllegalArgumentException();  
  13.         if
     (workQueue == null || threadFactory == null || handler == null)  
  14.             throw new NullPointerException();  
  15.         this.corePoolSize = corePoolSize;  
  16.         this.maximumPoolSize = maximumPoolSize;  
  17.         this.workQueue = workQueue;  
  18.         this.keepAliveTime = unit.toNanos(keepAliveTime);  
  19.         this.threadFactory = threadFactory;  
  20.         this.handler = handler;  
  21.     }  

構造方法引數講解
引數名 作用
corePoolSize 核心執行緒池大小
maximumPoolSize 最大執行緒池大小
keepAliveTime 執行緒池中超過corePoolSize數目的空閒執行緒最大存活時間;可以allowCoreThreadTimeOut(true)使得核心執行緒有效時間
TimeUnit keepAliveTime時間單位
workQueue 阻塞任務佇列
threadFactory 新建執行緒工廠
RejectedExecutionHandler 當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理


重點講解:
其中比較容易讓人誤解的是:corePoolSize,maximumPoolSize,workQueue之間關係。 

1.當執行緒池小於corePoolSize時,新提交任務將建立一個新執行緒執行任務,即使此時執行緒池中存在空閒執行緒。 
2.當執行緒池達到corePoolSize時,新提交任務將被放入workQueue中,等待執行緒池中任務排程執行 
3.當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會建立新執行緒執行任務 
4.當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理 
5.當執行緒池中超過corePoolSize執行緒,空閒時間達到keepAliveTime時,關閉空閒執行緒 
6.當設定allowCoreThreadTimeOut(true)時,執行緒池中corePoolSize執行緒空閒時間達到keepAliveTime也將關閉


執行緒管理機制圖示:


三、Executors提供的執行緒池配置方案

1、構造一個固定執行緒數目的執行緒池,配置的corePoolSize與maximumPoolSize大小相同,同時使用了一個無界LinkedBlockingQueue存放阻塞任務,因此多餘的任務將存在再阻塞佇列,不會由RejectedExecutionHandler處理
Java程式碼  收藏程式碼
  1. public static ExecutorService newFixedThreadPool(int nThreads) {  
  2.         return new ThreadPoolExecutor(nThreads, nThreads,  
  3.                                       0L, TimeUnit.MILLISECONDS,  
  4.                                       new LinkedBlockingQueue<Runnable>());  
  5.     }  

2、構造一個緩衝功能的執行緒池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一個無容量的阻塞佇列 SynchronousQueue,因此任務提交之後,將會建立新的執行緒執行;執行緒空閒超過60s將會銷燬
Java程式碼  收藏程式碼
  1. public static ExecutorService newCachedThreadPool() {  
  2.         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
  3.                                       60L, TimeUnit.SECONDS,  
  4.                                       new SynchronousQueue<Runnable>());  
  5.     }  

3、構造一個只支援一個執行緒的執行緒池,配置corePoolSize=maximumPoolSize=1,無界阻塞佇列LinkedBlockingQueue;保證任務由一個執行緒序列執行
Java程式碼  收藏程式碼
  1. public static ExecutorService newSingleThreadExecutor() {  
  2.         return new FinalizableDelegatedExecutorService  
  3.             (new ThreadPoolExecutor(11,  
  4.                                     0L, TimeUnit.MILLISECONDS,  
  5.                                     new LinkedBlockingQueue<Runnable>()));  
  6.     }  

4、構造有定時功能的執行緒池,配置corePoolSize,無界延遲阻塞佇列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由於DelayedWorkQueue是無界佇列,所以這個值是沒有意義的
Java程式碼  收藏程式碼
  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
  2.         return new ScheduledThreadPoolExecutor(corePoolSize);  
  3.     }  
  4. public static ScheduledExecutorService newScheduledThreadPool(  
  5.             int corePoolSize, ThreadFactory threadFactory) {  
  6.         return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);  
  7.     }  
  8. public ScheduledThreadPoolExecutor(int corePoolSize,  
  9.                              ThreadFactory threadFactory) {  
  10.         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,  
  11.               new DelayedWorkQueue(), threadFactory);  
  12.     }  


四、定製屬於自己的非阻塞執行緒池
Java程式碼  收藏程式碼
  1. import java.util.concurrent.ArrayBlockingQueue;  
  2. import java.util.concurrent.ExecutorService;  
  3. import java.util.concurrent.RejectedExecutionHandler;  
  4. import java.util.concurrent.ThreadFactory;  
  5. import java.util.concurrent.ThreadPoolExecutor;  
  6. import java.util.concurrent.TimeUnit;  
  7. import java.util.concurrent.atomic.AtomicInteger;  
  8. public class CustomThreadPoolExecutor {  
  9.     private ThreadPoolExecutor pool = null;  
  10.     /** 
  11.      * 執行緒池初始化方法 
  12.      *  
  13.      * corePoolSize 核心執行緒池大小----10 
  14.      * maximumPoolSize 最大執行緒池大小----30 
  15.      * keepAliveTime 執行緒池中超過corePoolSize數目的空閒執行緒最大存活時間----30+單位TimeUnit 
  16.      * TimeUnit keepAliveTime時間單位----TimeUnit.MINUTES 
  17.      * workQueue 阻塞佇列----new ArrayBlockingQueue<Runnable>(10)====10容量的阻塞佇列 
  18.      * threadFactory 新建執行緒工廠----new CustomThreadFactory()====定製的執行緒工廠 
  19.      * rejectedExecutionHandler 當提交任務數超過maxmumPoolSize+workQueue之和時, 
  20.      *                          即當提交第41個任務時(前面執行緒都沒有執行完,此測試方法中用sleep(100)), 
  21.      *                                任務會交給RejectedExecutionHandler來處理 
  22.      */  
  23.     public void init() {  
  24.         pool = new ThreadPoolExecutor(  
  25.                 10,  
  26.                 30,  
  27.                 30,  
  28.                 TimeUnit.MINUTES,  
  29.                 new ArrayBlockingQueue<Runnable>(10),  
  30.                 new CustomThreadFactory(),  
  31.                 new CustomRejectedExecutionHandler());  
  32.     }  
  33.     public void destory() {  
  34.         if(pool != null) {  
  35.             pool.shutdownNow();  
  36.         }  
  37.     }  
  38.     public ExecutorService getCustomThreadPoolExecutor() {  
  39.         return this.pool;  
  40.     }  
  41.     private class CustomThreadFactory implements ThreadFactory {  
  42.         private AtomicInteger count = new AtomicInteger(0);  
  43.         @Override  
  44.         public Thread newThread(Runnable r) {  
  45.             Thread t = new Thread(r);  
  46.             String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);  
  47.             System.out.println(threadName);  
  48.             t.setName(threadName);  
  49.             return t;  
  50.         }  
  51.     }  
  52.     private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {  
  53.         @Override  
  54.         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
  55.             // 記錄異常  
  56.             // 報警處理等  
  57.             System.out.println("error.............");  
  58.         }  
  59.     }  
  60.     // 測試構造的執行緒池  
  61.     public static void main(String[] args) {  
  62.         CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();  
  63.         // 1.初始化  
  64.         exec.init();  
  65.         ExecutorService pool = exec.getCustomThreadPoolExecutor();  
  66.         for(int i=1; i<100; i++) {  
  67.             System.out.println("提交第" + i + "個任務!");  
  68.             pool.execute(new Runnable() {  
  69.                 @Override  
  70.                 public void run() {  
  71.                     try {  
  72.                         Thread.sleep(3000);  
  73.                     } catch (InterruptedException e) {  
  74.                         e.printStackTrace();  
  75.                     }  
  76.                     System.out.println("running=====");  
  77.                 }  
  78.             });  
  79.         }  
  80. 相關推薦

    執行----ThreadPoolExecutor機制

    ThreadPoolExecutor機制  一、概述 1、ThreadPoolExecutor作為java.util.concurrent包對外提供基礎實現,以內部執行緒池的形式對外提供管理任務執行,執行緒排程,執行緒池管理等等服務;  2、Executors方法提

    【Java】執行ThreadPoolExecutor實現原理

    引言 執行緒池:可以理解為緩衝區,由於頻繁的建立銷燬執行緒會帶來一定的成本,可以預先建立但不立即銷燬,以共享方式為別人提供服務,一來可以提供效率,再者可以控制執行緒無線擴張。合理利用執行緒池能夠帶來三個好處: 降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造

    Java執行ThreadPoolExecutor詳解

      1、執行緒池的工作原理?   執行緒池剛建立時,裡面沒有一個執行緒。任務佇列是作為引數傳進來的。不過,就算佇列裡面有任務,執行緒池也不會馬上執行它們。 當呼叫 execute() 方法新增一個任務時,執行緒池會做如下判斷:

    Java多執行 - 執行 ThreadPoolExecutor類的使用

    ThreadPoolExecutor類可以非常方便的建立執行緒池物件,而不需要程式設計師設計大量的new去例項化Thread相關的程式碼 最常用的構造方法 (不過threadFactory可以視情況設或不設) ThreadPoolExecutor(   &nb

    執行的建立與執行ThreadPoolExecutor,Executors

                        執行緒的建立與執行緒池及執行緒池工具類 1.執行緒的建立方式 1.1繼承Thread類重寫run方法 public class Test { p

    一心多用多執行-執行ThreadPoolExecutor-看這篇就夠了

    首先先寫一下執行緒池的概念: 執行緒池:執行緒池是一種多執行緒處理形式,處理過程中將任務新增到佇列,然後在建立執行緒後自動啟動這些任務。執行緒池執行緒都是後臺執行緒。每個執行緒都使用預設的堆疊大小,以預設的優先順序執行,並處於多執行緒單元中。如果某個執行緒在託管程式碼中空閒(如正在等待某個

    執行 -- ThreadPoolExecutor

    執行緒池的實現原理 1)如果當前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務(注意,執行這一步驟 需要獲取全域性鎖)。 2)如果執行的執行緒等於或多於corePoolSize,則將任務加入BlockingQueue。 3)如果無法將任務

    關於執行ThreadPoolExecutor引數設定那些事

           由於近期工作需要,最近需要測試開發某元件(該元件中用到了ThreadPoolExecutor)的效能,測試工具是soapUI,由於以前對測試效能方面接觸較少。所以藉此機會再網上查閱了相關資料,總結如下。      

    執行ThreadPoolExecutor配置、引數詳解及例子

         對於執行緒池,我僅限於簡單的使用,對其原理和引數並不是很理解。以前只是在網上找個執行緒池的例子,然後模仿寫一下而已,最近專案中又再次用的執行緒池,做了一些研究,現記錄下,以備以後使用。     我以前利用執行緒池只會這樣用:

    Java多執行-執行ThreadPoolExecutor構造方法和規則 Java多執行-執行ThreadPoolExecutor構造方法和規則

    Java多執行緒-執行緒池ThreadPoolExecutor構造方法和規則 2017年05月03日 17:15:37 閱讀數:40542 為什麼用執行緒池 部落格地址 http://blog.csdn.ne

    執行--ThreadPoolExecutor

    執行緒池的實現原理 1)如果當前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務(注意,執行這一步驟 需要獲取全域性鎖)。 2)如果執行的執行緒等於或多於corePoolSize,則將任務加入BlockingQueue。 3)如果無法將任務加入BlockingQueue(

    Java 進階——多執行優化之執行 ThreadPoolExecutor的核心容器阻塞佇列詳解(一)

    #引言 多執行緒我想無論是後端開發,還是對於App開發者來說都不會陌生,何況Android強制要求不能在主執行緒中做網路請求,於是乎,在很多初學者或者App的原始碼中會出現會多的new Thread…的方式,這樣的程式碼是不優雅而且存在很多的隱患,假如說在使用者

    Java多執行-執行ThreadPoolExecutor構造方法和規則

    為什麼用執行緒池 有時候,系統需要處理非常多的執行時間很短的請求,如果每一個請求都開啟一個新執行緒的話,系統就要不斷的進行執行緒的建立和銷燬,有時花在建立和銷燬執行緒上的時間會比執行緒真正執行的時間還長。而且當執行緒數量太多時,系統不一定能受得了。 使用執

    python執行ThreadPoolExecutor與程序ProcessPoolExecutor

    1 import time 2 from concurrent.futures import ThreadPoolExecutor 3 4 def get_thread_time(times): 5 time.sleep(times) 6 return times 7 8

    Java執行ThreadPoolExecutor

    執行緒池    ThreadPoolExecutor 執行緒池是ExecutorService的實現,可以通過Executors執行工廠構造不同型別的執行服務(執行緒池)。    ThreadPoolExecutor執行緒池使用的是阻塞佇列BlockingQueue。

    java執行ThreadPoolExecutor類使用詳解

    在《阿里巴巴java開發手冊》中指出了執行緒資源必須通過執行緒池提供,不允許在應用中自行顯示的建立執行緒,這樣一方面是執行緒的建立更加規範,可以合理控制開闢執行緒的數量;另一方面執行緒的細節管理交給執行緒池處理,優化了資源的開銷。而執行緒池不允許使用Executors去建立,而要通過ThreadPoolExe

    JAVA併發程式設計:執行 ThreadPoolExecutor

    生活 前期追深度,否則會華而不實,後期追廣度,否則會坐井觀天; 前言 在前面,我們已經對Thread有了比較深入的瞭解,並且已經學會了通過new Thread()來建立一個執行緒,並通過start方法來啟動一個執行緒,這種方法非常簡單,同樣也存在弊端: 1、每次通過new Thr

    執行ThreadPoolExecutor分析: 執行是什麼時候建立執行的,佇列中的任務是什麼時候取出來的?

      帶著幾個問題進入原始碼分析: 執行緒池是什麼時候建立執行緒的? 任務runnable task是先放到core到maxThread之間的執行緒,還是先放到佇列? 佇列中的任務是什麼時候取出來的? 什麼時候會觸發reject策略? core到maxThread之間的執行緒什麼時候會di

    java執行ThreadPoolExecutor和阻塞佇列BlockingQueue,Executor, ExecutorService

    ThreadPoolExecutor 引數最全的建構函式 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

    關於執行ThreadPoolExecutor的初學

    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 8, 3, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()); 這個執行緒池一共有5個引數: