1. 程式人生 > 實用技巧 >java執行緒池原始碼解析

java執行緒池原始碼解析

主要介紹執行緒池相關知識,關於執行緒池,首先我們思考下為什麼要用執行緒池。如果單純的使用執行緒,執行緒的建立和銷燬都是自己來完成,如果併發請求過多,可能造成資源耗盡。執行緒池可以對執行緒進行統一分配,調優和監控。本篇文章為《圖靈學院》課程筆記

  • 降低資源消耗(執行緒無限制地建立,然後使用完畢後銷燬)
  • 提高響應速度(無須建立執行緒)
  • 提高執行緒的可管理性

java是如何實現和管理執行緒池的,jdk5開始把工作單元和任務執行分離,工作單元包括callable、runnable,而執行機制由Executor提供,Executor的實現還提供了對執行緒生命週期的管理

相關介面

介面介紹

  • java.util.concurrent.Executor (執行器,執行方法)

  • java.util.concurrent.ExecutorService (執行服務) 包含服務的生命週期

  • java.util.concurrent.ScheduledExecutorService (排程相關的服務)

核心介面實現

  • java.util.concurrent.ThreadPoolExecutor (普通的的執行緒池實現類)
  • java.util.concurrent.ScheduledThreadPoolExecutor (排程的核心實現類)
名稱 方法 說明 型別
java.util.concurrent.
Executor
execute 執行介面 介面
java.util.concurrent.
ExecutorService
submit(java.util.concurrent.Callable) 提交介面 介面
java.util.concurrent.
AbstractExecutorService
submit(Callable task) 把執行和提交介面
進行合併區別:有
返回值和無返回值
抽象類
java.util.concurrent.
ThreadPoolExecutor
execute(Runnable command) 調 用
runwork 方 法
getTask(從佇列
拿資料)
實現類
java.util.concurrent.
ScheduledExecutorService
scheduleAtFixedRate、scheduleWithFixedDelay 定義方法 介面
java.util.concurrent.
ScheduledThreadPoolExecutor
delayedExecute 具體實現
add>task>addWo
rk
實現類

內部類分為兩種

  • policy 策略
  • worker 工作

內部工作原理(構造方法賦值)

  • corePool:核心執行緒池大小
  • maximumPool:最大執行緒池大小
  • BlockingQueue:任務工作佇列
  • keepAliveTime:執行緒活躍時間,如果執行緒數量大於核心執行緒數量,多餘執行緒空閒時間超時候被銷燬
  • RejectedExecutionHandler:當ThreadPoolExecutor關閉或最大執行緒池已經滿了,executor將呼叫的handler
  • ThreadFactory:使用ThreadFactory建立執行緒,預設使用defaultThreadFactory

執行緒池的執行思路

  • 如果當前執行緒池中的執行緒數目小於corePoolSize,則每來一個任務,就會建立一個執行緒去執行這個任務;
  • 如果當前執行緒池中的執行緒數目>=corePoolSize,則每來一個任務,會嘗試將其新增到任務快取隊列當中,若新增成功,則該任務會等待空閒執行緒將其取出去執行;若新增失敗(一般來說是任務快取佇列已滿),則會嘗試建立新的執行緒去執行這個任務;
  • 如果當前執行緒池中的執行緒數目達到maximumPoolSize,則會採取任務拒絕策略進行處理;
  • 如果執行緒池中的執行緒數量大於 corePoolSize時,如果某執行緒空閒時間超過keepAliveTime,執行緒將被終止,直至執行緒池中的執行緒數目不大於corePoolSize;如果允許為核心池中的執行緒設定存活時間,那麼核心池中的執行緒空閒時間超過keepAliveTime,執行緒也會被終止

拒接策略

 ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常(預設)
 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常
 ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
 ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務

原始碼解析

執行緒池的執行原理

初始化構造器

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

ThreadPoolExecutor#execute

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
  //判斷是否小於核心數量,是直接新增work成功後直接退出
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
          // 增加失敗後繼續獲取標記 
            c = ctl.get();
        }
  //判斷是執行狀態並且扔到workQueue裡成功後 
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
          //再次check判斷執行狀態如果是非執行狀態就移除出去&reject掉
            if (! isRunning(recheck) && remove(command))
                reject(command);
          //否則發現可能執行執行緒數是0那麼增加一個null的worker
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
         }
      //直接增加worker如果不成功直接reject
        else if (!addWorker(command, false))
            reject(command);
    }

