1. 程式人生 > >利用執行緒池進行for迴圈處理——執行緒池

利用執行緒池進行for迴圈處理——執行緒池

    專案之前使用for迴圈單執行緒查詢快取效率低下,就萌生了利用執行緒池進行for迴圈處理的想法。將每次迴圈查詢快取的執行緒利用執行緒池進行併發處理,可大大降低系統處理時間。下面是我寫的測試類:

package com.sinoway.cisp.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.sinoway.cisp.mongo.collection.domain.ApplyHistory;

public class ExecServiceTest {
	
	public static void main(String[] args) {
		
		List<ApplyHistory> applyHistoryList = new ArrayList<>();
		for (int i = 0; i < 20; i++) {
			ApplyHistory applyHistory = new ApplyHistory();
			applyHistory.setComment("a");
			applyHistoryList.add(applyHistory);
		}
		long currentTimeMillis = System.currentTimeMillis();
		ExecutorService pool = Executors.newFixedThreadPool(5);
		for(ApplyHistory applyHistory : applyHistoryList){
			Callable<ApplyHistory> run = new Callable<ApplyHistory>(){
				@Override
				public ApplyHistory call() throws InterruptedException{
					String comment = "A";
					applyHistory.setComment(comment);
					Thread.sleep(1000);
					System.out.println(applyHistory.getComment());
						
						return applyHistory;
				}
			};
			pool.submit(run); 
			
		}
		pool.shutdown();
		
		System.out.println("使用執行緒池一共執行:"+String.valueOf(System.currentTimeMillis()-currentTimeMillis)+"ms");
	}

}

執行結果


定義一個Callable執行緒,複寫call()方法(由於Thread.sleep(1000);需要異常處理,所以這裡使用Callable,因為Callable能丟擲異常以及返回值,Runnable則不行),每次迴圈就向執行緒池中提交一個執行緒,最後關閉執行緒池。

如上,設定執行緒池固定執行緒大小為5(一個執行緒佔用實體記憶體1M,這裡執行緒池佔用5M),然後執行20個執行緒,執行可看到控制檯每秒輸出5個執行緒,列印四次。這樣比執行緒序列執行效率高很多。但使用執行緒池,要注意記憶體開銷問題,防止記憶體溢位。

java.uitl.concurrent.ThreadPoolExecutor是執行緒池最核心類,


public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

解釋下構造器中各引數含義

corePoolSize:核心池大小。通常情況執行緒池沒有任何執行緒,等有任務來才會建立執行緒執行任務。當執行緒池達到corePoolSize時,新提交任務將被放入workQueue中,等待執行緒池中任務排程執行;

maximumPoolSize:執行緒池最大執行緒數。當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理;

keepAliveTime:執行緒沒有執行任務時保持多久時間會終止;

unit:keepAliveTime的時間單位;

workQueue:阻塞佇列,儲存等待執行的任務;

threadFactory:主要用來建立執行緒;

handler:拒絕處理任務策略。


ThreadPoolExecutor有幾個重要的方法execute()、submit()、shutdown()、shutdownNow():

execute()可向執行緒池提交一個任務,交由執行緒池執行,submit()同樣,但能返回任務執行結果;

剩下兩個方法用來關閉執行緒池。

雖然執行緒池是構建多執行緒應用程式的強大機制,但使用它並不是沒有風險的。用執行緒池構建的應用程式容易遭受任何其它多執行緒應用程式容易遭受的所有併發風險,諸如同步錯誤和死鎖,它還容易遭受特定於執行緒池的少數其它風險,諸如與池有關的死鎖、資源不足和執行緒洩漏。

常見幾種執行緒池

CachedThreadPool

是通過java.util.concurrent.Executor建立的ThreadPoolExecutor例項
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

需要注意的是使用的佇列是SynchronousQueue直接提交佇列,沒有容量,每插入一個操作都要等待一個相應的刪除操作。直接將任務提交給執行緒執行,若沒有空閒執行緒則建立執行緒,直到程序數量達到最大值,執行拒絕策略。但CachedThreadPool最大執行緒數MAX_VALUE,無窮大執行緒池,它總會迫使執行緒池增加新的執行緒執行任務。空閒執行緒會在指定時間(60秒)被回收。

若同時有大量任務被提交,而且任務執行又不快,那麼就會開啟大量執行緒,這樣會很快耗盡系統資源。

FixedThreadPool

也是ThreadPoolExecutor例項

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

這裡使用的是LinkedBlockingQueue無界任務佇列,除非資源耗盡,否則不存在任務入隊失敗。執行緒數小於corePoolSize時,執行緒池會生成新的執行緒執行任務,達到corePoolSize時,任務直接進入佇列等待,佇列會保持快速增長,直到耗盡記憶體。可看到corePoolSize與maximumPoolSize相等,可自定義執行緒池執行緒數。

SingleThreadPool

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

還有有界任務佇列ArrayBlockingQueue

同時還可自定義建立執行緒池

ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());

FixedThreadPool 和 CachedThreadPool 兩者對高負載的應用都不是特別友好。

CachedThreadPool 要比 FixedThreadPool 危險很多。

如果應用要求高負載、低延遲,最好不要選擇以上兩種執行緒池:

  1. 任務佇列的無邊界:會導致記憶體溢位以及高延遲
  2. 長時間執行會導致 CachedThreadPool 線上程建立上失控

因為兩者都不是特別友好,所以推薦使用 ThreadPoolExecutor ,它提供了很多引數可以進行細粒度的控制。