1. 程式人生 > >java之執行緒池簡單實現

java之執行緒池簡單實現

以前做的東西,實現一個簡單的多執行緒機制,開始之前,現說說原理性的東西吧,下面是我在ibm開發者上搜到的內容

執行緒池的技術背景

  在面向物件程式設計中,建立和銷燬物件是很費時間的,因為建立一個物件要獲取記憶體資源或者其它更多資源。在Java中更是如此,虛擬機器將試圖跟蹤每一個物件, 以便能夠在物件銷燬後進行垃圾回收。所以提高服務程式效率的一個手段就是儘可能減少建立和銷燬物件的次數,特別是一些很耗資源的物件建立和銷燬。如何利用 已有物件來服務就是一個需要解決的關鍵問題,其實這就是一些"池化資源"技術產生的原因。

  多執行緒技術主要解決處理器單元內多個執行緒執行的問題,它可以顯著減少處理器單元的閒置時間,增加處理器單元的吞吐能力。但如果對多執行緒應用不當,會增加對單個任務的處理時間。可以舉一個簡單的例子:

  假設在一臺伺服器完成一項任務的時間為T

      T1 建立執行緒的時間
      T2 線上程中執行任務的時間,包括執行緒間同步所需時間
      T3 執行緒銷燬的時間		

  顯然T = T1+T2+T3。注意這是一個極度簡化的假設。

  可以看出T1,T3是多執行緒本身的帶來的開銷,我們渴望減少T1,T3所用的時間,從而減少T的時間。但一些執行緒的使用者並沒有注意到 這一點,所以在程式中頻繁的建立或銷燬執行緒,這導致T1和T3在T中佔有相當比例。顯然這是突出了執行緒的弱點(T1,T3),而不是優點(併發性)。

  執行緒池技術正是關注如何縮短或調整T1,T3時間的技術,從而提高伺服器程式效能的。它把T1,T3分別安排在伺服器程式的啟動和結束的時間段或者一些空閒的時間段,這樣在伺服器程式處理客戶請求時,不會有T1,T3的開銷了。

  執行緒池不僅調整T1,T3產生的時間段,而且它還顯著減少了建立執行緒的數目。在看一個例子:

  假設一個伺服器一天要處理50000個請求,並且每個請求需要一個單獨的執行緒完成。我們比較利用執行緒池技術和不利於執行緒池技術的伺服器 處理這些請求時所產生的執行緒總數。線上程池中,執行緒數一般是固定的,所以產生執行緒總數不會超過執行緒池中執行緒的數目或者上限(以下簡稱執行緒池尺寸),而如果 伺服器不利用執行緒池來處理這些請求則執行緒總數為50000。一般執行緒池尺寸是遠小於50000。所以利用執行緒池的伺服器程式不會為了建立50000而在處 理請求時浪費時間,從而提高效率。

  這些都是假設,不能充分說明問題,下面我將討論執行緒池的簡單實現並對該程式進行對比測試,以說明執行緒技術優點及應用領域。

一般一個簡單執行緒池至少包含下列組成部分

  • 執行緒池管理器(ThreadPoolManager):用於建立並管理執行緒池
  • 工作執行緒(WorkThread): 執行緒池中執行緒
  • 任務介面(Task):每個任務必須實現的介面,以供工作執行緒排程任務的執行。
  • 任務佇列:用於存放沒有處理的任務。提供一種緩衝機制。

  執行緒池管理器至少有下列功能:建立執行緒池,銷燬執行緒池,新增新任務。下面就是小弟的實現,還是歡迎拍磚哈:

複製程式碼
public class ThreadPoolManager {
    private static ThreadPoolManager instance = null;
    private List<Upload> taskQueue = Collections.synchronizedList(new LinkedList<Upload>());//任務佇列    private WorkThread[] workQueue ;    //工作執行緒(真正執行任務的執行緒)    private static int worker_num = 5;    //工作執行緒數量(預設工作執行緒數量是5)    private static int worker_count = 0;
    
    private ThreadPoolManager(){
        this(5);
    }
    private ThreadPoolManager(int num){
        worker_num = num;
        workQueue = new WorkThread[worker_num];
        for(int i=0;i<worker_num;i++){
            workQueue[i] = new WorkThread(i);
        }
    }
    
    public static synchronized ThreadPoolManager getInstance(){
        if(instance==null)
            instance = new ThreadPoolManager();
        return instance;
    }
    
