1. 程式人生 > 程式設計 >JDK執行緒池

JDK執行緒池

本文地址: juejin.im/post/5dba4a…

執行緒池用於存放執行緒,通過對執行緒的複用,很大程度上減少了頻繁建立和銷燬執行緒導致的資源損耗. 下面簡單地介紹一下 JDK(1.8)中的執行緒池.

執行緒池引數

在介紹JDK的4種執行緒池之前,先介紹一下執行緒池的幾個引數

  • corePoolSize 執行緒池的核心執行緒數量,
  • maximumPoolSize 執行緒池的最大執行緒數量
  • keepAliveTime 執行緒被回收的最大空閒時間
  • keepAliveTime 的單位(ms、s、...)
  • BlockingQueue 任務佇列,存放任務
  • ThreadFactory 執行緒工廠
  • RejectedExecutionHandler 執行緒池拒絕策略(當任務過多的時候,執行緒池拒絕任務的方式)
  • allowCoreThreadTimeOut 允許核心執行緒池被回收,預設 false,so 預設情況下 核心執行緒並不會被回收掉.
    // 用一個原子變數來儲存執行緒池狀態和執行緒數量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));
    // 32 - 3 = 29,int 的左邊三位表示執行緒池狀態,右邊29位表示執行緒數量
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 最大執行緒數量: 000 & (29個1) = 2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 執行緒池的幾個狀態 // 100 & (29個0),執行緒池執行狀態 private static final int RUNNING = -1 << COUNT_BITS; // 000 & (29個0),執行shutdown方法時,不接收新的任務,會先執行完當前已經接收的,任務佇列任務執行完 && 執行緒已經全部終止: state -> TIDYING private static
final int SHUTDOWN = 0 << COUNT_BITS; // 001 & (29個0),執行 shutdownNow() 方法時 private static final int STOP = 1 << COUNT_BITS; // 010 & (29個0) private static final int TIDYING = 2 << COUNT_BITS; // 011 & (29個0),執行緒池已經終止 private static final int TERMINATED = 3 << COUNT_BITS; 複製程式碼

JDK 提供的四種執行緒池介紹

當設定為無界佇列的時候,最大執行緒數並不會起作用,因為非核心執行緒只有在佇列滿了之後,才會被新增

newFixedThreadPool

固定執行緒池數量,核心執行緒數 = 最大執行緒數

任務佇列: LinkedBlockingQueue(Integer.MAX_VALUE) 無界佇列

適用於同時處理固定任務數的場景.

    public static ExecutorService newFixedThreadPool(int nThreads) {
	      // coreThreads = maxThreads
        return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    }
複製程式碼

newCachedThreadPool

核心執行緒數為0,最大為 Integer.MAX_VALUE,也就是當任務足夠多的時候,可以無限增加執行緒. 並且所有的執行緒空閒超過一段時間(呼叫 Executors 建立的預設 KeepAlive為 60s)就會被回收.

任務佇列: SynchronousQueue 預設傳入引數 fair=false,即處理任務非公平.

適用於處理小任務

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
    }
複製程式碼

SynchronousQueue

    public SynchronousQueue(boolean fair) {
        // fair = true 則會按照FIFO先入先出的順序執行 
        // fair = false(預設值) 則優先取出最新新增的任務,最早新增的任務最晚執行
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
複製程式碼

newSingleThreadExecutor

單個執行緒的執行緒池

任務佇列: LinkedBlockingQueue 同樣是無界佇列

適用於需要將任務按順序執行的時候

     public static ExecutorService newSingleThreadExecutor() {
        // 核心執行緒數=最大執行緒數=1
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1,1,new LinkedBlockingQueue<Runnable>()));
    }
複製程式碼

newScheduledThreadPool

固定核心執行緒數,執行緒數量不會再增長,maximumPoolSize 這個引數對定時執行緒池沒有作用.

oracle的api檔案是這麼寫的:

While this class inherits from ThreadPoolExecutor,a few of the inherited tuning methods are not useful for it. In particular,because it acts as a fixed-sized pool using corePoolSize threads and an unbounded queue,adjustments to maximumPoolSize have no useful effect.

任務佇列: DelayedWorkQueue 無界佇列,這是 ScheduledThreadPoolExecutor 的一個內部類

更適用於需要延時執行或者定時需求的場景

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize,0,NANOSECONDS,new DelayedWorkQueue());
    }
複製程式碼

執行緒池拒絕策略

AbortPolicy

拒絕新任務,並且丟擲異常,預設的拒絕策略

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r,ThreadPoolExecutor e) {
            // 直接丟擲異常 
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
複製程式碼

CallerRunsPolicy

當拒絕任務的時候,由呼叫執行緒處理該任務.

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r,ThreadPoolExecutor e) {
            // 如果執行緒池未停止
            if (!e.isShutdown()) {
	        // 當前執行緒直接呼叫任務的run方法
                r.run();
            }
        }
    }
