Java併發--任務執行
大多數併發應用程式都是圍繞著任務進行管理的.
我們來看一小段程式碼:
package com.ivan.concurrent.charpter6;
import java.net.ServerSocket;
import java.net.Socket;
/**
* 順序化的Web Server.
* @author root
* OS:Ubuntu 9.04
* Date:2010-6-19
*/
public class SingleThreadWebServer {
public static void main(String[] args) throws Exception {
ServerSocket server=new ServerSocket(8080);
while(true){
Socket socket=server.accept();
handleRequest(socket);
}
}
private static void handleRequest(Socket socket) {
/**
* 做相關的處理……, 比如請求運算與I/O
* 這將會導致出現阻塞, 會延遲當前請求的處理,
* 而且會產生非常嚴重的後果,比如: 假死。
* 那樣會極度考驗使用者的耐心,知道他忍無可忍的關閉瀏覽器。
* 同時,單執行緒在等待IO操作時,CPU處於閒置狀態,這樣也降低了資源的利用率
*
* 這樣的伺服器,缺乏良好的吞吐量和快速的響應性。
*/
}
}
上面的程式碼是順序地執行任務,主執行緒在不斷接受連線與處理請求之間交替執行。
一個Web請求會做相關的處理……, 比如請求運算與I/O
這將會導致出現阻塞, 會延遲當前請求的處理,
而且會產生非常嚴重的後果,比如: 假死。
那樣會極度考驗使用者的耐心,知道他忍無可忍的關閉瀏覽器。
同時,單執行緒在等待IO操作時,CPU處於閒置狀態,這樣也降低了資源的利用率
這樣的伺服器,缺乏良好的吞吐量和快速的響應性。
所以,基於上面程式碼的基礎上,我們需要給他作些小許的改進:
package com.ivan.concurrent.charpter6;
import java.net.ServerSocket;
import java.net.Socket;
public class ThreadPerTaskWebServer {
public static void main(String[] args) throws Exception {
ServerSocket server=new ServerSocket(80);
while(true){
final Socket socket=server.accept();
new Thread(new Runnable(){
public void run() {
handleRequest(socket);
}
}).start();
}
}
protected static void handleRequest(Socket socket) {
/**
*相比較而言,這樣的處理方式有良好的改進:
* 1.執行人物的負載已經脫離主執行緒,讓主迴圈能更加迅速的重新開始等待下一個連線。提高了響應性
* 2.併發處理任務,多個請求可以同時得到處理,提高了吞吐性
* 3.任務處理程式碼必須要是執行緒安全的。防止出現併發性資料共享問題。
*
* 這個程式可能在開發階段執行良好,一旦部署,就可能出現致命的錯誤,
* 我們接著來分析:
*/
}
}
相比較而言,這樣的處理方式有良好的改進:
1.執行人物的負載已經脫離主執行緒,讓主迴圈能更加迅速的重新開始等待下一個連線。提高了響應性
2.併發處理任務,多個請求可以同時得到處理,提高了吞吐性
3.任務處理程式碼必須要是執行緒安全的。防止出現併發性資料共享問題。
這個程式可能在開發階段執行良好,一旦部署,就可能出現致命的錯誤,
我們接著來分析:
我們看到,上面的程式碼中,是為每個請求的到來,建立一個新的執行緒來處理, 那麼這樣就會有以下的問題出現:
無限建立執行緒的缺點:
1.執行緒生命週期的開銷
1.1.執行緒的建立與關閉並非是免費的,實際的開銷根據不同的OS有不同的處理.但是執行緒的建立的確需要時間,帶來處理請求的延遲.一般的Web Server的請求是很頻繁的,為每個請求建立一個執行緒,無非要耗費大量的資源.
2.資源消耗量
2.1. 活動的執行緒會消耗資源,尤其是記憶體.如果可執行的執行緒數多於可用的處理器數,執行緒將會空閒。大量的空閒執行緒佔用更多的記憶體,給垃圾回收器帶來壓力,而且,執行緒在競爭CPU的同時,也會帶來許多其他的效能開銷。所以,建議在有足夠多的執行緒讓CPU忙碌時,不要再建立多餘的執行緒.
3.應用的穩定性
3.1. 應該限制建立執行緒的數量,限制的數目根據不同的平臺而定,同時也受到JVM的啟動引數,Thread的建構函式中棧大小等因素的影響. 如果打破了這個限制,你很可能會得到一個OutOfMemoryError. 在一定範圍內增加執行緒可以提高系統的吞吐量,但是一旦超過這個範圍,再建立執行緒只會拖垮你的系統。甚至可能會導致應用程式的崩潰.
我們的解決辦法:
使用執行緒池,當然,你完全沒有必要自己寫一個執行緒池的實現(好吧,或許你跟我一樣,也希望能從重複創造輪子中,找到自己想要了解的東西),你可以利用 Executor框架來幫你處理,java.util.concurrent提供了一個靈活的執行緒池實現。在新的java類庫當中,任務執行的首要抽象不是Thread,而是Executor.
Executor僅僅是一個簡單的介面,但是它很強大,包括用於非同步任務的執行,支援不同型別的任務執行策略,為任務提交和任務執行之間的解藕,提供了標準的方式等等, 我們後續再重點討論。
Executor基於 生產者-消費者模式。提交任務的是生產者,執行任務的是消費者。 也就是說, 採用Executor框架實現 生產者-消費者模式,十分簡單。
package com.ivan.concurrent.charpter6;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class TaskExecutionWebServer{
private static final int NTHREADS=100;
//使用執行緒池來避免 為每個請求建立一個執行緒。
private static final Executor threadPool=Executors.newFixedThreadPool(NTHREADS);
public static void main(String[] args) throws Exception {
ServerSocket server=new ServerSocket(8011);
while(true){
final Socket socket=server.accept();
threadPool.execute(new Runnable(){
public void run() {
handleRequest(socket);
}
});
}
}
protected static void handleRequest(Socket socket) {
/**
*
*/
System.out.println(Thread.currentThread().getId());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
執行緒池:
執行緒池管理著一個工作者執行緒的同構池,執行緒池是與工作佇列緊密繫結的。工作佇列的作用就是持有所有等待執行的任務, 工作者佇列只需要從工作佇列中獲取到下一個任務,執行,然後回來等待下一個執行緒。
Java類庫中提供了以下幾種執行緒池:
1.newFixedThreadPool :建立定長的執行緒池,每當提交一個任務就建立一個執行緒,直到達到池的最大長度。
2.newCachedThreadPool:建立一個可快取的執行緒池,如果當前執行緒池的長度超過了處理的需要,它可以靈活的收回空閒執行緒,當需求增加時,它可以靈活新增新的執行緒,而並不對池的長度做任何限制
3.newSingleThreadExecutor:建立單執行緒化的executor,它只建立唯一的工作者執行緒來執行任務,如果這個執行緒異常結束,會有另外一個執行緒來取代它.它會保證任務按照任務佇列規定的順序來執行。
4.NewScheduledThreadPool:建立一個定長的執行緒池,而且支援定時的,以及週期性的任務執行,類似Timer.
Executor的生命週期:
它的建立已經說了,我們來看看它如何關閉, Executor 是為了執行任務而建立執行緒,而JVM通常會在所有非後臺執行緒退出後才退出,如果它無法正確的關閉,則會影響到JVM的結束。
這裡需要提一下,在我們瞭解如何關閉Executor的一些疑惑, 由於Executor是非同步執行任務,那麼這些任務的狀態不是立即可見的,換句話說,在任務時間裡,這些執行的任務中,有的可能已經完成,有的還可能在執行,其他的還可能在佇列裡面等待。 為了解決這些問題, Java引入了另外一個介面,它擴充套件了Executor,並增加一些生命週期的管理方法: ExecutorService.
ExecutorService表示生命週期有三種狀態: 執行,關閉,終止。
關閉和終止? 怎麼看上去是一個意思, 這裡我們先擱置著,留著後續來討論。
ExecutorService最初建立後的初始狀態就是執行狀態;
shutdown與shutdownNow方法,都是ExecutorService的關閉方法,區別在於:
shutdown:
會啟動一個平穩的關閉過程, 停止接受新任務,同時等待已經提交的任務完成(包括尚未開始執行的任務)
shutdownNow:
會啟動一個強制關閉的過程:嘗試取消所有執行中的任務和排在佇列中尚未開始的任務。
一旦所有任務全完成後,ExecutorService會轉到終止狀態, awaitTermination可以用來等待ExecutorService到達終止狀態,也可以輪詢isTerminated判斷ExecutorService是否已經終止。
package com.ivan.concurrent.charpter6;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
/**
* 執行緒池的生命週期是如何管理的?
* @author root
* OS:Ubuntu 9.04
* Date:2010-6-19
*/
public class LifeCycleWebServer {
private static final int NTHREADS=100;
private static final ExecutorService exec=Executors.newFixedThreadPool(NTHREADS);
public void start() throws IOException{
ServerSocket server=new ServerSocket(8011);
while(exec.isShutdown()){
try {
final Socket socket=server.accept();
exec.execute(new Runnable(){
public void run() {
handleRequest(socket);
}
});
} catch (RejectedExecutionException e) {
if(!exec.isShutdown()){
//log.error(...)
}
}
}
}
protected void handleRequest(Socket socket) {
Request req=readRequest(socket);
if(isShutDown(req)){
stop();
}else{
dispatchRequest(req);
}
}
public void stop(){
exec.shutdown();
}
//~ Mock Object And Function..
private static class Request{
}
private Request readRequest(Socket socket) {
// TODO Auto-generated method stub
return null;
}
private boolean isShutDown(Request req) {
// TODO Auto-generated method stub
return false;
}
private void dispatchRequest(Request req) {
// TODO Auto-generated method stub
}
}
OK,瞭解了執行緒池的使用,這裡有必要介紹介紹執行策略,
執行策略:
簡單來說,就是任務執行的”What,When,Where,How”,包括:
1.任務在什麼執行緒中執行(what)
2.任務以什麼順序執行(fifo,lifo,優先順序)?
3.可以有多少個任務併發執行?(how many)
4.可以有多少個任務進入等待執行佇列
5.系統過載時,需要放棄一個任務,該挑選哪一個? 如何通知應用程式知道?
另外,java類庫中還提供有一種特別的任務,----可攜帶結果的任務:
Callable 和 Future
Runnable 作為任務的基本表達形式只是個相當有限的抽象; 它的侷限在於,不能返回一個值或者丟擲受檢查的異常。
通常,很多工都會引起嚴重的計算延遲,比如執行資料庫查詢,從網路下載資源,進行復雜的計算。對於這樣的任務,Callable是更佳的抽象: 它在主進入點,等待返回值,併為可能丟擲的異常預先作準備。
Runnable與Callable描述的都是首相的計算型任務,這些任務通常都是有限的。,任務的所生命週期分為4個階段: 建立、提交、開始和完成。
Future描述了任務的生命週期,並提供了相關的方法來獲取任務的結果、取消任務以及檢驗任務是否已經完成或者被取消。
Future的get方法取決於任務的狀態, 如果任務已經完成,get會立即返回或者丟擲異常,如果任務沒有完成,get會阻塞直到它的完成。
建立Future的方法有很多, ExecutorService的submit會返回一個Future,你可以將一個Callable或者Runnable提交給executor,然後得到一個Future,用它來重新獲得任務執行的結果,或者取消任務。
你也可以顯示的為給定的Callable和Runnable例項化一個FutureTask.
OK, 前面介紹了很多關於併發的理論知識,下面我們來看看,如果尋找可強化的併發性。
首先,我們從一個例子開始, 開始之前,簡單介紹一下這個例子所要表達的事情:
它的來源是瀏覽器程式中渲染頁面的那部分功能, 首先獲取HTML,並將它渲染到影象快取裡。為了簡單起見,我們假設HTML只有文字標籤。 OK, 開始吧。
首先,如果按照一般的處理方式,我們會這樣做:
1.遇到文字標籤,將它渲染到影象快取中
2.當遇到的是一個圖片標籤,我們通過網路獲取它,再將它放到快取裡面。
很明顯,這是最簡單的方式, 它很容易實現,但是,問題在於,你這樣做,是在考驗使用者的耐心,結果就是他會對著螢幕丟一句 ****.然後毫不猶豫的關掉瀏覽器.
另外一種方法:
它先渲染文字,併為影象預留出佔位符;在完成第一趟文字處理後,程式返回開始,並下載影象,將它們繪製到佔位符上去。 但是這樣的問題也很明顯, 需要最少2次的文件處理, 其效能與效率稍有提升,但是還不足解決使用者希望快速瀏覽頁面的需求。
為了使我們的渲染器具有更高的併發性,我們需要做的第一步就是, 將渲染過程分為兩部分: 一個用來渲染文字,一個用來下載所有影象。(一個受限於CPU,另外一個受限於IO, 即使在單CPU系統上,效率的提升也很明顯。)
Callable與Future可以用來表達所有協同工作的任務之間的互動。我們來看程式碼:
package com.ivan.concurrent.charpter6;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class FutureRenderer {
private static final int NTHREADS=100;
private static final ExecutorService exec=Executors.newFixedThreadPool(NTHREADS);
void renderPage(CharSequence source){
final List<ImageInfo> imageinfos=scanForImageInfo(source);
Callable<List<ImageData>> task=
new Callable<List<ImageData>>(){
public List<ImageData> call() throws Exception {
List<ImageData> result=new ArrayList<ImageData>();
for(ImageInfo imageinfo:imageinfos){
result.add(imageinfo.downloadImage());
}
return result;
}
};
Future<List<ImageData>> future=exec.submit(task);
//保證渲染文字與下載影象資料併發執行。
renderText(source);
try {
/**
* 到達需要所有影象的時間點時,主任務會等待future.get呼叫的結果,
* 幸運的話,我們請求的同時,下載已經完成,即使沒有,下載也已經預先開始了。
*
* 這裡還有一定的侷限性, 使用者可能不希望等待所有圖片下載完成後才可以看見,
* 他希望下載完成一張圖片後,就可以立即看到。 …… 這裡還待優化。
*/
List<ImageData> imageData=future.get();
for(ImageData data:imageData){
reanderImage(data);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
future.cancel(true);//取消任務
}catch(ExecutionException e){
e.printStackTrace();
}
}
private void renderText(CharSequence source) {
// TODO Auto-generated method stub
}
private void reanderImage(ImageData data) {
// TODO Auto-generated method stub
}
private List<ImageInfo> scanForImageInfo(CharSequence source) {
// TODO Auto-generated method stub
return null;
}
}
CompletionService: 當executorService遇到BlockingQueue
CompletionService整合了Executor和BlockingQueue的功能,你可以將Callable任務提交給它去執行,然後使用類似於佇列中的take和poll方法,在結果完成可用時,獲得這個結果,像一個打包的Future.
我們利用它來為我們的渲染器需要優化的地方做些處理,程式碼如下:
package com.ivan.concurrent.charpter6;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class FutureRenderer2 {
private static final int NTHREADS=100;
private static final ExecutorService exec=Executors.newFixedThreadPool(NTHREADS);
void renderPage(CharSequence source){
final List<ImageInfo> imageinfos=scanForImageInfo(source);
CompletionService<ImageData> completionService=new ExecutorCompletionService<ImageData>(exec);
for(final ImageInfo imageinfo:imageinfos){
completionService.submit(new Callable<ImageData>(){
public ImageData call() throws Exception {
//提高效能點一: 將順序的下載,變成併發的下載,縮短下載時間
return imageinfo.downloadImage();
}
});
}
renderText(source);
try {
for(int i=0;i<imageinfos.size();i++){
Future<ImageData> f=completionService.take();
//提高效能點二: 下載完成一張圖片後,立刻渲染到頁面。
ImageData imagedata=f.get();
reanderImage(imagedata);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}catch(ExecutionException e){
e.printStackTrace();
}
}
private void renderText(CharSequence source) {
// TODO Auto-generated method stub
}
private void reanderImage(ImageData data) {
// TODO Auto-generated method stub
}
private List<ImageInfo> scanForImageInfo(CharSequence source) {
// TODO Auto-generated method stub
return null;
}
}