1. 程式人生 > >Core Java 談談 ThreadPoolExecutor

Core Java 談談 ThreadPoolExecutor

oos pre worker 靜態方法 idle ica eal catch linked

  說起Java 7的Executors框架的線程池,同學們能想到有幾種線程池,它們分別是什麽?

  一共有四個,它們分別是Executors的 newSingleThreadPool(), newCachedThreadPool(), newFixedThreadPool(),newScheduledThread(),四個靜態方法,當然在java 8中還有一個newWorkStealingThreadPool()。

  但今天這些這些不是咱們今天要說的重點,今天要說的重點是裏邊所使用的ThreadPoolExecutor, 這個是咱們要說的重點,咱們打開newSingleThreadPool()的源碼,這個定一個線程的線程池。

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * <tt>newFixedThreadPool(1)</tt> the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * 
@return the newly created single-threaded Executor */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())); }

  重點來了,裏邊有5個參數,分別是核心線程數,也就是啟動的時候線程池裏需要有的線程數,如果設置allowsCoreThreadTimeOut,那麽核心線程數就會在KeepAliveTime之後核心線程空閑的也會停止掉;第二個maximumPoolSize是最大線程數,註意,最大線程數是包括核心線程數的;KeepAliveTime是那些超過了核心線程的其他線程在單位時間內停止掉;TimeUnit是時間單位;BlockingQueue是阻塞隊列,用於存放那些超過了核心執行線程之外的任務。現在有一個問題,如果任務數超過了核心線程數,那麽多余的任務是進入maximumPoolSize,還是進入組則隊列呢?

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

  我寫了一個測試的例子,讓大家更好的理解它的原理。線程池滿了之後會有一個rejection的策略,所以我提前寫了簡單的一個。

package com.hqs.core;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class RejectedFullHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + " I am rejected!");
    }

}

  然後寫線程池測試類,為了大家的閱讀方便,我添加了一些註釋。

package com.hqs.core;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorTest implements Runnable {
    private int i;
    private CountDownLatch cdl;
    public ThreadPoolExecutorTest(int i, CountDownLatch cdl) {
        this.i = i;
        this.cdl = cdl;
    }
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(3); 
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(i + " is running");
        cdl.countDown();
    }
    @Override
    public String toString() {
        return "i:" + i;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        BlockingQueue queue = new ArrayBlockingQueue<>(2); //定義一個阻塞隊列,長度為2
        CountDownLatch cdl = new CountDownLatch(5); //設置一個減門閂,用於遞減動作
     //定義了2個核心線程,3個最大線程,空閑線程持續5秒鐘,阻塞隊列,拒絕處理執行類
     ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 3, 5, TimeUnit.SECONDS, queue, new RejectedFullHandler());
     
        //threadPool.allowCoreThreadTimeOut(true); //設置是否將空閑的核心線程超時後停止

        for(int i = 1; i <= 6; i++ ) {
            ThreadPoolExecutorTest t1 = new ThreadPoolExecutorTest(i, cdl);
            threadPool.execute(t1);
//            FutureTask future = (FutureTask)threadPool.submit(t1); //此處註釋,因為submit傳的參數是Runnable而不是Callable,所以返回結果為null
            System.out.println("queue content:" + queue.toString()); //將queue的內容打印出來
            
        }
        try {
            cdl.await(); //所有線程執行完之後,主線程繼續往下進行。
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        
        try {
            TimeUnit.SECONDS.sleep(6);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
            
        System.out.println("poosize :" + threadPool.getPoolSize()); //輸出PoolSize
        threadPool.shutdown();  //註意,所以線程池的最後都必須要顯示的關閉
        
    }
}


queue content:[]
queue content:[]
queue content:[i:3]
queue content:[i:3, i:4]
queue content:[i:3, i:4]
i:6 I am rejected!
queue content:[i:3, i:4]
1 is running
2 is running
5 is running
3 is running
4 is running
poosize :2

  我簡單解釋一下這個輸出結果,首先線程池初始化的時候啟動兩個線程,這時1,2進入到池子進行執行,但是1,2程序還沒有執行完,緊接著3,4被放到了Queue隊列裏邊,程序發現最大線程池是3個,目前2個核心線程池正在執行,還沒有達到最大值,所以啟用一個線程,執行5。當6進來的發現池子已經滿了,隊列也滿了,此時只能reject掉了,所以會走reject的異常處理。

  在此,程序執行的順序為 corePoolSize -> workQueue -> maximumPoolSize-corePoolSize -> RejectExecutionHandler (如果池子滿了)

  還有一個比較重要的點,也是同學們想知道的點,那就是execute()和submit()的區別是什麽? 

  1. execute方法是void的,沒有返回值,而submit()方法返回的是Future對象。當然還有一些拋出的異常會不一致,這裏就不詳述了。

  2. execute方法是在Executor的service中定義的,而submit方法是在ExecutorService中定義的。

  3. execute方法只支持Runnable參數,而submit方法除了支持Runnable外還支持Callable參數,也就意味著通過submit執行的結果是可以返回的,通過Future.get()方法,也就意味著並發計算的結果是可以返回的(這就是兩者為什麽區別的根本原因)

  那麽很多同學也會問,兩個方法都是異步執行的,那什麽情況下用execute或submmit方法呢?

  比如多線程並行執行,不需要執行結果返回的時候一般使用execute方法,如果需要多線程並行計算,並且都需要返回結果的時候,需要submmit方法,當然submmit一般情況下是blocking的,如果想在制定的時間取回結果,如果取不到就會拋異常是通過get()方法設置參數,一般同時處理大量文件的時候,將開啟多個線程對文件內容分別處理並將結果返回的再統計或匯總的時候比較方便。

  

Core Java 談談 ThreadPoolExecutor