1. 程式人生 > 其它 >java 執行緒池原始碼分析

java 執行緒池原始碼分析

  • java執行緒池幹了些什麼事
    • 主要是維持一些常駐的執行緒,避免每次執行任務新建執行緒的開銷
    • 將執行緒的執行和提交解耦,使用者只用關心執行緒的提交,而不用關心執行緒的執行
  • 我們通常怎麼使用執行緒池
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2);
        Future<?> future =  threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                int a = 1/0;
            }
        });
        future.get();
  • 從submit方法出發,看看都幹了些什麼事
    • submit方法是executorService介面的方法,在threadPoolExecutor的父類,abstractExecutorService中被實現。
public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
  • 可以看到,先是將runnable轉換成一個futureTask。然後執行。
  • 在threadPoolExecutor中
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
  • 如果當前worker的數量少於coreSize,直接新加一個worker, 否則,將任務加入佇列中。
  • 看下worker的結構。
 /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
  • worker 本身是一個runnable的例項,同時包含一個thread變數。在worker方法被執行緒執行時,會呼叫到run方法,然後看下runWorker。
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
  • 首先runWorker是一個死迴圈,不斷的從queue中取任務執行,由於是阻塞佇列,佇列中無任務時,阻塞等待。需要注意的是,如果這裡task拋了異常,會丟擲exception,然後執行workerExit邏輯
private void processWorkerExit(Worker w, boolean completedAbruptly) {
       if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
           decrementWorkerCount();

       final ReentrantLock mainLock = this.mainLock;
       mainLock.lock();
       try {
           completedTaskCount += w.completedTasks;
           workers.remove(w);
       } finally {
           mainLock.unlock();
       }

       tryTerminate();

       int c = ctl.get();
       if (runStateLessThan(c, STOP)) {
           if (!completedAbruptly) {
               int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
               if (min == 0 && ! workQueue.isEmpty())
                   min = 1;
               if (workerCountOf(c) >= min)
                   return; // replacement not needed
           }
           addWorker(null, false);
       }
   }
  • 如果woker因為異常退出了,那這個worker和worker內部維護的thread都會被銷燬,從workset中移除之後,worker沒有引用,等待被gc。同時,看是不是要補充新的worker。

ScheduledThreadPoolExecutor

  • ScheduledThreadPoolExecutor相比普通的threadPoolExecutor幹了什麼事呢,無非是一個任務,隔一定間隔重複執行。
  • 我們怎麼使用ScheduledThreadPoolExecutor?
@Test
    public void scheduleExecutorService() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
        executorService.scheduleWithFixedDelay(()->{
            System.out.println("aa");
        },2,2,TimeUnit.SECONDS);
        executorService.scheduleAtFixedRate(()->{
            System.out.println("aa");
        },2,2,TimeUnit.SECONDS);
    }
  • 看下scheduleAtFixedRate和scheduleWithFixedDelay的區別是什麼
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

  • 兩個方法的唯一區別是在構造ScheduledFutureTask時,scheduleWithFixedDelay傳入的是負值,看下怎麼處理這個時間的,
private void setNextRunTime() {
           long p = period;
           if (p > 0)
               time += p;
           else
               time = triggerTime(-p);
       }

/**
    * Returns the trigger time of a delayed action.
    */
   long triggerTime(long delay) {
       return now() +
           ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
   }
  • 對於scheduleAtFixedRate,p>0,下次執行時間是上次執行時間+p。 對於scheduleWithFixedDelay,p<0,下次執行時間為當前時間+p,即本次任務的執行結束時間+p
  • scheduleWithFixedDelay前一個任務完成之後,開始計時,到指定的delay之後,有機會執行,具體執行時間看排程
  • scheduleAtFixedRate, 從任務提交開始,計算時間,按照指定的間隔執行任務。
public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }
  • 我們對於scheduleThreadPool的任務提交,提交之後,即加入佇列中。然後和ThreadPoolExecutor一樣,由worker不斷的從佇列中取任務出來執行。且執行完之後,再設定下一次的執行時間,扔到佇列裡。
  • 這裡的佇列是維護了一個最小堆,每次從堆頂拿要執行的任務。上面我們知道,worker從佇列中取任務時,是呼叫佇列的take方法。
public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }
  • 如果佇列中沒有任務,則等待,如果隊首的任務已經到該執行的時間了,則直接返回,如果還沒到delay的時間,則等待delay時間。
  • 這裡leader是幹啥的呢?leader用於記載當前哪個執行緒是第一個來取隊首任務的執行緒,如果一個執行緒標記了,後面的執行緒就執行等待了,同時這個標記的執行緒只await delay的時間,這樣,當次執行緒醒來的時候,直接可以取到隊首的元素去執行了。如果不這樣的話,加入所有的執行緒都await(),後面大家還需要再競爭一次。這樣通過leader機制,減少了一次競爭。
  • 那如果再等待的時候,又來了一個更快執行的任務呢,
public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }
  • 如果佇列裡通過offer又加入了一個新的task,且加在了隊首,那這時候,會呼叫 available.signal();,喚醒所有再等待的執行緒,大家再重新競爭,重新獲取隊首任務。