ThreadPoolExecutor執行緒池的簡單應用
一、什麼是執行緒池?
執行緒池,其實就是一個容納多個執行緒的容器,其中的執行緒可以反覆使用,省去了頻繁建立執行緒物件的操作,無需反覆建立執行緒而消耗過多資源。
二、執行緒池的優勢
-
第一:降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
-
第二:提高響應速度。當任務到達時,任務可以不需要的等到執行緒建立就能立即執行。
-
第三:提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。但是要做到合理的利用執行緒池,必須對其原理了如指掌。
三、ThreadPoolExecutor引數認知
-
corePoolSize : 執行緒池的基本大小,當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了執行緒池的prestartAllCoreThreads方法,執行緒池會提前建立並啟動所有基本執行緒。
-
runnableTaskQueue:任務對列,用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列。
-
ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序。
-
LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列。
-
SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。
-
PriorityBlockingQueue
-
maximumPoolSize:執行緒池最大大小,執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是如果使用了無界的任務佇列這個引數就沒什麼效果。
-
ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字,Debug和定位問題時非常又幫助。
-
RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。
-
CallerRunsPolicy:只用呼叫者所線上程來執行任務。
-
DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
-
DiscardPolicy:不處理,丟棄掉。
-
當然也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化不能處理的任務。
-
keepAliveTime :執行緒活動保持時間,執行緒池的工作執行緒空閒後,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高執行緒的利用率。
-
TimeUnit:執行緒活動保持時間的單位,可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
四、ThreadPoolExecutor使用demo
封裝Executors
package com.gm.thread.demo;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Executors {
/**
* 可控最大併發數執行緒池: 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待
* @param name 執行緒作用
* @param corePoolSize 執行緒池的基本大小
* @return
*/
public static ExecutorService newFixedThreadPool(String name, int corePoolSize) {
return new ThreadPoolExecutor(corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue(),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
}
/**
* 可回收快取執行緒池: 建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
* @param name 執行緒作用
* @param corePoolSize 執行緒池的基本大小
* @param maximumPoolSize 執行緒池最大大小
* @return
*/
public static ExecutorService newCachedThreadPool(String name, int corePoolSize, int maximumPoolSize) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 180L, TimeUnit.SECONDS, new SynchronousQueue(),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
}
/**
* 有界執行緒池:此執行緒池一直增長,直到上限,增長後不收縮(因為池子裡面的執行緒是永生的)
* @param name 執行緒作用
* @param corePoolSize 執行緒池的基本大小
* @param maximumPoolSize 執行緒池最大大小
* @return
*/
public static ExecutorService newLimitedThreadPool(String name, int corePoolSize, int maximumPoolSize) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 2147483647L, TimeUnit.SECONDS, new SynchronousQueue(),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
}
/**
* 單執行緒化執行緒池:建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行
* @param name 執行緒作用
* @return
*/
public static ExecutorService newSingleThreadExecutor(String name) {
return java.util.concurrent.Executors.newSingleThreadExecutor(new NamedThreadFactory(name, true));
}
}
封裝自定義執行緒工廠
package com.gm.thread.demo;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
private final AtomicInteger mThreadNum = new AtomicInteger(1);
private final String mPrefix;
private final boolean mDaemo;
private final ThreadGroup mGroup;
public NamedThreadFactory() {
this("pool-" + POOL_SEQ.getAndIncrement(), false);
}
public NamedThreadFactory(String prefix) {
this(prefix, false);
}
public NamedThreadFactory(String prefix, boolean daemo) {
this.mPrefix = (prefix + "-thread-");
this.mDaemo = daemo;
/*
* SecurityManager(安全管理器)應用場景 當執行未知的Java程式的時候,該程式可能有惡意程式碼(刪除系統檔案、重啟系統等),
* 為了防止執行惡意程式碼對系統產生影響,需要對執行的程式碼的許可權進行控制, 這時候就要啟用Java安全管理器。
*/
SecurityManager s = System.getSecurityManager();
this.mGroup = (s == null ? Thread.currentThread().getThreadGroup() : s.getThreadGroup());
}
public Thread newThread(Runnable runnable) {
String name = this.mPrefix + this.mThreadNum.getAndIncrement();
Thread ret = new Thread(this.mGroup, runnable, name, 0L);
ret.setDaemon(this.mDaemo);
return ret;
}
public ThreadGroup getThreadGroup() {
return this.mGroup;
}
}
封裝自定義RejectedExecutionHandler(飽和策略)
package com.gm.thread.demo;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
private final String threadName;
private static final String ERROR = "Thread pool is EXHAUSTED! Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d), Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)!";
public AbortPolicyWithReport(String threadName) {
this.threadName = threadName;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format(
ERROR,
new Object[] { this.threadName,
Integer.valueOf(e.getPoolSize()),
Integer.valueOf(e.getActiveCount()),
Integer.valueOf(e.getCorePoolSize()),
Integer.valueOf(e.getMaximumPoolSize()),
Integer.valueOf(e.getLargestPoolSize()),
Long.valueOf(e.getTaskCount()),
Long.valueOf(e.getCompletedTaskCount()),
Boolean.valueOf(e.isShutdown()),
Boolean.valueOf(e.isTerminated()),
Boolean.valueOf(e.isTerminating())});
logger.warn(msg);
throw new RejectedExecutionException(msg);
}
}
使用demo
package com.gm.thread.demo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
public class ThreadPoolDemo {
private int corePoolSize = 10;
private int maximumPoolSize = 10;
private ExecutorService exe = null;
private Semaphore available = null;
public void execute(){
if(exe==null){
exe = Executors.newCachedThreadPool("this thread is a demo", corePoolSize,
maximumPoolSize);
}
if(available==null){
available = new Semaphore(maximumPoolSize);
}
/*
* 模擬從資料庫中查詢出的資料,比如發生退款時,對接方沒有非同步通知,
* 所以需要從第三方獲取退款狀態
*/
List<Integer> list = new ArrayList<>();
// for (int i = 0; i < 10; i++) {
// list.add(i++);
// }
if (list != null) {
for (Integer integer : list) {
//方法acquireUninterruptibly()的作用是使等待進入acquire()方法的執行緒,不允許被中斷。
available.acquireUninterruptibly();
exe.execute(new Runnable() {
@Override
public void run() {
System.out.println("當前執行緒"+Thread.currentThread().getName()+"請求第三方介面");
}
});
}
}
}
public static void main(String[] args) {
new ThreadPoolDemo().execute();
}
}
如此,我們一個小小的ThreadPoolExecutor的例子完成了。