1. 程式人生 > >Android執行緒池原始碼解析

Android執行緒池原始碼解析

     上一篇部落格大概瞭解了下執行緒池是什麼,這篇部落格將在原始碼的基礎上去驗證上一篇部落格中提到的

Thread執行流程。我的部落格保證是一個字一個字敲出來的

   1.執行緒池原始碼解析

    在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,所以就先從execte方法開始

來看看執行緒池的執行流程:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 活動執行緒數 < corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 直接啟動新的執行緒。第二個引數true:addWorker中會重新檢查workerCount是否小於corePoolSize
        if (addWorker(command, true))
            // 新增成功返回
            return;
        c = ctl.get();
    }
    // 活動執行緒數 >= corePoolSize
    // runState為RUNNING && 佇列未滿
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // double check
        // 非RUNNING狀態 則從workQueue中移除任務並拒絕
        if (!isRunning(recheck) && remove(command))
            reject(command);// 採用執行緒池指定的策略拒絕任務
        // 執行緒池處於RUNNING狀態 || 執行緒池處於非RUNNING狀態但是任務移除失敗
        else if (workerCountOf(recheck) == 0)
            // 這行程式碼是為了SHUTDOWN狀態下沒有活動執行緒了,但是佇列裡還有任務沒執行這種特殊情況。
            // 新增一個null任務是因為SHUTDOWN狀態下,執行緒池不再接受新任務
            addWorker(null, false);

        // 兩種情況:
        // 1.非RUNNING狀態拒絕新的任務
        // 2.佇列滿了啟動新的執行緒失敗(workCount > maximumPoolSize)
    } else if (!addWorker(command, false))
        reject(command);
}
     註釋比較清楚了就不再解釋了,其中比較難理解的應該是addWorker(null, false);這一行,這要結合addWorker一起來看。 主

要目的是防止HUTDOWN狀態下沒有活動執行緒了,但是佇列裡還有任務沒執行這種特殊情況。

我們來看看addWorker(command,true)這個方法的內部實現:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry: for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);// 當前執行緒池狀態

            // Check if queue empty only if necessary.
            // 這條語句等價:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||
            // workQueue.isEmpty())
            // 滿足下列調價則直接返回false,執行緒建立失敗:
            // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此時不再接受新的任務,且所有任務執行結束
            // rs = SHUTDOWN:firtTask != null 此時不再接受任務,但是仍然會執行佇列中的任務
            // rs = SHUTDOWN:firtTask == null見execute方法的addWorker(null,
            // false),任務為null && 佇列為空
            // 最後一種情況也就是說SHUTDONW狀態下,如果佇列不為空還得接著往下執行,為什麼?add一個null任務目的到底是什麼?
            // 看execute方法只有workCount==0的時候firstTask才會為null結合這裡的條件就是執行緒池SHUTDOWN了不再接受新任務
            // 但是此時佇列不為空,那麼還得建立執行緒把任務給執行完才行。
            if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
                return false;

            // 走到這的情形:
            // 1.執行緒池狀態為RUNNING
            // 2.SHUTDOWN狀態,但佇列中還有任務需要執行
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))// 原子操作遞增workCount
                    break retry;// 操作成功跳出的重試的迴圈
                c = ctl.get(); // Re-read ctl
                if (runStateOf(c) != rs)// 如果執行緒池的狀態發生變化則重試
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // wokerCount遞增成功

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 併發的訪問執行緒池workers物件必須加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    // RUNNING狀態 || SHUTDONW狀態下清理佇列中剩餘的任務
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 將新啟動的執行緒新增到執行緒集合中,主要為了計算執行緒池執行緒的數量
                        workers.add(w);
                        // 更新largestPoolSize
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 啟動新新增的執行緒,這個執行緒首先執行firstTask,然後不停的從佇列中取任務執行
                // 當等待keepAlieTime還沒有任務執行則該執行緒結束。見runWoker和getTask方法的程式碼。
                if (workerAdded) {
                    t.start();// 最終執行的是ThreadPoolExecutor的runWoker方法
                    workerStarted = true;
                }
            }
        } finally {
            // 執行緒啟動失敗,則從wokers中移除w並遞減wokerCount
            if (!workerStarted)
                // 遞減wokerCount會觸發tryTerminate方法
                addWorkerFailed(w);
        }
        return workerStarted;
    }

      程式碼有點複雜,但靜下來也還看得下去,原始碼都這樣枯燥但有味道。這個方法的後面根據提交的Runnable任務例項化了一個