ThreadPoolExecutor#addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary. 
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
      // 兩種情況
      //1.如果非執行狀態 
      //2.不是這種情況(停止狀態並且是null物件並且workQueue不等於null)
        return false;

    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;// 判斷是否飽和容量了
        if (compareAndIncrementWorkerCount(c)) //增加一個work數量 然後跳出去
            break retry;
        c = ctl.get();  // Re-read ctl  增加work失敗後繼續遞迴
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
  //增加一個worker
    w = new Worker(firstTask);
    final Thread t = w.thread;
  //判斷是否 為null
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.  鎖定後並重新檢查下 是否存線上程工廠的失敗或者鎖定前的關閉
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();  
                workers.add(w);   //增加work
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) { //本次要是新增加work成功就呼叫start執行
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;
    }

ThreadPoolExecutor#runWorker

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();//1.取到當前執行緒
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
    while (task != null || (task = getTask()) != null) { //獲取任務 看看是否能拿到
        w.lock();
        // If pool is stopping, ensure thread is interrupted;
        // if not, ensure thread is not interrupted.  This
        // requires a recheck in second case to deal with
        // shutdownNow race while clearing interrupt
        if ((runStateAtLeast(ctl.get(), STOP) ||
             (Thread.interrupted() &&
              runStateAtLeast(ctl.get(), STOP))) &&
            !wt.isInterrupted())
            wt.interrupt();// 確保執行緒是能中斷的
        try {
            beforeExecute(wt, task); //開始任務前的鉤子
            Throwable thrown = null;
            try {
                task.run();//執行任務
            } catch (RuntimeException x) {
                thrown = x; throw x;
            } catch (Error x) {
                thrown = x; throw x;
            } catch (Throwable x) {
                thrown = x; throw new Error(x);
            } finally {
                afterExecute(task, thrown); //任務後的鉤子
            }
        } finally {
            task = null;
            w.completedTasks++;
            w.unlock();
        }
    }
    completedAbruptly = false;
} finally {
    processWorkerExit(w, completedAbruptly);
}
}

ThreadPoolExecutor#processWorkerExit

 private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);  //移除work
} finally {
    mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) { //判斷是否還有任務
    if (!completedAbruptly) {
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && ! workQueue.isEmpty())
            min = 1;
        if (workerCountOf(c) >= min)
            return; // replacement not needed
    }
    addWorker(null, false);
}
}

執行緒池排程原理

排程核心構造器

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

ScheduledThreadPoolExecutor#delayedExecute

private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
    reject(task);
else {
    super.getQueue().add(task);//增加任務
    if (isShutdown() &&
        !canRunInCurrentRunState(task.isPeriodic()) &&
        remove(task))
        task.cancel(false);
    else
        ensurePrestart();
}
}

通過DelayedWorkQueue 延遲佇列實現 offer獲取物件的延遲

ScheduledThreadPoolExecutor.DelayedWorkQueue#offer

public boolean offer(Runnable x) {
if (x == null)
    throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; //當前物件
final ReentrantLock lock = this.lock;
lock.lock();
try {
    int i = size;
    if (i >= queue.length) //擴容
        grow();
    size = i + 1;
    if (i == 0) {
        queue[0] = e;
        setIndex(e, 0); //第一個直接設定索引和下標0
    } else {
        siftUp(i, e); //篩選到上邊
    }
    if (queue[0] == e) {
        leader = null;
        available.signal(); //喚醒所有的被擠壓的wait執行緒
    }
} finally {
    lock.unlock();
}
return true;
}

ScheduledThreadPoolExecutor.DelayedWorkQueue#siftUp

 private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
    int parent = (k - 1) >>> 1; 
    RunnableScheduledFuture<?> e = queue[parent];
    if (key.compareTo(e) >= 0)
        break; 
    queue[k] = e;
    setIndex(e, k);
    k = parent;
}
queue[k] = key;
setIndex(key, k);
 }

