unity 2D 學習筆記(1)
1、簡介
在Java中,建立和銷燬物件是很費時間的,因為建立一個物件要獲取記憶體資源或者其它更多資源,且虛擬機器將試圖跟蹤每一個物件,以便能夠在物件銷燬後進行垃圾回收。所以如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立銷燬會大大降低系統性能。執行緒池的目的就是將執行緒複用,統一管理,以減少這類消耗,從而提高效能。
2、利用ThreadPoolExecutor建立和使用執行緒池
2.1 ThreadPoolExecutor的構造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException();this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
建立一個執行緒池需要輸入幾個引數:
1、corePoolSize:核心執行緒的數量。在建立了執行緒池後,預設情況下,執行緒池中並沒有任何執行緒,而是等待有任務到來才建立執行緒去執行任務,除非呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法來預建立執行緒,在沒有任務到來之前就建立corePoolSize個執行緒或者一個執行緒。
2、maximumPoolSize:執行緒池中最大執行緒數量,執行緒池允許建立的最大執行緒數。這個引數大部分時候都與corePoolSize數值相同,具體還得是業務場景需求而定。
3、keepAliveTime:表示執行緒空閒多久時間會終止。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用,直到執行緒池中的執行緒數不大於corePoolSize,即預設只作用於池中的非核心執行緒。但是如果呼叫了allowCoreThreadTimeOut(boolean)方法,線上程池中的執行緒數不大於corePoolSize時,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0。
4、TimeUnit:為引數keepAliveTime的時間單位。
5、workQueue:用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列。
5.1、ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序。
5.2、LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。
5.3、SynchronousQueue:一個不儲存元素的阻塞佇列。因為沒有儲存功能,因此put和take會一直阻塞,直到有另一個執行緒已經準備好參與到交付過程中。僅當有足夠多的消費者,並且總是有一個消費者準備好獲取交付的工作時,才適合使用同步佇列。
5.4、PriorityBlockingQueue:一個具有優先順序得無限阻塞佇列,在建構函式需傳入comparator,用於插入元素時繼續排序,若沒有傳入comparator,則插入的元素必須實現Comparatable介面。
6、ThreadFactory:執行緒工廠,主要用來建立執行緒。
7、RejectedExecutionHandler:飽和策略,當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。當然也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化不能處理的任務。
7.1、AbortPolicy:表示無法處理新任務時丟擲異常,預設策略。
7.2、CallerRunsPolicy:直接在 execute 方法的呼叫執行緒中執行被拒絕的任務。
7.3、DiscardOldestPolicy:丟棄佇列最前面的任務,然後嘗試執行當前任務。
7.4、DiscardPolicy:不處理直接丟棄掉。
2.2 向執行緒池提交任務及原始碼分析
1、使用execute提交任務,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向執行緒池提交一個任務,交由執行緒池去執行,但是execute方法沒有返回值,所以無法判斷任務知否被執行緒池執行成功。
2、使用submit提交任務,這個方法也是用來向執行緒池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是呼叫的execute()方法,只不過它利用了Future來獲取任務執行結果。
具體看下execute:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
這裡說明下兩個點:
- Worker和Task的概念:兩者都是runnable,Worker是當前執行緒池中的執行緒,可以看做為工作人員,而Task雖然是runnable,但是並沒有真正執行,而只是被Worker呼叫了run方法,相當於工作人員的任務。
- maximumPoolSize,corePoolSize和workQueue的概念:corePoolSize是核心執行緒池的大小,決定了執行緒池裡的“常駐工作人員”Worker的數量;workQueue是任務佇列,相當於“工作人員”Worker的任務量;maximumPoolSize是執行緒池最大容量,就是池裡能容納“工作人員”Worker的最大數量,上面說了往往它和 corePoolSize一樣大,但如果在一段併發高峰的時段,這時maximumPoolSize設定比corePoolSize大的作用就有了體現,在所有核心執行緒滿負荷工作,及workQueue任務堆積滿了的情況下,執行緒池可以建立一定數量(maximumPoolSize-corePoolSize)“臨時工”Woker去處理溢位的任務,等過了高峰時段,“臨時工”Woker過了空閒時間就會被回收,從而不會過多佔用系統資源。
執行緒池執行流程:
下面來看execute核心方法:addWorker
Worker的增加和Task的獲取以及終止都是在此方法中實現的,也就是這一個方法裡面包含了很多東西。在addWorker方法中提到了runState的概念,runState是執行緒池的核心概念,我們先看下相關宣告及方法:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
程式碼中可以看到,原子量ctl包含了:runState(執行緒池當前狀態)和workerCount(工作執行緒worker的數量),其中ctl的前三位代表runState,後29位代表workerCount。通過runStateOf和workerCountOf能分別獲取執行緒池狀態和工作執行緒worker的數量。
runState代表整個執行緒池的執行生命週期,有如下取值:
1.RUNNING:可以新加任務,同時可以處理queue中的任務。
2.SHUTDOWN:不增加新任務,但是處理queue中的任務。
3.STOP:不增加新任務,同時不處理queue中的任務。
4.TIDYING:所有的任務都終止處理了(queue中),同時workerCount為0,那麼此時進入TIDYING
5.terminated():方法結束,變為TERMINATED
繼續看addWorker的程式碼:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
按流程下來,先是判斷runState是否為running,不是的話在判斷是否為shutdown,若是則firstTask與workQueue是否為空,這一系列判斷對應了上面提到的執行緒池處於什麼狀態所執行任務的邏輯。
接著就是判斷沒有到corePoolSize 或maximumPoolSize上限時,那麼允許新增工作執行緒,CAS增加Worker的數量後,跳出迴圈。
後面就是例項化Worker,並且將workers是HashSet執行緒不安全的,那麼此時需要加鎖,所以mainLock.lock(); 之後重新檢查執行緒池的狀態,如果狀態不正確,那麼呼叫addWorkerFailed減小Worker的數量。如果狀態正常,那麼新增Worker到workers。新增成功後即呼叫Worker中Thread的start方法,以執行Worker中的runWorker方法。
接下來看runWorker方法的程式碼:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
這個方法就是讓工作執行緒Worker處理任務,任務來源有兩處:一是firstTask,就是新建Worker時提交的優先任務;二是通過getTask方法去workQueue獲取任務。
這裡說下beforeExecute與afterExecute,這兩個方法適用於拓展的,通過自己繼承ThreadPoolExecutor實現執行緒池類,重寫這兩個方法,可以作日誌列印,執行情況統計等功能。
最後就是重點的getTask:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
這裡開始也是照常判斷runState的狀態,再判斷Worker是否需要過期,true的話使用poll取任務。如果正常返回,那麼返回取到的task。如果超時,證明Worker空閒,同時Worker超過了corePoolSize,需要刪除。false的話則是take取任務,有的話返回,沒有獲取到時將一直阻塞,知道獲取到或者中斷。
2.3、執行緒池的關閉
有兩個方法可以提供執行緒池的關閉,分別是shutDown 和 shutDownNow。但是它們的實現原理不同,shutdown的原理是隻是將執行緒池的狀態設定成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的執行緒。shutdownNow的原理是遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的interrupt方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。shutdownNow會首先將執行緒池的狀態設定成STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表。
3、如何構建健壯的執行緒池
3.1 如何合理的估算執行緒池大小
在建立執行緒池時,幾乎應該都碰見過這問題,設定核心執行緒池多大合適,阻塞佇列設定多大合適。
《Java併發程式設計實戰》中有提到:如果是CPU密集型應用,則執行緒池大小設定為N+1;如果是IO密集型應用,則執行緒池大小設定為2N+1;N為CPU數目。當然這只是初略的估算。書中還提到了更為詳細的計算公式:
N[CPU數目]*P[CPU利用率]*(1+I[等待時間] / C[計算時間])
其中等待時間I就是執行緒中發生IO等阻塞操作是花費的時間,計算時間C則是執行緒執行計算花費的時間,可以看出,等待時間越長,為了充分利用cpu,執行緒池大小就應該越大。
3.2 輔助計算執行緒池大小程式
在網上找到的一個估算方法PoolSizeCalculator ,先看程式碼(來源:https://github.com/nschlimm/playground/blob/master/java7-playground/src/main/java/com/schlimm/java7/nio/threadpools/PoolSizeCalculator.java):
/** * Calculates the boundaries of a thread pool for a given {@link Runnable}. * * @param targetUtilization * the desired utilization of the CPUs (0 <= targetUtilization <= 1) * @param targetQueueSizeBytes * the desired maximum work queue size of the thread pool (bytes) */ protected void calculateBoundaries(BigDecimal targetUtilization, BigDecimal targetQueueSizeBytes) { calculateOptimalCapacity(targetQueueSizeBytes); Runnable task = creatTask(); start(task); start(task); // warm up phase long cputime = getCurrentThreadCPUTime(); start(task); // test intervall cputime = getCurrentThreadCPUTime() - cputime; long waittime = (testtime * 1000000) - cputime; calculateOptimalThreadCount(cputime, waittime, targetUtilization); }
這裡方法的兩個入參,分別是期望的CPU利用率(0-1),及期望的阻塞任務佇列的記憶體大小(byte)。calculateOptimalCapacity是計算預期佇列的任務數量。
private void calculateOptimalCapacity(BigDecimal targetQueueSizeBytes) { long mem = calculateMemoryUsage(); BigDecimal queueCapacity = targetQueueSizeBytes.divide(new BigDecimal(mem), RoundingMode.HALF_UP); System.out.println("Target queue memory usage (bytes): " + targetQueueSizeBytes); System.out.println("createTask() produced " + creatTask().getClass().getName() + " which took " + mem + " bytes in a queue"); System.out.println("Formula: " + targetQueueSizeBytes + " / " + mem); System.out.println("* Recommended queue capacity (bytes): " + queueCapacity); } public long calculateMemoryUsage() { BlockingQueue<Runnable> queue = createWorkQueue(); for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) { queue.add(creatTask()); } long mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); queue = null; collectGarbage(15); mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); queue = createWorkQueue(); for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) { queue.add(creatTask()); } collectGarbage(15); mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); return (mem1 - mem0) / SAMPLE_QUEUE_SIZE; }
start與getCurrentThreadCPUTime是為了計算執行緒執行的等待時間與計算時間,後者可以通過ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime()實現
public void start(Runnable task) { long start = 0; int runs = 0; do { if (++runs > 5) { throw new IllegalStateException("Test not accurate"); } expired = false; start = System.currentTimeMillis(); Timer timer = new Timer(); timer.schedule(new TimerTask() { public void run() { expired = true; } }, testtime); while (!expired) { task.run(); } start = System.currentTimeMillis() - start; timer.cancel(); } while (Math.abs(start - testtime) > EPSYLON); collectGarbage(3); }
最後說下使用及效果,這裡cpu利用率設定的是0.5,佇列大小為100000byte:
public class SimplePoolSizeCaculatorImpl extends PoolSizeCalculator { @Override protected Runnable creatTask() { return new AsyncIOTask(); } @Override protected BlockingQueue createWorkQueue() { return new LinkedBlockingQueue(1000); } @Override protected long getCurrentThreadCPUTime() { return ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime(); } public static void main(String[] args) { PoolSizeCalculator poolSizeCalculator = new SimplePoolSizeCaculatorImpl(); poolSizeCalculator.calculateBoundaries(new BigDecimal(0.5), new BigDecimal(100000)); } }
任務樣例:
class AsyncIOTask implements Runnable { @Override public void run() { HttpURLConnection connection = null; BufferedReader reader = null; try { String getURL = "http://baidu.com"; URL getUrl = new URL(getURL); connection = (HttpURLConnection) getUrl.openConnection(); connection.connect(); reader = new BufferedReader(new InputStreamReader( connection.getInputStream())); String line; while ((line = reader.readLine()) != null) { // System.out.println(line); } } catch (IOException e) { } finally { if(reader != null) { try { reader.close(); } catch(Exception e) { } } connection.disconnect(); } } }
執行結果,上面*號開頭的是計算出來的佇列大小,下面*號計算出來的是執行緒池大小:
注:如果一直執行失敗,可嘗試把PoolSizeCalculator 中的精準度EPSYLON屬性設定大點,如50。
參考資料:
深入理解java執行緒池—ThreadPoolExecutor
https://www.jianshu.com/p/ade771d2c9c0
如何合理地估算執行緒池大小
http://ifeve.com/how-to-calculate-threadpool-size/
Threading stories: about robust thread pools
http://niklasschlimm.blogspot.com/2012/03/threading-stories-about-robust-thread.html#more
Determining Memory Usage in Java