複製程式碼

DiscardPolicy

拒絕新任務,靜悄悄的將新任務丟棄,而不通知(太坑了吧),具體看它的程式碼也是什麼事情都沒做,還真的就直接丟棄任務了.

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r,ThreadPoolExecutor e) {
          // 任務被拒絕後,啥事情都不幹
        }
    }
複製程式碼

DiscardOldestPolicy

當任務滿時,拋棄舊的未處理的任務,然後重新執行 execute 方法(此過程會重複),除非執行緒池停止執行,這種情況任務將被丟棄.具體看程式碼

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
	public void rejectedExecution(Runnable r,ThreadPoolExecutor e) {
  	    // 如果執行緒池停止,直接丟棄任務,不做任何處理
            if (!e.isShutdown()) {
               // 丟棄一個任務佇列中的任務
               e.getQueue().poll();
               // 重新執行被拒絕的任務,如果再次被拒絕,則會一直重複這個過程
               e.execute(r);
            }
        }
   }
複製程式碼

終止執行緒池

shutdown

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 修改狀態為 shutdown
            advanceRunState(SHUTDOWN);
            // 停止空閒的執行緒,有執行任務的不會停止
            interruptIdleWorkers();
            // ScheduledThreadPoolExecutor 定時執行緒池才用到這個方法
            onShutdown(); 
        } finally {
            mainLock.unlock();
        }
        // 終止執行緒池裡的所有執行緒
        tryTerminate();
    }
複製程式碼

interruptIdleWorkers,從方法名可以看出,它會終止掉當前空閒的執行緒

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 因為執行任務 runWorker() 的時候,執行了 worker.lock()方法
                // 所以如果當前執行緒有任務在執行,則 tryLock不會成功,
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
複製程式碼

shutdownNow

這個方法的作用是立刻強制停止所有執行緒,即使該執行緒有正在執行的任務.

並且停止所有執行緒後,返回任務佇列中還未執行的任務.

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 修改狀態為 stop
            advanceRunState(STOP);
            // 強制停止所有執行緒,有任務在執行也不管
            interruptWorkers();
            // 返回還未執行的任務
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 終止執行緒池裡的所有執行緒
        tryTerminate();
        return tasks;
    }

    // 暫停所有 worker 執行緒 
	  private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

		void interruptIfStarted() {
        Thread t;
      	// 如果執行緒已經啟動則直接終止
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
          try {
            t.interrupt();
          } catch (SecurityException ignore) {
          }
        }
     }
複製程式碼

下面這段程式碼兩個shutdown方法都呼叫到
	final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 以下情況,取消終止:
            // 1.執行緒池是執行狀態 running
            // 2.runStateAtLeast(c,TIDYING) 執行緒池狀態是 tidying 或者  terminated
            // 3.執行緒池狀態為 shutdown && 任務佇列不為空,即是任務沒處理完 (shutdown狀態的時候會繼續處理已經新增的任務)
            if (isRunning(c) ||
                runStateAtLeast(c,TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;

            /**
             * worker 在 {@link #getTask()} 的時候,獲取到 null 表示 worker 此時需要停止
             * worker[] 移除當前worker後會呼叫這個方法將執行緒進行終止
             * 每個worker停止的時候,會呼叫這個方法將當前執行緒進行終止
             * so it is ONLY_ONE
             * {@link #processWorkerExit}
             */
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 程式碼要執行到這裡到話,( state == stop || ( state == shutdown && workQueue 佇列為空) ) && 沒有正在執行的執行緒
                // 這兩種情況下,所有的執行緒都已經終止
                // cas 嘗試修改 ctl 的狀態,
                // 修改失敗,外層 for迴圈會再次執行
                if (ctl.compareAndSet(c,ctlOf(TIDYING,0))) {
                    try {
                        // 這裡不做任何事情
                        terminated();
                    } finally {
                        // 最後將狀態修改為 terminated,表示執行緒池完全停止
                        ctl.set(ctlOf(TERMINATED,0));
                        // 通知所有在等待鎖的執行緒
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS

        }
    }
複製程式碼

shutdown 和 shutdownNow 對比:

shutdown: 執行緒將執行完已經新增進佇列中的所有任務,不接受新任務.

shutdownNow: 立刻終止所有正在執行的執行緒,並且返回任務佇列中的任務.

最後

最後的最後,非常感謝你們能看到這裡!!你們的閱讀都是對作者的一次肯定!!!
覺得文章有幫助的看官順手點個贊再走唄(終於暴露了我就是來騙讚的(◒。◒)),你們的每個贊對作者來說都非常重要(異常真實),都是對作者寫作的一次肯定(double)!!!

這一篇的內容到這就結束了,期待下一篇 還能有幸遇見你!