關於執行緒池的幾個問題
這裡以java中的 ThreadPoolExecutor 執行緒池來說明:
1:執行緒池的幾個重要的引數,這幾個引數有什麼作用?
線上程中有 核心池大小,最大池大小,任務佇列 這幾個比較重要的引數。
以下是幾個重要引數在原始碼中的體現
// 核心池大小 private volatile int corePoolSize; // 最大池大小 private volatile int maximumPoolSize; // 任務佇列 private final BlockingQueue<Runnable> workQueue;
作用:
corePoolSize:當需要執行的任務數量小於corePoolSize則新建立一個執行緒,並且執行任務,如果執行任務數大於corePoolSize則將任務,則將需要執行的任務放入到workQueue佇列中,以下是程式碼:
// 小於核心池大小,則新增一個執行緒然後執行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
// 當任務數大於等於核心池大小,則放到佇列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get();if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
maximumPoolSize:當佇列中的任務已經新增滿了,再往池中新增任務,如果 池中執行緒數<maximumPoolSize則會新建執行緒,反之則廢棄該任務
// 新增到佇列中失敗,則會新建執行緒去執行任務 else if(!addWorker(command, false)) reject(command);
workQueue:當池中的執行緒數大於等於核心池的大小,則將需要執行的任務放入到佇列workQueue中。
2:執行緒池的工作流程是什麼?
分為3中場景的流程:1:池中執行緒數<核心池數 2:池中執行緒數大於等於核心池數但小於最大池數 3:池中執行緒=最大池數
場景1:新建一個執行緒並且執行任務,進入到 addWorker 的方法中:
// 新增一個任務到workSet中,core 為true private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 獲取執行緒池的執行狀態 int rs = runStateOf(c); …… for (;;) { // 執行緒池中執行的執行緒數 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 原子增加 執行執行緒的數量 if (compareAndIncrementWorkerCount(c)) // 跳出迴圈 進入迴圈體外的邏輯 break retry; …… } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 將任務封裝成Worker物件 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 這裡通過加鎖,保證執行緒安全 保證新增任務安全 mainLock.lock(); try { …… int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 將任務放到 workers 集合中 在鎖中新增 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 新增成功後執行任務 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
下面看看具體的執行方法 即 t.start(); 方法的具體邏輯,程式碼如下:
//執行 start方法 會執行run 方法 public void run() { runWorker(this); }
下面是runWorker 的方法:task != null表示需要執行的任務不為空,在池中執行緒<核心池大小為true getTask():從佇列中獲取任務並且執行。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; try { // 進入while迴圈,使該執行緒可以一直處理任務,執行緒不會執行完一次任務就退出,線上程池退出前,執行緒一直工作,以免執行緒資源的浪費 while (task != null || (task = getTask()) != null) { w.lock(); try { beforeExecute(wt, task); Throwable thrown = null; try { // 具體執行任務 task.run(); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
當這個執行緒執行完任務之後,並不會銷燬執行緒,而是不斷遍歷佇列中的任務,如果佇列中有任務,則執行相應的任務。這個是執行緒池的主要實現邏輯,即達到了執行緒複用的目的。
場景2:池中執行緒數大於等於核心池數但小於最大池數
// 這裡將任務放入到佇列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
放入到佇列中,具體的執行時刻是在,場景1中的 while迴圈中的getTask() 獲取到任務,然後執行
場景3:
// 將任務加到workerSet中,並且新建一個執行緒去執行,這裡的core標識為false, 其餘邏輯和場景1中是一致的 else if (!addWorker(command, false)) reject(command);
3:如何設定核心池的大小?如何計算?
如果是計算密集型,這時是不需要切換執行緒來計算的,開啟的執行緒數為 CPU個數
如果是IO阻塞密集,則根據實際的阻塞時間來進行計算,如:執行計算時間 為 a 阻塞時間為 b 這時執行緒池的大小應該設定為 (b/a+1)*cpu 核數 這個只是理論指導值