Worker物件,在Worker構造方法中根據ThreadFactory執行緒工程建立一個Thread物件

   Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

       Worker類實現了Runnable介面,我們來看看Worker的程式碼,主要看run方法中的runWorker方法,這個方法非常重要

簡單來說它做的就是:

  • 第一次啟動會執行初始化傳進來的任務firstTask;
  • 然後會從workQueue中取任務執行,如果佇列為空則等待keepAliveTime這麼長時間。
     
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // Worker的建構函式中抑制了執行緒中斷setState(-1),所以這裡需要unlock從而允許中斷
        w.unlock();
        // 用於標識是否異常終止,finally中processWorkerExit的方法會有不同邏輯
        // 為true的情況:1.執行任務丟擲異常;2.被中斷。
        boolean completedAbruptly = true;
        try {
            // 這裡第一次進來task不為空會執行firstTask,然後設定firstTask為null,這樣      
            //task = getTask()的邏輯才會被執行,這樣會從佇列中去任務執行
            while (task != null || (task = getTask()) != null) { 
                w.lock(); 
                 // If pool is stopping, ensure thread is interrupted; 
                 // if not, ensure thread is not interrupted. This 
                 // requires a recheck in second case to deal with 
                 // shutdownNow race while clearing interrupt
                 if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                    && !wt.isInterrupted()) 
                    wt.interrupt(); 
                 try { 
                    // 任務執行前可以插入一些處理,子類過載該方法
                    beforeExecute(wt, task); T
                    Throwable thrown = null;
                 try {
                   task.run();// 執行使用者任務,run方法為同步任務,所以該任務執行完後才會去阻塞佇列取執行緒 
                 } 
                 catch (RuntimeException x) {
                    thrown = x; throw x; } 
                 catch (Error x) { 
                    thrown = x; throw x; }
                 catch (Throwable x) { 
                    thrown = x; throw new Error(x); 
                 } finally { 
                   // 和beforeExecute一樣,留給子類去過載 
                   afterExecute(task, thrown); } 
                 } 
                 finally {
                  //設定task為空,不然不會去執行getTask的邏輯 
                  task = null; 
                  w.completedTasks++;
                  w.unlock(); }
                 } 
                  completedAbruptly = false;
                 }
                 finally { 
                  // 結束執行緒的一些清理工作
                  processWorkerExit(w, completedAbruptly);
                 }
               }

 getTask()是ThreadPoolExecutor類中的方法,並不是Worker類中的方法,下面是getTask方法的實現:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 1.rs > SHUTDOWN 所以rs至少等於STOP,這時不再處理佇列中的任務
            // 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,這時還需要處理佇列中的任務除非佇列為空
            // 這兩種情況都會返回null讓runWoker退出while迴圈也就是當前執行緒結束了,所以必須要decrement
            // wokerCount
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 遞減workerCount值
                decrementWorkerCount();
                return null;
            }
            
            int wc = workerCountOf(c);

            // 判斷如果設定允許為核心池中的執行緒設定空閒存活時間或者前者設定為false,當前執行緒數量大於corePoolSize
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null,                
                //take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的物件被加入為止                
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

       判斷如果當前執行緒池的是否設定allowCoreThreadTimeOut為true,如果為true,則呼叫

poll(keepAliveTime,TimeUnit.NANOSECONDS)去阻塞佇列取執行緒,如果取不到(佇列為空),那麼就會等到keepAliveTime單位時間後

返回null,並且關閉該執行緒,即時是核心執行緒,否則會使用take()方法去執行緒池取執行緒,take也是從佇列裡面取出任務如

果佇列是空的則阻塞保證執行緒池裡面的核心執行緒數量的執行緒一直存在

       好了整個流程下來可能有點蒙逼,我們回過頭整理一下整個流程,當我們呼叫ThreadPoolExecutor類的execute方法提交一個線

程任務時,會開啟一個執行緒去執行該任務,任務的執行方法中,會執行該任務,任務執行完後去阻塞佇列去第一個執行緒執行,執行緒

池如果設定allowCoreThreadTimeOut或者執行緒數量大於核心執行緒數量,那麼會呼叫poll(keepAliveTime,TimeUnit.NANOSECONDS)去