    public void addTask(Upload task){
        //對任務佇列的操作要上鎖        synchronized (taskQueue) {
            if(task!=null){
                taskQueue.add(task);
                taskQueue.notifyAll();
                System.out.println("task id "+task.getInfo() + " submit!");
            }
                
        }
    }
    
    public void BatchAddTask(Upload[] tasks){
        //對任務佇列的修改操作要上鎖        synchronized (taskQueue) {
            for(Upload e:tasks){
                if(e!=null){
                    taskQueue.add(e);
                    taskQueue.notifyAll();
                    System.out.println("task id "+e.getInfo() + " submit!");
                }
            }        
        }
    }
    
    public void destory(){
        System.out.println("pool begins to destory ...");
        for(int i = 0;i<worker_num;i++){
            workQueue[i].stopThread();
            workQueue[i] = null;
        }
        //對任務佇列的操作要上鎖        synchronized (taskQueue) {
            taskQueue.clear();
        }
        
        System.out.println("pool ends to destory ...");
    }
    
    private class WorkThread extends Thread{
        private int taksId ;
        private boolean isRuning = true;
        private boolean isWaiting = false;
        
        
         
        public WorkThread(int taskId){
            this.taksId= taskId;
            this.start();
        }
        
        public boolean isWaiting(){
            return isWaiting;
        }
        // 如果任務進行中時,不能立刻終止執行緒,需要等待任務完成之後檢測到isRuning為false的時候,退出run()方法        public void stopThread(){
            isRuning = false;
        }
        
        @Override
        public void run() {
            while(isRuning){
                Upload temp = null;
                //對任務佇列的操作要上鎖                synchronized (taskQueue) {
                    //任務佇列為空,等待新的任務加入                    while(isRuning&&taskQueue.isEmpty()){
                        try {
                            taskQueue.wait(20);
                        } catch (InterruptedException e) {
                            System.out.println("InterruptedException occre...");
                            e.printStackTrace();
                        }
                    }
                    if(isRuning)
                        temp = taskQueue.remove(0);
                }
                //當等待新任務加入時候,終止執行緒(呼叫stopThread函式)造成 temp = null                if(temp!=null){
                    System.out.println("task info: "+temp.getInfo()+ " is begining");
                    isWaiting = false;
                    temp.uploadPic();
                    isWaiting = true;
                    System.out.println("task info: "+temp.getInfo()+ " is finished");
                }    
            }
        }
    }
}
複製程式碼

  然後定義任務介面(Task):這裡我定義的是上傳圖片的功能介面(這裡用抽象類或者介面隨你自己).

複製程式碼
public abstract class Upload {
    protected String info;
    abstract boolean uploadPic();
    public String getInfo(){
        return info;
    }
}
複製程式碼

  然後定義具體任務類:我這裡簡單,讓它睡眠2s。當然你也可以定義很多實現Upload的任務類。

複製程式碼
public class TaskUpload extends Upload {
    
    public TaskUpload(String info){
        this.info = info;
    }
    public String getInfo(){
        return info;
    }
    @Override
    public boolean uploadPic()  {
        // TODO Auto-generated method stub        System.out.println(info+"sleep begin ....");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block            e.printStackTrace();
        }
        System.out.println(info+"sleep end ....");
        return false;
    }
}
複製程式碼

  最後,測試這個簡單的執行緒池:

複製程式碼
public class ThreadPoolManagerTest {


    public static void main(String[] args) {
        // TODO Auto-generated method stub        Upload[] tasks = createBatchTask(7);
        ThreadPoolManager pool = ThreadPoolManager.getInstance();
        pool.BatchAddTask(tasks);
        pool.destory();
    }
    private static Upload[] createBatchTask(int n){
        Upload[] tasks = new TaskUpload[n];
        for(int i = 0;i<n ;i++ ){
            tasks[i] = new TaskUpload("task id is "+ i);
        }
        return tasks;
    }
}
複製程式碼

執行緒池技術適用範圍及應注意的問題

  執行緒池的應用範圍:

  1. 需要大量的執行緒來完成任務,且完成任務的時間比較短。 WEB伺服器完成網頁請求這樣的任務,使用執行緒池技術是非常合適的。因為單個任務小,而任務數量巨大,你可以想象一個熱門網站的點選次數。 但對於長時間的任務,比如一個Telnet連線請求,執行緒池的優點就不明顯了。因為Telnet會話時間比執行緒的建立時間大多了。
  2. 對效能要求苛刻的應用,比如要求伺服器迅速相應客戶請求。
  3. 接受突發性的大量請求,但不至於使伺服器因此產生大量執行緒的應用。突發性大量客戶請求,在沒有執行緒池情況下,將產生大量執行緒,雖然理論上大部分作業系統執行緒數目最大值不是問題,短時間內產生大量執行緒可能使記憶體到達極限,並出現"OutOfMemory"的錯誤。