1. 程式人生 > >Java併發--任務執行

Java併發--任務執行

大多數併發應用程式都是圍繞著任務進行管理的.

 我們來看一小段程式碼:

package com.ivan.concurrent.charpter6;

import java.net.ServerSocket;
import java.net.Socket;

/**
 * 順序化的Web Server.
 * @author root
 * OS:Ubuntu 9.04
 * Date:2010-6-19
 */
public class SingleThreadWebServer {
 public static void main(String[] args) throws Exception {
  ServerSocket server=new ServerSocket(8080);
  while(true){
   Socket socket=server.accept();
   handleRequest(socket);
  }
 }

 private static void handleRequest(Socket socket) {
  /**
   * 做相關的處理……, 比如請求運算與I/O
   *   這將會導致出現阻塞,  會延遲當前請求的處理,
   *   而且會產生非常嚴重的後果,比如: 假死。
   *    那樣會極度考驗使用者的耐心,知道他忍無可忍的關閉瀏覽器。
   *   同時,單執行緒在等待IO操作時,CPU處於閒置狀態,這樣也降低了資源的利用率
   *  
   *  這樣的伺服器,缺乏良好的吞吐量和快速的響應性。
   */
 }
}

上面的程式碼是順序地執行任務,主執行緒在不斷接受連線與處理請求之間交替執行。
一個Web請求會做相關的處理……, 比如請求運算與I/O
 這將會導致出現阻塞,  會延遲當前請求的處理,
 而且會產生非常嚴重的後果,比如: 假死。
 那樣會極度考驗使用者的耐心,知道他忍無可忍的關閉瀏覽器。
 同時,單執行緒在等待IO操作時,CPU處於閒置狀態,這樣也降低了資源的利用率
 這樣的伺服器,缺乏良好的吞吐量和快速的響應性。

所以,基於上面程式碼的基礎上,我們需要給他作些小許的改進:

package com.ivan.concurrent.charpter6;

import java.net.ServerSocket;
import java.net.Socket;

public class ThreadPerTaskWebServer {
 public static void main(String[] args) throws Exception {
  ServerSocket server=new ServerSocket(80);
  while(true){
   final Socket socket=server.accept();
   new Thread(new Runnable(){
    public void run() {
     handleRequest(socket);
    }
   }).start();
  }
 }

 protected static void handleRequest(Socket socket) {
  /**
   *相比較而言,這樣的處理方式有良好的改進:
   * 1.執行人物的負載已經脫離主執行緒,讓主迴圈能更加迅速的重新開始等待下一個連線。提高了響應性
   * 2.併發處理任務,多個請求可以同時得到處理,提高了吞吐性
   * 3.任務處理程式碼必須要是執行緒安全的。防止出現併發性資料共享問題。
   *
   * 這個程式可能在開發階段執行良好,一旦部署,就可能出現致命的錯誤,
   * 我們接著來分析:
   */
 }
}

    相比較而言,這樣的處理方式有良好的改進:
    1.執行人物的負載已經脫離主執行緒,讓主迴圈能更加迅速的重新開始等待下一個連線。提高了響應性
     2.併發處理任務,多個請求可以同時得到處理,提高了吞吐性
     3.任務處理程式碼必須要是執行緒安全的。防止出現併發性資料共享問題。
    
 這個程式可能在開發階段執行良好,一旦部署,就可能出現致命的錯誤,
 我們接著來分析:

 我們看到,上面的程式碼中,是為每個請求的到來,建立一個新的執行緒來處理, 那麼這樣就會有以下的問題出現:


無限建立執行緒的缺點:
1.執行緒生命週期的開銷
1.1.執行緒的建立與關閉並非是免費的,實際的開銷根據不同的OS有不同的處理.但是執行緒的建立的確需要時間,帶來處理請求的延遲.一般的Web Server的請求是很頻繁的,為每個請求建立一個執行緒,無非要耗費大量的資源.
2.資源消耗量
2.1. 活動的執行緒會消耗資源,尤其是記憶體.如果可執行的執行緒數多於可用的處理器數,執行緒將會空閒。大量的空閒執行緒佔用更多的記憶體,給垃圾回收器帶來壓力,而且,執行緒在競爭CPU的同時,也會帶來許多其他的效能開銷。所以,建議在有足夠多的執行緒讓CPU忙碌時,不要再建立多餘的執行緒.
3.應用的穩定性
3.1. 應該限制建立執行緒的數量,限制的數目根據不同的平臺而定,同時也受到JVM的啟動引數,Thread的建構函式中棧大小等因素的影響. 如果打破了這個限制,你很可能會得到一個OutOfMemoryError. 在一定範圍內增加執行緒可以提高系統的吞吐量,但是一旦超過這個範圍,再建立執行緒只會拖垮你的系統。甚至可能會導致應用程式的崩潰.
  