佇列取執行緒任務,如果佇列為空則等待keepAliveTime單位時間後返回null並且關閉該執行緒,否則會呼叫take()方法去取執行緒,這個方法會阻塞該執行緒,知道阻塞佇列不為空,利用該方法可以保證核心執行緒數量的執行緒一直存在;當執行緒數量大於核心執行緒數量時會把執行緒新增到阻塞佇列,當阻塞佇列滿了,並且正在執行的執行緒數量+阻塞佇列中的執行緒數量<最大執行緒數量,會繼續呼叫addWorker執行上面的操作,否則會執行拒絕操作,丟擲異常

     2.一個模擬ThreadPoolExecutor的例子

 public final class ThreadPool {
     // 執行緒池中預設執行緒的個數為5
     private static int worker_num = 5;
     // 工作執行緒
     private WorkThread[] workThreads;
    
     // 任務佇列,作為一個緩衝,List執行緒不安全
     private List<Runnable> taskQueue = new LinkedList<Runnable>();

     private static ThreadPool threadPool;

     // 建立具有預設執行緒個數的執行緒池
     private ThreadPool() {
          this(5);
     }
    
     // 建立執行緒池,worker_num為執行緒池中工作執行緒的個數
     private ThreadPool(int worker_num) {
          ThreadPool.worker_num = worker_num;
          workThreads = new WorkThread[worker_num];
          for (int i = 0; i < worker_num; i++) {
               workThreads[i] = new WorkThread();
               workThreads[i].start();// 開啟執行緒池中的執行緒
          }
     }
      
     // 單態模式,獲得一個預設執行緒個數的執行緒池
     public static ThreadPool getThreadPool() {
          return getThreadPool(ThreadPool.worker_num);
     }
      
     // 單態模式,獲得一個指定執行緒個數的執行緒池,worker_num(>0)為執行緒池中工作執行緒的個數
     // worker_num<=0建立預設的工作執行緒個數
     public static ThreadPool getThreadPool(int worker_num1) {
          if (threadPool == null)
               threadPool = new ThreadPool(worker_num1);
          return threadPool;
     }
     
     // 執行任務,其實只是把任務加入任務佇列,什麼時候執行有執行緒池管理器覺定
     public void addTask(Runnable task) {
          synchronized (taskQueue) {
               
               taskQueue.add(task);
               taskQueue.notifyAll();
          }
     }
       
     // 銷燬執行緒池,該方法保證在所有任務都完成的情況下才銷燬所有執行緒,否則等待任務完成才銷燬
     public void destroy() {
          while (!taskQueue.isEmpty()) {// 如果還有任務沒執行完成,就先睡會吧
               try {
                    Thread.sleep(10);
               } catch (InterruptedException e) {
                    e.printStackTrace();
               }
          }
          // 工作執行緒停止工作,且置為null
          for (int i = 0; i < worker_num; i++) {
               workThreads[i].stopWorker();
               workThreads[i] = null;
          }
          threadPool=null;
          taskQueue.clear();// 清空任務佇列
     }
      
     /**
      * 內部類,工作執行緒
      */
     private class WorkThread extends Thread {
          // 該工作執行緒是否有效,用於結束該工作執行緒
          private boolean isRunning = true;

          /*
           * 關鍵所在啊,如果任務佇列不空,則取出任務執行,若任務佇列空,則等待
           */
          @Override
          public void run() {
               Runnable r = null;
               while (isRunning) {// 注意,若執行緒無效則自然結束run方法,該執行緒就沒用了
                    synchronized (taskQueue) {
                         while (isRunning && taskQueue.isEmpty()) {// 佇列為空
                              try {
                                   taskQueue.wait(20);
                              } catch (InterruptedException e) {
                                   e.printStackTrace();
                              }
                         }
                         if (!taskQueue.isEmpty())
                              r = taskQueue.remove(0);// 取出任務
                    }
                    if (r != null) {
                         r.run();// 執行任務
                    }
                    r = null;
               }
          }
          
          // 停止工作,讓該執行緒自然執行完run方法,自然結束
          public void stopWorker() {
               isRunning = false;
          }
     }
}
               
     這個模擬的例子跟真正的執行緒池執行步驟還是有點出路,但是原理很像,可以幫助我們更好的理解

下一章將學習一下執行緒池具體的使用

相關推薦

Android執行原始碼解析

     上一篇部落格大概瞭解了下執行緒池是什麼,這篇部落格將在原始碼的基礎上去驗證上一篇部落格中提到的 Thread執行流程。我的部落格保證是一個字一個字敲出來的    1.執行緒池原始碼解析     在ThreadPoolExecutor類中,最核心的任務提交方法是e

併發程式設計的藝術-執行原始碼解析

執行緒池的作用: 1,降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。 2,提搞響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。 3,提高系統的客觀理性。執行緒是稀缺資源,如果無限制地建立,不僅會消耗系統資源,還會降低系統的穩定

Java執行原始碼解析及高質量程式碼案例