ScheduledThreadPoolExecutor.ScheduledFutureTask#compareTo


 public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
    return 0;
if (other instanceof ScheduledFutureTask) {
    ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
    long diff = time - x.time; //判斷time
    if (diff < 0)
        return -1;
    else if (diff > 0)
        return 1;
    else if (sequenceNumber < x.sequenceNumber)
        return -1;
    else
        return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
   }

ThreadPoolExecutor#ensurePrestart

確保有work執行

 void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
   }

ScheduledThreadPoolExecutor.DelayedWorkQueue#take

work執行的時候呼叫queue的take方法

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
    for (;;) {
        RunnableScheduledFuture<?> first = queue[0];//獲取第一個物件
        if (first == null)
            available.await();
        else {
            long delay = first.getDelay(NANOSECONDS);//延遲時間
            if (delay <= 0)//到時間了
                return finishPoll(first);
            first = null; // don't retain ref while waiting
            if (leader != null)
                available.await();//因為沒有執行執行緒初始化,所以等等什麼時候有了自己被他人喚醒
            else {
                Thread thisThread = Thread.currentThread();
                leader = thisThread;
                try {
                    available.awaitNanos(delay); //各種condition的awaitNanos
                } finally {
                    if (leader == thisThread)
                        leader = null;
                }
            }
        }
    }
} finally {
    if (leader == null && queue[0] != null)
        available.signal();
    lock.unlock();
}

        }

ScheduledThreadPoolExecutor.DelayedWorkQueue#finishPoll

 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];  //重排序佇列
queue[s] = null;
if (s != 0)
    siftDown(0, x);
setIndex(f, -1);
return f
}

ScheduledThreadPoolExecutor.ScheduledFutureTask#run

public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
    cancel(false);
else if (!periodic)
    ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {//有period的要執行成功設定下次執行時間和增加額外任務
    setNextRunTime();
    reExecutePeriodic(outerTask);
}
}

非同步結果原始碼分析

怎麼拿到的非同步任務結果?

FutureTask#awaitDone

  private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
    if (Thread.interrupted()) { //check執行緒中斷
        removeWaiter(q);
        throw new InterruptedException();
    }

    int s = state;
    if (s > COMPLETING) {  //判斷是否完成
        if (q != null)
            q.thread = null;
        return s;
    }
    else if (s == COMPLETING) // cannot time out yet
        Thread.yield();
    else if (q == null)
        q = new WaitNode();  //生成一個waint物件
    else if (!queued)
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                             q.next = waiters, q);//連結串列的物件下一個置成當前的waitNode
    else if (timed) {
        nanos = deadline - System.nanoTime();
        if (nanos <= 0L) {
            removeWaiter(q);
            return state;
        }
        LockSupport.parkNanos(this, nanos); //等待時間阻塞
    }
    else
        LockSupport.park(this); //一直阻塞
}
  }

什麼時候回填的結果

FutureTask#run

 public void run() {
   if (state != NEW ||
    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                 null, Thread.currentThread())) //如果狀態不是new 或者 runner狀態置不成功直接退出
    return;
try {
    Callable<V> c = callable;
    if (c != null && state == NEW) {
        V result;
        boolean ran;
        try {
            result = c.call();//執行ok 的時候返回result
            ran = true;
        } catch (Throwable ex) {
            result = null;
            ran = false;
            setException(ex);
        }
        if (ran)//正常成功set result物件
            set(result);
    }
} finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
}

 }

FutureTask#cancel

 public boolean cancel(boolean mayInterruptIfRunning) {
   if (!(state == NEW &&
      UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
          mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) //CAS 置成stateOffset 的中斷或者取消
    return false;
try {    // in case call to interrupt throws exception
    if (mayInterruptIfRunning) {  //如果執行緒執行中,可能中斷
        try {
            Thread t = runner;
            if (t != null)
                t.interrupt();
        } finally { // final state
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
        }
    }
} finally {
    finishCompletion();
}
return true;

 }

原文地址

http://cbaj.gitee.io/blog/2020/08/22/java%E7%BA%BF%E7%A8%8B%E6%B1%A0%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/#more