我們的解決辦法:
    使用執行緒池,當然,你完全沒有必要自己寫一個執行緒池的實現(好吧,或許你跟我一樣,也希望能從重複創造輪子中,找到自己想要了解的東西),你可以利用 Executor框架來幫你處理,java.util.concurrent提供了一個靈活的執行緒池實現。在新的java類庫當中,任務執行的首要抽象不是Thread,而是Executor.
    Executor僅僅是一個簡單的介面,但是它很強大,包括用於非同步任務的執行,支援不同型別的任務執行策略,為任務提交和任務執行之間的解藕,提供了標準的方式等等, 我們後續再重點討論。
    Executor基於 生產者-消費者模式。提交任務的是生產者,執行任務的是消費者。 也就是說, 採用Executor框架實現 生產者-消費者模式,十分簡單。

package com.ivan.concurrent.charpter6;

import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class TaskExecutionWebServer{
 private static final int NTHREADS=100;
 //使用執行緒池來避免 為每個請求建立一個執行緒。
 private static final Executor threadPool=Executors.newFixedThreadPool(NTHREADS);
 
 public static void main(String[] args) throws Exception {
  ServerSocket server=new ServerSocket(8011);
  while(true){
   final Socket socket=server.accept();
   threadPool.execute(new Runnable(){
    public void run() {
     handleRequest(socket);
    }
   });
  }
 }

 protected static void handleRequest(Socket socket) {
  /**
   *
   */
  System.out.println(Thread.currentThread().getId());
  try {
   Thread.sleep(5000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }
 
 
}

執行緒池:
    執行緒池管理著一個工作者執行緒的同構池,執行緒池是與工作佇列緊密繫結的。工作佇列的作用就是持有所有等待執行的任務, 工作者佇列只需要從工作佇列中獲取到下一個任務,執行,然後回來等待下一個執行緒。
    Java類庫中提供了以下幾種執行緒池:
1.newFixedThreadPool :建立定長的執行緒池,每當提交一個任務就建立一個執行緒,直到達到池的最大長度。
2.newCachedThreadPool:建立一個可快取的執行緒池,如果當前執行緒池的長度超過了處理的需要,它可以靈活的收回空閒執行緒,當需求增加時,它可以靈活新增新的執行緒,而並不對池的長度做任何限制
3.newSingleThreadExecutor:建立單執行緒化的executor,它只建立唯一的工作者執行緒來執行任務,如果這個執行緒異常結束,會有另外一個執行緒來取代它.它會保證任務按照任務佇列規定的順序來執行。
4.NewScheduledThreadPool:建立一個定長的執行緒池,而且支援定時的,以及週期性的任務執行,類似Timer.

Executor的生命週期:
    它的建立已經說了,我們來看看它如何關閉, Executor 是為了執行任務而建立執行緒,而JVM通常會在所有非後臺執行緒退出後才退出,如果它無法正確的關閉,則會影響到JVM的結束。
    這裡需要提一下,在我們瞭解如何關閉Executor的一些疑惑,  由於Executor是非同步執行任務,那麼這些任務的狀態不是立即可見的,換句話說,在任務時間裡,這些執行的任務中,有的可能已經完成,有的還可能在執行,其他的還可能在佇列裡面等待。 為了解決這些問題, Java引入了另外一個介面,它擴充套件了Executor,並增加一些生命週期的管理方法: ExecutorService.


ExecutorService表示生命週期有三種狀態:  執行,關閉,終止。
    關閉和終止? 怎麼看上去是一個意思, 這裡我們先擱置著,留著後續來討論。
    
ExecutorService最初建立後的初始狀態就是執行狀態;
    shutdown與shutdownNow方法,都是ExecutorService的關閉方法,區別在於:
    shutdown:
        會啟動一個平穩的關閉過程, 停止接受新任務,同時等待已經提交的任務完成(包括尚未開始執行的任務)
    shutdownNow:
        會啟動一個強制關閉的過程:嘗試取消所有執行中的任務和排在佇列中尚未開始的任務。

    一旦所有任務全完成後,ExecutorService會轉到終止狀態, awaitTermination可以用來等待ExecutorService到達終止狀態,也可以輪詢isTerminated判斷ExecutorService是否已經終止。

package com.ivan.concurrent.charpter6;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

/**
 * 執行緒池的生命週期是如何管理的?
 * @author root
 * OS:Ubuntu 9.04
 * Date:2010-6-19
 */
public class LifeCycleWebServer {
 private static final int NTHREADS=100;
 private static final ExecutorService exec=Executors.newFixedThreadPool(NTHREADS);
 
 public void start() throws IOException{
  ServerSocket server=new ServerSocket(8011);
  while(exec.isShutdown()){
   try {
    final Socket socket=server.accept();
    exec.execute(new Runnable(){
     public void run() {
      handleRequest(socket);
     }
    });
   } catch (RejectedExecutionException e) {
    if(!exec.isShutdown()){
     //log.error(...)
    }
   }
  }
 }
 
 
 protected void handleRequest(Socket socket) {
  Request req=readRequest(socket);
  if(isShutDown(req)){
   stop();
  }else{
   dispatchRequest(req);
  }
 }

 public void stop(){
  exec.shutdown();
 }
 
 
 //~ Mock Object And Function..
 private static class Request{
  
 }
 
 private Request readRequest(Socket socket) {
  // TODO Auto-generated method stub
  return null;
 }


 private boolean isShutDown(Request req) {
  // TODO Auto-generated method stub
  return false;
 }


 private void dispatchRequest(Request req) {
  // TODO Auto-generated method stub
  
 }
 
}

OK,瞭解了執行緒池的使用,這裡有必要介紹介紹執行策略,

執行策略:
    簡單來說,就是任務執行的”What,When,Where,How”,包括:
1.任務在什麼執行緒中執行(what)
2.任務以什麼順序執行(fifo,lifo,優先順序)?
3.可以有多少個任務併發執行?(how many)
4.可以有多少個任務進入等待執行佇列
5.系統過載時,需要放棄一個任務,該挑選哪一個? 如何通知應用程式知道?


另外,java類庫中還提供有一種特別的任務,----可攜帶結果的任務:
    Callable 和 Future
    Runnable 作為任務的基本表達形式只是個相當有限的抽象; 它的侷限在於,不能返回一個值或者丟擲受檢查的異常。
    通常,很多工都會引起嚴重的計算延遲,比如執行資料庫查詢,從網路下載資源,進行復雜的計算。對於這樣的任務,Callable是更佳的抽象: 它在主進入點,等待返回值,併為可能丟擲的異常預先作準備。
    Runnable與Callable描述的都是首相的計算型任務,這些任務通常都是有限的。,任務的所生命週期分為4個階段: 建立、提交、開始和完成。
    Future描述了任務的生命週期,並提供了相關的方法來獲取任務的結果、取消任務以及檢驗任務是否已經完成或者被取消。
    Future的get方法取決於任務的狀態, 如果任務已經完成,get會立即返回或者丟擲異常,如果任務沒有完成,get會阻塞直到它的完成。
    
    建立Future的方法有很多, ExecutorService的submit會返回一個Future,你可以將一個Callable或者Runnable提交給executor,然後得到一個Future,用它來重新獲得任務執行的結果,或者取消任務。
    你也可以顯示的為給定的Callable和Runnable例項化一個FutureTask.

    
OK, 前面介紹了很多關於併發的理論知識,下面我們來看看,如果尋找可強化的併發性。

首先,我們從一個例子開始, 開始之前,簡單介紹一下這個例子所要表達的事情:
    它的來源是瀏覽器程式中渲染頁面的那部分功能, 首先獲取HTML,並將它渲染到影象快取裡。為了簡單起見,我們假設HTML只有文字標籤。 OK, 開始吧。

    首先,如果按照一般的處理方式,我們會這樣做:
1.遇到文字標籤,將它渲染到影象快取中
2.當遇到的是一個圖片標籤,我們通過網路獲取它,再將它放到快取裡面。
    
    很明顯,這是最簡單的方式, 它很容易實現,但是,問題在於,你這樣做,是在考驗使用者的耐心,結果就是他會對著螢幕丟一句 ****.然後毫不猶豫的關掉瀏覽器.

    另外一種方法:
     它先渲染文字,併為影象預留出佔位符;在完成第一趟文字處理後,程式返回開始,並下載影象,將它們繪製到佔位符上去。 但是這樣的問題也很明顯, 需要最少2次的文件處理, 其效能與效率稍有提升,但是還不足解決使用者希望快速瀏覽頁面的需求。

    為了使我們的渲染器具有更高的併發性,我們需要做的第一步就是, 將渲染過程分為兩部分: 一個用來渲染文字,一個用來下載所有影象。(一個受限於CPU,另外一個受限於IO, 即使在單CPU系統上,效率的提升也很明顯。)
    Callable與Future可以用來表達所有協同工作的任務之間的互動。我們來看程式碼:

package com.ivan.concurrent.charpter6;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureRenderer {
 private static final int NTHREADS=100;
 private static final ExecutorService exec=Executors.newFixedThreadPool(NTHREADS);
 
 void renderPage(CharSequence source){
  final List<ImageInfo> imageinfos=scanForImageInfo(source);
  Callable<List<ImageData>> task=
    new Callable<List<ImageData>>(){
     public List<ImageData> call() throws Exception {
      List<ImageData> result=new ArrayList<ImageData>();
      for(ImageInfo imageinfo:imageinfos){
       result.add(imageinfo.downloadImage());
      }
      return result;
     }
   
  };
  
  
  Future<List<ImageData>> future=exec.submit(task);
  //保證渲染文字與下載影象資料併發執行。
  renderText(source);
  try {
   /**
    * 到達需要所有影象的時間點時,主任務會等待future.get呼叫的結果,
    *  幸運的話,我們請求的同時,下載已經完成,即使沒有,下載也已經預先開始了。
    * 
    *  這裡還有一定的侷限性, 使用者可能不希望等待所有圖片下載完成後才可以看見,
    *   他希望下載完成一張圖片後,就可以立即看到。 …… 這裡還待優化。
    */
   List<ImageData> imageData=future.get();
   
   for(ImageData data:imageData){
    reanderImage(data);
   }
  } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
   future.cancel(true);//取消任務
  }catch(ExecutionException e){
   e.printStackTrace();
   
  }
 }

 private void renderText(CharSequence source) {
  // TODO Auto-generated method stub
  
 }

 private void reanderImage(ImageData data) {
  // TODO Auto-generated method stub
  
 }

 private List<ImageInfo> scanForImageInfo(CharSequence source) {
  // TODO Auto-generated method stub
  return null;
 }
}

CompletionService: 當executorService遇到BlockingQueue
    CompletionService整合了Executor和BlockingQueue的功能,你可以將Callable任務提交給它去執行,然後使用類似於佇列中的take和poll方法,在結果完成可用時,獲得這個結果,像一個打包的Future.
  我們利用它來為我們的渲染器需要優化的地方做些處理,程式碼如下:

package com.ivan.concurrent.charpter6;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureRenderer2 {
 private static final int NTHREADS=100;
 private static final ExecutorService exec=Executors.newFixedThreadPool(NTHREADS);
 
 void renderPage(CharSequence source){
  final List<ImageInfo> imageinfos=scanForImageInfo(source);
  
  CompletionService<ImageData> completionService=new ExecutorCompletionService<ImageData>(exec);
  
  for(final ImageInfo imageinfo:imageinfos){
   completionService.submit(new Callable<ImageData>(){
    public ImageData call() throws Exception {
     //提高效能點一: 將順序的下載,變成併發的下載,縮短下載時間
     return imageinfo.downloadImage();
    }
   });
  }
  renderText(source);
  try {
   for(int i=0;i<imageinfos.size();i++){
    Future<ImageData> f=completionService.take();
    //提高效能點二: 下載完成一張圖片後,立刻渲染到頁面。
    ImageData imagedata=f.get();
    reanderImage(imagedata);
   }
  } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
  }catch(ExecutionException e){
   e.printStackTrace();
   
  }
 }

 private void renderText(CharSequence source) {
  // TODO Auto-generated method stub
  
 }

 private void reanderImage(ImageData data) {
  // TODO Auto-generated method stub
  
 }

 private List<ImageInfo> scanForImageInfo(CharSequence source) {
  // TODO Auto-generated method stub
  return null;
 }
}