1. 程式人生 > >ThreadPoolExecutor執行緒池的簡單應用

ThreadPoolExecutor執行緒池的簡單應用

一、什麼是執行緒池?
執行緒池,其實就是一個容納多個執行緒的容器,其中的執行緒可以反覆使用,省去了頻繁建立執行緒物件的操作,無需反覆建立執行緒而消耗過多資源。
二、執行緒池的優勢

  • 第一:降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。

  • 第二:提高響應速度。當任務到達時,任務可以不需要的等到執行緒建立就能立即執行。

  • 第三:提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。但是要做到合理的利用執行緒池,必須對其原理了如指掌。

三、ThreadPoolExecutor引數認知

  1. corePoolSize : 執行緒池的基本大小,當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了執行緒池的prestartAllCoreThreads方法,執行緒池會提前建立並啟動所有基本執行緒。

  2. runnableTaskQueue:任務對列,用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列。

  • ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序。

  • LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列。

  • SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。

  • PriorityBlockingQueue

    :一個具有優先順序得無限阻塞佇列。

  1. maximumPoolSize:執行緒池最大大小,執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是如果使用了無界的任務佇列這個引數就沒什麼效果。

  2. ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字,Debug和定位問題時非常又幫助。

  3. RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。

  • CallerRunsPolicy:只用呼叫者所線上程來執行任務。

  • DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。

  • DiscardPolicy:不處理,丟棄掉。

  • 當然也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化不能處理的任務。

  1. keepAliveTime :執行緒活動保持時間,執行緒池的工作執行緒空閒後,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高執行緒的利用率。

  2. TimeUnit:執行緒活動保持時間的單位,可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

四、ThreadPoolExecutor使用demo
封裝Executors

package com.gm.thread.demo;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Executors {

	/**
	 * 可控最大併發數執行緒池: 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待
	 * @param name 執行緒作用
	 * @param corePoolSize 執行緒池的基本大小
	 * @return
	 */
	public static ExecutorService newFixedThreadPool(String name, int corePoolSize) {
		return new ThreadPoolExecutor(corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue(),
				new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
	}
	
	/**
	 * 可回收快取執行緒池: 建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
	 * @param name 執行緒作用 
	 * @param corePoolSize 執行緒池的基本大小
	 * @param maximumPoolSize 執行緒池最大大小
	 * @return
	 */
	public static ExecutorService newCachedThreadPool(String name, int corePoolSize, int maximumPoolSize) {
		return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 180L, TimeUnit.SECONDS, new SynchronousQueue(),
				new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
	}
	
	/**
	 * 有界執行緒池:此執行緒池一直增長,直到上限,增長後不收縮(因為池子裡面的執行緒是永生的)
	 * @param name 執行緒作用
	 * @param corePoolSize 執行緒池的基本大小
	 * @param maximumPoolSize 執行緒池最大大小
	 * @return
	 */
	public static ExecutorService newLimitedThreadPool(String name, int corePoolSize, int maximumPoolSize) {
		return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2147483647L, TimeUnit.SECONDS, new SynchronousQueue(),
				new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
	}
	
	/**
	 * 單執行緒化執行緒池:建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行
	 * @param name 執行緒作用
	 * @return
	 */
	public static ExecutorService newSingleThreadExecutor(String name) {
		return java.util.concurrent.Executors.newSingleThreadExecutor(new NamedThreadFactory(name, true));
	}
}

封裝自定義執行緒工廠

package com.gm.thread.demo;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
	private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
	
	private final AtomicInteger mThreadNum = new AtomicInteger(1);
	private final String mPrefix;
	private final boolean mDaemo;
	private final ThreadGroup mGroup;

	public NamedThreadFactory() {
		this("pool-" + POOL_SEQ.getAndIncrement(), false);
	}

	public NamedThreadFactory(String prefix) {
		this(prefix, false);
	}

	public NamedThreadFactory(String prefix, boolean daemo) {
		this.mPrefix = (prefix + "-thread-");
		this.mDaemo = daemo;
		
		/*
		 * SecurityManager(安全管理器)應用場景 當執行未知的Java程式的時候,該程式可能有惡意程式碼(刪除系統檔案、重啟系統等),
		 * 為了防止執行惡意程式碼對系統產生影響,需要對執行的程式碼的許可權進行控制, 這時候就要啟用Java安全管理器。
		 */
		SecurityManager s = System.getSecurityManager();
		this.mGroup = (s == null ? Thread.currentThread().getThreadGroup() : s.getThreadGroup());
	}

	public Thread newThread(Runnable runnable) {
		String name = this.mPrefix + this.mThreadNum.getAndIncrement();
		Thread ret = new Thread(this.mGroup, runnable, name, 0L);
		ret.setDaemon(this.mDaemo);
		return ret;
	}

	public ThreadGroup getThreadGroup() {
		return this.mGroup;
	}
}

封裝自定義RejectedExecutionHandler(飽和策略)

package com.gm.thread.demo;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
	protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
	private final String threadName;
	private static final String ERROR = "Thread pool is EXHAUSTED! Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d), Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)!";

	public AbortPolicyWithReport(String threadName) {
		this.threadName = threadName;
	}

	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
		String msg = String.format(
				ERROR,
				new Object[] { this.threadName, 
						Integer.valueOf(e.getPoolSize()), 
						Integer.valueOf(e.getActiveCount()),
						Integer.valueOf(e.getCorePoolSize()), 
						Integer.valueOf(e.getMaximumPoolSize()),
						Integer.valueOf(e.getLargestPoolSize()), 
						Long.valueOf(e.getTaskCount()),
						Long.valueOf(e.getCompletedTaskCount()), 
						Boolean.valueOf(e.isShutdown()),
						Boolean.valueOf(e.isTerminated()), 
						Boolean.valueOf(e.isTerminating())});

		logger.warn(msg);
		throw new RejectedExecutionException(msg);
	}
}

使用demo

package com.gm.thread.demo;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

public class ThreadPoolDemo {

	private int corePoolSize = 10;
	private int maximumPoolSize = 10;

	private ExecutorService exe = null;

	private Semaphore available = null;
	
	public void execute(){
		
		if(exe==null){
			exe = Executors.newCachedThreadPool("this thread is a demo", corePoolSize,
					maximumPoolSize);
		}
		
		if(available==null){
			available = new Semaphore(maximumPoolSize);
		}
		/*
		 * 模擬從資料庫中查詢出的資料,比如發生退款時,對接方沒有非同步通知,
		 * 所以需要從第三方獲取退款狀態
		 */
		List<Integer> list = new ArrayList<>();
//		for (int i = 0; i < 10; i++) {
//			list.add(i++);
//		}
		if (list != null) {
			for (Integer integer : list) {
				//方法acquireUninterruptibly()的作用是使等待進入acquire()方法的執行緒,不允許被中斷。
				available.acquireUninterruptibly();
				exe.execute(new Runnable() {
					
					@Override
					public void run() {
						System.out.println("當前執行緒"+Thread.currentThread().getName()+"請求第三方介面");
					}
					
				});
			}
		}
	}
	
	public static void main(String[] args) {
		new ThreadPoolDemo().execute();
	}
}

如此,我們一個小小的ThreadPoolExecutor的例子完成了。