高併發Java 六 JDK併發包2
1. 執行緒池的基本使用
1.1.為什麼需要執行緒池
平時的業務中,如果要使用多執行緒,那麼我們會在業務開始前建立執行緒,業務結束後,銷燬執行緒。但是對於業務來說,執行緒的建立和銷燬是與業務本身無關的,只關心執行緒所執行的任務。因此希望把儘可能多的cpu用在執行任務上面,而不是用在與業務無關的執行緒建立和銷燬上面。而執行緒池則解決了這個問題,執行緒池的作用就是將執行緒進行復用。
1.2.JDK為我們提供了哪些支援
JDK中的相關類圖如上圖所示。
其中要提到的幾個特別的類。
Callable類和Runable類相似,但是區別在於Callable有返回值。
ThreadPoolExecutor是執行緒池的一個重要實現。
而Executors是一個工廠類。
1.3.執行緒池的使用
1.3.1.執行緒池的種類
- new FixedThreadPool 固定數量的執行緒池,執行緒池中的執行緒數量是固定的,不會改變。
- new SingleThreadExecutor 單一執行緒池,執行緒池中只有一個執行緒。
- new CachedThreadPool 快取執行緒池,執行緒池中的執行緒數量不固定,會根據需求的大小進行改變。
- new ScheduledThreadPool 計劃任務排程的執行緒池,用於執行計劃任務,比如每隔5分鐘怎麼樣,
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
從方法上來看,顯然 FixedThreadPool,SingleThreadExecutor,CachedThreadPool都是ThreadPoolExecutor的不同例項,只是引數不同。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
下面來簡述下 ThreadPoolExecutor建構函式中引數的含義。
- corePoolSize 執行緒池中核心執行緒數的數目
- maximumPoolSize 執行緒池中最多能容納多少個執行緒
- keepAliveTime 當現線上程數目大於corePoolSize時,超過keepAliveTime時間後,多出corePoolSize的那些執行緒將被終結。
- unit keepAliveTime的單位
- workQueue 當任務數量很大,執行緒池中執行緒無法滿足時,提交的任務會被放到阻塞佇列中,執行緒空閒下來則會不斷從阻塞佇列中取資料。
這樣在來看上面所說的FixedThreadPool,它的執行緒的核心數目和最大容納數目都是一樣的,以至於在工作期間,並不會建立和銷燬執行緒。當任務數量很大,執行緒池中的執行緒無法滿足時,任務將被儲存到LinkedBlockingQueue中,而LinkedBlockingQueue的大小是Integer.MAX_VALUE。這就意味著,任務不斷地新增,會使記憶體消耗越來越大。
而CachedThreadPool則不同,它的核心執行緒數量是0,最大容納數目是Integer.MAX_VALUE,它的阻塞佇列是SynchronousQueue,這是一個特別的佇列,它的大小是0。由於核心執行緒數量是0,所以必然要將任務新增到SynchronousQueue中,這個佇列只有一個執行緒在從中新增資料,同時另一個執行緒在從中獲取資料時,才能成功。獨自往這個佇列中新增資料會返回失敗。當返回失敗時,則執行緒池開始擴充套件執行緒,這就是為什麼CachedThreadPool的執行緒數目是不固定的。當60s該執行緒仍未被使用時,執行緒則被銷燬。
1.4.執行緒池使用的小例子
1.4.1.簡單執行緒池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo {
public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + "Thread ID:"
+ Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
MyTask myTask = new MyTask();
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
es.submit(myTask);
}
}
}
由於使用的newFixedThreadPool(5),但是啟動了10個執行緒,所以每次執行5個,並且 可以很明顯的看到執行緒的複用,ThreadId是重複的,也就是前5個任務和後5個任務都是同一批執行緒去執行的。
這裡用的是
es.submit(myTask);
還有一種提交方式:
es.execute(myTask);
區別在於submit會返回一個Future物件,這個將在以後介紹。
1.4.2.ScheduledThreadPool
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo {
public static void main(String[] args) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
//如果前面的任務還未完成,則排程不會啟動。
ses.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(System.currentTimeMillis()/1000);
} catch (Exception e) {
// TODO: handle exception
}
}
}, 0, 2, TimeUnit.SECONDS);//啟動0秒後執行,然後週期2秒執行一次
}
}
輸出:
1454832514
1454832517
1454832520
1454832523
1454832526
...
由於任務執行需要1秒,任務排程必須等待前一個任務完成。也就是這裡的每隔2秒的意思是,前一個任務完成後2秒再開啟新的一個任務。
2. 擴充套件和增強執行緒池
2.1.回撥介面
執行緒池中有一些回撥的api來給我們提供擴充套件的操作。
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("準備執行");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("執行完成");
}
@Override
protected void terminated() {
System.out.println("執行緒池退出");
}
};
我們可以通過實現ThreadPoolExecutor的子類去覆蓋ThreadPoolExecutor的beforeExecute,afterExecute,terminated方法來實現線上程執行前後,執行緒池退出時的日誌管理或其他操作。
2.2.拒絕策略
有時候,任務非常繁重,導致系統負載太大。在上面說過,當任務量越來越大時,任務都將放到FixedThreadPool的阻塞佇列中,導致記憶體消耗太大,最終導致記憶體溢位。這樣的情況是應該要避免的。因此當我們發現執行緒數量要超過最大執行緒數量時,我們應該放棄一些任務。丟棄時,我們應該把任務記下來,而不是直接丟掉。
ThreadPoolExecutor中還有另一個建構函式。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
threadFactory我們在後面再介紹。
而handler就是拒絕策略的實現,它會告訴我們,如果任務不能執行了,該怎麼做。
共有以上4種策略。
AbortPolicy:如果不能接受任務了,則丟擲異常。
CallerRunsPolicy:如果不能接受任務了,則讓呼叫的執行緒去完成。
DiscardOldestPolicy:如果不能接受任務了,則丟棄最老的一個任務,由一個佇列來維護。
DiscardPolicy:如果不能接受任務了,則丟棄任務。
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r,
ThreadPoolExecutor executor) {
System.out.println(r.toString() + "is discard");
}
});
當然我們也可以自己實現RejectedExecutionHandler介面來自己定義拒絕策略。
2.3.自定義ThreadFactory
剛剛已經看到了,在ThreadPoolExecutor的建構函式中可以指定threadFactory。
執行緒池中的執行緒都是由執行緒工廠創建出來,我們可以自定義執行緒工廠。
預設的執行緒工廠:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
3. ForkJoin
3.1.思想
就是分而治之的思想。
fork/join類似MapReduce演算法,兩者區別是:Fork/Join 只有在必要時如任務非常大的情況下才分割成一個個小任務,而 MapReduce總是在開始執行第一步進行分割。看來,Fork/Join更適合一個JVM內執行緒級別,而MapReduce適合分散式系統。
4.2.使用介面
RecursiveAction:無返回值RecursiveTask:有返回值
4.3.簡單例子
import java.util.ArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Long>{
private static final int THRESHOLD = 10000;
private long start;
private long end;
public CountTask(long start, long end) {
super();
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
boolean canCompute = (end - start) < THRESHOLD;
if(canCompute)
{
for (long i = start; i <= end; i++) {
sum = sum + i;
}
}else
{
//分成100個小任務
long step = (start + end)/100;
ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
long pos = start;
for (int i = 0; i < 100; i++) {
long lastOne = pos + step;
if(lastOne > end )
{
lastOne = end;
}
CountTask subTask = new CountTask(pos, lastOne);
pos += step + 1;
subTasks.add(subTask);
subTask.fork();//把子任務推向執行緒池
}
for (CountTask t : subTasks) {
sum += t.join();//等待所有子任務結束
}
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask task = new CountTask(0, 200000L);
ForkJoinTask<Long> result = forkJoinPool.submit(task);
try {
long res = result.get();
System.out.println("sum = " + res);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
上述例子描述了一個累加和的任務。將累加任務分成100個任務,每個任務只執行一段數字的累加和,最後join後,把每個任務計算出的和再累加起來。
4.4.實現要素
4.4.1.WorkQueue與ctl
每一個執行緒都會有一個工作佇列
static final class WorkQueue
在工作佇列中,會有一系列對執行緒進行管理的欄位
volatile int eventCount; // encoded inactivation count; < 0 if inactive
int nextWait; // encoded record of next event waiter
int nsteals; // number of steals
int hint; // steal index hint
short poolIndex; // index of this queue in pool
final short mode; // 0: lifo, > 0: fifo, < 0: shared
volatile int qlock; // 1: locked, -1: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // the elements (initially unallocated)
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Thread parker; // == owner during call to park; else null
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
ForkJoinTask<?> currentSteal; // current non-local task being executed
這裡要注意的是,JDK7和JDK8在ForkJoin的實現上有了很大的差別。我們這裡介紹的是JDK8中的。 線上程池中,有時不是所有的執行緒都在執行的,部分執行緒會被掛起,那些掛起的執行緒會被存放到一個棧中。內部通過一個連結串列表示。
nextWait會指向下一個等待的執行緒。
poolIndex執行緒線上程池中的下標索引。
eventCount 在初始化時,eventCount與poolIndex有關。總共32位,第一位表示是否被啟用,15位表示被掛起的次數eventCount,剩下的表示poolIndex。用一個欄位來表示多個意思。
工作佇列WorkQueue用ForkJoinTask<?>[] array來表示。而top,base來表示佇列的兩端,資料在這兩者之間。
在ForkJoinPool中維護著ctl(64位long型)
volatile long ctl;
* Field ctl is a long packed with:
* AC: Number of active running workers minus target parallelism (16 bits)
* TC: Number of total workers minus target parallelism (16 bits)
* ST: true if pool is terminating (1 bit)
* EC: the wait count of top waiting thread (15 bits)
* ID: poolIndex of top of Treiber stack of waiters (16 bits)
AC表示活躍的執行緒數減去並行度(大概就是CPU個數)
TC表示總的執行緒數減去並行度
ST表示執行緒池本身是否是啟用的
EC表示頂端等待執行緒的掛起數
ID表示頂端等待執行緒的poolIndex
很明顯ST+EC+ID就是我們剛剛所說的 eventCount 。那麼為什麼明明5個變數,非要合成一個變數呢。其實用5個變數佔用容量也差不多。
用一個變數程式碼的可讀性上會差很多。
那麼為什麼用一個變數呢?其實這點才是最巧妙的地方,因為這5個變數是一個整體,在多執行緒中,如果用5個變數,那麼當修改其中一個變數時,如何保證5個變數的整體性。那麼用一個變數則就解決了這個問題。如果用鎖解決,則會降低效能。
用一個變數則保證了資料的一致性和原子性。
在ForkJoin中隊ctl的更改都是使用CAS操作,在前面系列的文章中已經介紹過,CAS是無鎖的操作,效能很好。
由於CAS操作也只能針對一個變數,所以這種設計是最優的。
4.4.2.工作竊取
接下來要介紹下整個執行緒池的工作流程。
每個執行緒都會呼叫runWorker
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
for (int r = w.hint; scan(w, r) == 0; ) {
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
scan()函式是掃描是否有任務要做。
r是一個相對隨機的數字。
private final int scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
long c = ctl; // for consistency check
if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
for (int j = m + m + 1, ec = w.eventCount;;) {
WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
if ((q = ws[(r - j) & m]) != null &&
(b = q.base) - q.top < 0 && (a = q.array) != null) {
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null) {
if (ec < 0)
helpRelease(c, ws, w, q, b);
else if (q.base == b &&
U.compareAndSwapObject(a, i, t, null)) {
U.putOrderedInt(q, QBASE, b + 1);
if ((b + 1) - q.top < 0)
signalWork(ws, q);
w.runTask(t);
}
}
break;
}
else if (--j < 0) {
if ((ec | (e = (int)c)) < 0) // inactive or terminating
return awaitWork(w, c, ec);
else if (ctl == c) { // try to inactivate and enqueue
long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
w.nextWait = e;
w.eventCount = ec | INT_SIGN;
if (!U.compareAndSwapLong(this, CTL, c, nc))
w.eventCount = ec; // back out
}
break;
}
}
}
return 0;
}
我們接下來看看scan方法,scan的一個引數是WorkQueue,上面已經說過,每個執行緒都會擁有一個WorkQueue,那麼多個執行緒的WorkQueue就會儲存在workQueues裡面,r是一個隨機數,通過r來找到某一個WorkQueue,在WorkQueue裡面有所要做的任務。
然後通過WorkQueue的base,取得base的偏移量。
b = q.base
..
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
..
然後通過偏移量得到最後一個的任務,執行這個任務
t = ((ForkJoinTask<?>)U.getObjectVolatile(a, i))
..
w.runTask(t);
..
通過這個大概的分析理解了過程,我們發現,當前執行緒呼叫scan方法後,不會執行當前的WorkQueue中的任務,而是通過一個隨機數r,來得到其他 WorkQueue的任務。這就是ForkJoinPool的主要的一個機理。
當前執行緒不會只著眼於自己的任務,而是優先完成其他任務。這樣做來,防止了飢餓現象的發生。這樣就預防了某些執行緒因為卡死或者其他原因而無法及時完成任務,或者某個執行緒的任務量很大,其他執行緒卻沒事可做。
然後來看看runTask方法
final void runTask(ForkJoinTask<?> task) {
if ((currentSteal = task) != null) {
ForkJoinWorkerThread thread;
task.doExec();
ForkJoinTask<?>[] a = array;
int md = mode;
++nsteals;
currentSteal = null;
if (md != 0)
pollAndExecAll();
else if (a != null) {
int s, m = a.length - 1;
ForkJoinTask<?> t;
while ((s = top - 1) - base >= 0 &&
(t = (ForkJoinTask<?>)U.getAndSetObject
(a, ((m & s) << ASHIFT) + ABASE, null)) != null) {
top = s;
t.doExec();
}
}
if ((thread = owner) != null) // no need to do in finally clause
thread.afterTopLevelExec();
}
}
有一個有趣的命名:currentSteal,偷得的任務,的確是剛剛解釋的那樣。
task.doExec();
將會完成這個任務。
完成了別人的任務以後,將會完成自己的任務。
通過得到top來獲得自己任務第一個任務
while ((s = top - 1) - base >= 0 && (t = (ForkJoinTask<?>)U.getAndSetObject(a, ((m & s) << ASHIFT) + ABASE, null)) != null)
{
top = s;
t.doExec();
}
接下來,通過一個圖來總結下剛剛執行緒池的流程
拿其他執行緒的任務都是從base開始拿的,自己拿自己的任務是從top開始拿的。這樣可以減少衝突
如果沒有找到其他任務
else if (--j < 0) {
if ((ec | (e = (int)c)) < 0) // inactive or terminating
return awaitWork(w, c, ec);
else if (ctl == c) { // try to inactivate and enqueue
long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
w.nextWait = e;
w.eventCount = ec | INT_SIGN;
if (!U.compareAndSwapLong(this, CTL, c, nc))
w.eventCount = ec; // back out
}
break;
}
那麼首先會通過一系列執行來改變ctl的值,獲得了nc,然後用CAS將新的值賦值。然後就呼叫awaitWork()將執行緒進入等待狀態(呼叫的 前面系列文章中提到的unsafe的park方法)。
這裡要說明的是改變ctl值這裡,首先是將ctl中的AC-1,AC是佔ctl的前16位,所以不能直接-1,而是通過AC_UNIT(0x1000000000000)來達到使ctl的前16位-1的效果。
前面說過eventCount中有儲存poolIndex,通過poolIndex以及WorkQueue中的nextWait,就能遍歷所有的等待執行緒。