引言 本文為Java高階程式設計中的一些知識總結,其中第一章對Jdk 1.7.0_25中的多執行緒架構中的執行緒池ThreadPoolExecutor原始碼進行架構原理介紹以及原始碼解析。第二章則分析了幾個違反Java高質量程式碼案例以及相應解決辦法。如有總結

執行原始碼解析

<span style="font-family: "microsoft yahei"; font-size: 12px; background-color: rgb(255, 255, 255);">轉載:</span> http://bl

ThreadPoolExecutor執行原始碼解析

關鍵構造屬性:    volatile int runState;                  保證了多執行緒的共享可見性    static final int RUNNING    = 0;          static final int SHUTDOWN   

java併發程式設計之執行,執行,ansync執行原始碼解析

前言 java開源長了, 程式碼久了,網上對於執行緒那是眾說紛紜,一直縈繞我心頭的,jdk執行緒池好還是spring執行緒池好

Dubbo 執行原始碼解析

本文首發於個人微信公眾號《andyqian》,期待你的關注! 前言 之前文章《Java執行緒池ThreadPool

Android執行(四)ThreadPoolExecutor類原始碼解析

使用ThreadPoolExecutor private final int CORE_POOL_SIZE = 4;//核心執行緒數 private final int MAX_POOL_SIZE = 5;//最大執行緒數 priv

一步步動手實現簡單的執行 —— 生動有趣解析 Java 執行原始碼

零、引子 某天小奈與小夥伴肥宅埋的日常技(cai)術(ji)研(hu)討(zhuo)中聊起了執行緒池。 自詡十分熟悉併發程式設計

Java之執行原始碼深入理解

在前面的文章中,我們使用執行緒的時候就去建立一個執行緒,這樣實現起來非常簡便,但是就會有一個問題: 如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行

Java高階應用:執行全面解析

  什麼是執行緒池?   很簡單,簡單看名字就知道是裝有執行緒的池子,我們可以把要執行的多執行緒交給執行緒池來處理,和連線池的概念一樣,通過維護一定數量的執行緒池來達到多個執行緒的複用。   執行緒池的好處 &n

Java多執行——FutureTask原始碼解析

一個很常見的多執行緒案例是,我們安排主執行緒作為分配任務和彙總的一方,然後將計算工作切分為多個子任務,安排多個執行緒去計算,最後所有的計算結果由主執行緒進行彙總。比如,歸併排序,字元頻率的統計等等。 我們知道Runnable是不返回計算結果的,如果想利用多執行緒的話,只能儲

四種Java執行用法解析

本文為大家分析四種Java執行緒池用法,供大家參考,具體內容如下 http://www.jb51.net/article/81843.htm 1、new Thread的弊端 執行一個非同步任務你還只是如下new Thread嗎? new Thread(new Runn

Android執行學習

在學習執行緒池之前需要先了解幾個java的執行緒池 1.newCachedThreadPool   建立一個可快取執行緒池,根據長度靈活回收,若無空閒執行緒,則新建執行緒 2.newFixedThreadPool  建立一個定長執行緒池,可控制執行緒最大併

017.多執行-執行原理解析以及合理配置

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveT

Android 執行模擬多執行併發下載任務

廢話不多,直接上原始碼 自定義一個Adapter public class MyAdapter extends BaseAdapter { private Context context; private List<Progress> list

Java 多執行ThreadPoolExecutor解析及Executors類中提供的靜態方法來建立執行

上面的程式碼可能看起來不是那麼容易理解,下面我們一句一句解釋:   首先,判斷提交的任務command是否為null,若是null,則丟擲空指標異常;   接著是這句,這句要好好理解一下: if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(c

Android 執行的使用

執行緒池優點 提到執行緒池就必須先說一下執行緒池的優點,執行緒池的優點可以概括為以下四點: * 重用執行緒池中的執行緒,避免因為執行緒的建立和銷燬所帶來的效能開銷; * 執行緒池旨線上程的複用,就避免了建立執行緒和銷燬執行緒所帶來的時間消耗,減少執行

android 執行的使用以及Executors的優缺點

android開發,大家最熟悉的肯定是主執行緒,也就是ui執行緒,也都知道在非ui執行緒更新介面會報錯提示不允許在子執行緒更新ui。但是耗時操作還是需要使用子執行緒,例如: new Thread(new Runnable() { @Override

執行原理解析

執行緒池的原始碼及原理[JDK1.6實現] 1.執行緒池的包含的內容 2.執行緒池的資料結構【核心類ThreadPoolExecutor】: worker:工作類,一個worker代表啟動了一個執行緒,它啟動後會迴圈執行workQueue裡面的所有任務 workQueu