Java併發程式設計筆記4-執行緒池
我們使用執行緒的時候就去建立一個執行緒,但是就會有一個問題:
如果併發的執行緒數量非常多,而且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會導致大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。
那麼有沒有一種辦法使得執行緒可以複用,就是執行完一個任務,並不被銷燬,而是可以繼續執行其他的任務?
執行緒池正好能解決這樣的問題。正如名稱所稱的那樣,執行緒池管理一個工作者執行緒的同構池。執行緒池是與工作佇列緊密繫結的。所謂工作佇列,其作用是持有所有等待執行的任務。
工作者執行緒的生活從此輕鬆起來:它從工作佇列中獲取下一個任務,執行它,然後回來等待另外一個執行緒。
這類似於企業應用程式中事務監聽器(transaction monitor)的角色:它將課執行事務的數量控制在一個合理的水平中,不會因過渡濫用事務而耗盡有限資源。
執行緒池中執行任務執行緒,這方法有很多“每任務每執行緒”無法筆記的優勢。重用存在的執行緒,而不是建立新的執行緒,這可以在處理多請求時抵消執行緒建立,消亡產生的開銷。還有一個好處就是,在請求到達時,工作者執行緒通常已經存在
,用於建立執行緒的等待時間並不會延遲任務的執行,因此提高響應性。通過適當地調整執行緒池的大小,你可以得到足夠多的執行緒以保持處理器忙碌,同時可以還防止過多的執行緒互相競爭資源,導致應用程式耗盡記憶體或者失敗。
每任務每執行緒例子如下:
public class ThreadPool { public static void main(String[] args) throwsIOException { ServerSocket serverSocket = new ServerSocket(80); while (true){ final Socket socket = serverSocket.accept(); Runnable task = new Runnable() { public void run() { handleRequest(socket); } };new Thread(task).start(); } } private static void handleRequest(Socket socket) { } }
可以看到這個例子是一個粗製濫造的併發服務端,來一個使用者就建立一個執行緒,你根本就不知道有多少使用者來,要建立多少個執行緒。這樣頻繁建立執行緒就會導致大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間,
過渡濫建立執行緒而耗盡有限資源。
由於這原因,java中給我們提供Executor框架。通過Executors中的某個靜態工廠方法來建立一個執行緒池:
1.newFixedThreadPool 建立一個定長的執行緒池,每當提交一個任務就建立一個執行緒,知道達到池的最大長度,這時執行緒池會保持長度不再變化(如果一個執行緒由於非預期的Exception而結束,執行緒池會補充一個新的執行緒)。
下面用newFixedThreadPool 建立一個定長的執行緒池來改造上面的例子,如下:
public class ThreadPool { public static void main(String[] args) throws IOException { //newFixedThreadPool引數為執行緒池的大小 Executor executor = Executors.newFixedThreadPool(100); ServerSocket serverSocket = new ServerSocket(80); while (true){ final Socket socket = serverSocket.accept(); Runnable task = new Runnable() { public void run() { handleRequest(socket); } }; //直接將任務丟進執行緒池來執行任務 executor.execute(task); } } private static void handleRequest(Socket socket) { } }
這樣就不會發生過渡濫建立執行緒而耗盡有限資源。
2.newSingleThreadExecutor建立一個單執行緒化的executor,他只建立唯一的工作者執行緒來執行任務,如果這個執行緒異常結束,會有另一個取代它。executor會保證任務依照任務佇列所規定的順序(FIFO,LIFO,優先順序)執行。
3.newCachedThreadPool建立一個可快取的執行緒池,如果當前執行緒的長度超過了處理的需要,它可以靈活的回收空閒的執行緒,當需求增加時,它可以靈活的增加新的執行緒,並不會對池的長度做任何限制。但是認為改執行緒池的長度沒有任何限制,有可能會把資源耗盡,
這需要自己很好的把控了。
4.newScheduledThreadPool建立一個定長的執行緒池,而且支援定時的以及週期性的任務執行,類似於Timer。
Executor的生命週期:
Executor實現通常知識為執行任務而建立執行緒。但是JVM會在所有(非後臺的,nondaemon)執行緒全部終止後才退出。因此,如果無法正確關閉Executor,將會阻止JVM的結束。
因為Executor是非同步地執行任務,所以在任何時間裡,所有之前提交的任務狀態都不能立即可見。這些任務中,有些可能已經完成,有些可能正在執行,其他的還可能在佇列中等待執行。關閉應用程式時,程式會出現很多中情況:從平緩關閉
到最突然的關閉,以及介於這兩種階段情況之間的各種可能。Executor是為應用服務提供服務的,他們理應可以關閉,無論是平緩還是突然。
注意:關閉操作還會影響到記錄應用程式任務狀態的反饋資訊。
Executor就是一個介面,原始碼如下圖:
我們可以進入Executors這個類的原始碼,如下:
可以看到newFixedThreadPool 建立一個定長的執行緒池,返回的是一個ExecutorService,但是我們上面例子接收的是Executor,為什麼Executor也可以接收呢?我們繼續進入ExecutorService原始碼如下:
可以看到原來ExecutorService繼承了Executor。ExecutorService擴充套件了Executor,並且添加了一些用於宣告週期管理的方法。
原始碼如下:
ExecutorService暗示了生命週期有3種狀態:執行、關閉、終止。ExecutorService最初建立後的初始狀態是執行狀態。
shutdown方法會啟動一個平緩的關閉過程:停止接受新的任務,同時等待已經提交的任務完成,包括尚未開始執行的任務。
shutdownNow方法會啟動要給強制的關閉過程:嘗試取消所有執行中的任務和排在佇列中尚未開始執行的任務is。
isShutdown方法:判斷執行緒池(即ExecutorService)是否關閉。
isTerminated方法:是執行緒池(即ExecutorService)是否進入終止狀態。
在關閉後提交到ExecutorService中的任務,會被拒絕執行處理器(rejected execution handler)處理。拒絕執行處理器是ExecutorService的一種實現,ThreadPoolExecutor提供的,ExecutorService介面中的方法並不提供拒絕執行處理器。拒絕執行處理器可能只是
簡單的放棄任務,也可能會引起execute丟擲一個未檢查的RejectedExecutionException。一旦所有任務全部完成後,ExecutorService會轉入終止狀態。
awaitTermination方法:等待ExecutorService到達終止狀態。
通常shutdown會緊隨awaitTermination之後,這樣可以產生同步地關閉ExecutorService的效果。
上面的Executor的例子的程式是沒辦法關閉執行緒池,會一直跑下去,那麼我們如何寫一個支援關閉的webserver呢?
明顯我們現在要用ExecutorService來改造上面的Executor的例子。虛擬碼如下:
public class ThreadPool { private static ExecutorService executorService = Executors.newCachedThreadPool(); public static void main(String[] args) throws IOException { //newFixedThreadPool引數為執行緒池的大小 ServerSocket serverSocket = new ServerSocket(80); //這裡就不再像上面的例子一樣無限的接受任務了,要根據我的執行緒池是否處於關閉狀態來決定 while (!executorService.isShutdown()){ final Socket socket = serverSocket.accept(); try{ executorService.execute(new Runnable() { public void run() { handleRequest(socket); } }); }catch (RejectedExecutionException e){ //如果拒絕服務不是因為我執行緒池關閉導致的,我們要在這裡列印一下日誌 if (!executorService.isShutdown()){ System.out.println("接受任務被拒絕"); throw e; } } } } //用一個公共的方法去關閉執行緒池 public void stop(){ executorService.shutdown(); } private static void handleRequest(Socket socket) { //獲取請求 Request req = readRequest(socket); //如果請求已經關閉 if (isShutdownRequest(req)){ //關閉執行緒池 stop(); }else { //請求轉發 dispatchRequest(req); } } }
經過改造,這服務端變的優雅多了。
延時的,並具有周期性的任務
在newScheduledThreadPool出來之前我們一般會用Timer和TimerTask來做。Timer在JDK裡面,是很早的一個API了。
但是Timer存在一些缺陷,Timer只建立唯一的執行緒來執行所有Timer任務。如果一個timer任務的執行很耗時,會導致其他TimerTask的時效準確性出問題。例如一個TimerTask每10秒執行一次,
而另外一個TimerTask每40ms執行一次,重複出現的任務會在後市的任務完成後快速連續的被呼叫4次,要麼完全“丟失”4次呼叫。
Timer的另外一個問題在於,如果TimerTask丟擲未檢查的異常會終止timer執行緒。這種情況下,Timer也不會重新回覆執行緒的執行了;它錯誤的認為整個Timer都被取消了。此時
已經被安排但尚未執行的TimerTask永遠不會再執行了,新的任務也不能被排程了。
現在我們看一下Timer的例子,如下:
public class Shedule { private static long start; public static void main(String[] args) { TimerTask task = new TimerTask() { public void run() { System.out.println(System.currentTimeMillis()-start); try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } } }; TimerTask task1 = new TimerTask() { @Override public void run() { System.out.println(System.currentTimeMillis()-start); } }; Timer timer = new Timer(); start = System.currentTimeMillis(); //啟動一個排程任務,1S鍾後執行 timer.schedule(task,1000); //啟動一個排程任務,3S鍾後執行 timer.schedule(task1,3000); } }
上面程式我們預想是第一個任務執行後,第二個任務3S後執行的,即輸出一個1000,一個3000.
實際執行結果如下:
實際執行結果並不如我們所願。世界結果,是過了4S後才輸出第二個任務,即4001約等於4秒。那部分時間時間到哪裡去了呢?那個時間是被我們第一個任務的sleep所佔用了。
現在我們在第一個任務中去掉Thread.sleep();這一行程式碼,執行是否正確了呢?如下:
public class Shedule { private static long start; public static void main(String[] args) { TimerTask task = new TimerTask() { public void run() { System.out.println(System.currentTimeMillis()-start); } }; TimerTask task1 = new TimerTask() { @Override public void run() { System.out.println(System.currentTimeMillis()-start); } }; Timer timer = new Timer(); start = System.currentTimeMillis(); //啟動一個排程任務,1S鍾後執行 timer.schedule(task,1000); //啟動一個排程任務,3S鍾後執行 timer.schedule(task1,3000); } }
執行結果如下:
可以看到確實是第一個任務過了1S後執行,第二個任務在第一個任務執行完後過3S執行了。
這就說明了Timer只建立唯一的執行緒來執行所有Timer任務。如果一個timer任務的執行很耗時,會導致其他TimerTask的時效準確性出問題。
Timer存在一些缺陷,因此你應該考慮使用ScheduledThreadPoolExecutor作為替代品。你可以通過建構函式或者通過newScheduledThreadPool工廠方法建立一個ScheduledThreadPoolExecutor。
如下:
public class Shedule { private static long start; private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2); public static void main(String[] args) { TimerTask task = new TimerTask() { public void run() { System.out.println(System.currentTimeMillis()-start); try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } } }; TimerTask task1 = new TimerTask() { @Override public void run() { System.out.println(System.currentTimeMillis()-start); } }; Timer timer = new Timer(); start = System.currentTimeMillis(); //TimeUnit.MILLISECONDS指定毫秒為單位 executorService.schedule(task,1000, TimeUnit.MILLISECONDS); executorService.schedule(task1,3000, TimeUnit.MILLISECONDS); } }
執行結果如下:
可以看到執行結果符合預期。
可以看到如果一個timer任務的執行很耗時(例如Thread.sleep),ScheduledThreadPoolExecutor並不會導致其他TimerTask的時效準確性出問題。
還可以看到,這兩個TimerTask互不干擾。
互相干擾還有一個反面:
如果TimerTask丟擲未檢查的異常會終止timer執行緒。這種情況下,Timer也不會重新回覆執行緒的執行了;它錯誤的認為整個Timer都被取消了。此時
已經被安排但尚未執行的TimerTask永遠不會再執行了,新的任務也不能被排程了。
例子如下:
public class Shedule { private static long start; private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2); public static void main(String[] args) { TimerTask task = new TimerTask() { public void run() { System.out.println(System.currentTimeMillis()-start); throw new RuntimeException(); } }; TimerTask task1 = new TimerTask() { @Override public void run() { System.out.println(System.currentTimeMillis()-start); } }; Timer timer = new Timer(); start = System.currentTimeMillis(); timer.schedule(task,1000); timer.schedule(task1,3000); } }
如果第一TimerTask出現未知異常,第二個TimerTask還能執行起來嗎?
結果如下:
明顯第一TimerTask出現未知異常,第二個TimerTask不能執行起來了。這就說明
如果TimerTask丟擲未檢查的異常會終止timer執行緒。這種情況下,Timer也不會重新回覆執行緒的執行了;它錯誤的認為整個Timer都被取消了。此時
已經被安排但尚未執行的TimerTask永遠不會再執行了,新的任務也不能被排程了。
ScheduledThreadPoolExecutor可以解決此問題,例子如下:
public class Shedule { private static long start; private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2); public static void main(String[] args) { TimerTask task = new TimerTask() { public void run() {throw new RuntimeException(); } }; TimerTask task1 = new TimerTask() { @Override public void run() { System.out.println(System.currentTimeMillis()-start); } }; Timer timer = new Timer(); start = System.currentTimeMillis(); //TimeUnit.MILLISECONDS指定毫秒為單位 executorService.schedule(task,1000, TimeUnit.MILLISECONDS); executorService.schedule(task1,3000, TimeUnit.MILLISECONDS); } }
執行結果如下:
可以看到第一個執行緒掛了,第二個執行緒並沒有受到影響。這就說明了ScheduledThreadPoolExecutor可以解決了
如果TimerTask丟擲未檢查的異常會終止timer執行緒。這種情況下,Timer也不會重新回覆執行緒的執行了;它錯誤的認為整個Timer都被取消了。此時
已經被安排但尚未執行的TimerTask永遠不會再執行了,新的任務也不能被排程了的問題
還要注意一點,Timer是和系統時間掛鉤的,如果當前伺服器的時間一改,Timer就不那麼靠譜了。
還要注意的是ThreadPoolExecutor。
原始碼如下圖:
可以看到new一個FixedThreadPool/newSingleThreadExecutor/newCachedThreadPool/newScheduledThreadPool實際上返回的都是是new ThreadPoolExecutor()。
我們再看一下ThreadPoolExecutor原始碼如下:
可以看到ThreadPoolExecutor配置的非常靈活,如果我們用普通的一個FixedThreadPool/newSingleThreadExecutor/newCachedThreadPool/newScheduledThreadPool沒辦法滿足你的需求了,你可以用
ThreadPoolExecutor靈活的指定引數來完成你的需求。這適合精確的任務執行。還不如說我們的任務被拒絕(RejecedExecutionHandler)後,我們可以用ThreadPoolExecutor靈活處理