執行緒(4)-執行緒池
目錄
1.1JDK對執行緒池的支援
為了更好的能夠控制多執行緒,JDK提供了一套Executor框架,其本質是一個執行緒池。
上面是Executor的框架圖。
如圖Executors類扮演著執行緒池工廠的角色,通過Executors可以取得一個擁有特定功能的執行緒池。從圖中看到ThreadPoolExecutor類實現了Executor介面,因此通過這個介面任何Runnable的物件都可以呼叫。
Executor提供了以下工廠方法:
- newFixedThreadPool():該方法返回一個固定執行緒數量的執行緒池。執行緒池裡若有空閒執行緒則立即執行,否則暫存在一個任務佇列中,等待空閒執行緒。
- newSingleThreadExecutor():該方法返回了一個只有一個執行緒的執行緒池。
- newCachedThreadPool():該方法返回了一個可根據實際情況調整執行緒數量的執行緒。執行緒池裡若有空閒執行緒則立即執行,否則會建立一個新的執行緒。
- newSingleThreadScheduleExecutor():該方法返回一個ScheduleExecutorService物件,執行緒池大小為1。ScheduleExecutorService介面在ExecutorService介面之上,擴充套件了在給定時間執行某任務的功能。
- newScheduleThreadPool():該方法也返回一個ScheduleExecutorService介面,但該執行緒池可以指定執行緒池數量。
ScheduleExecutorService物件主要的一些方法:
- schedule():會在給定時間,對任務進行一次排程。
- scheduleAtFixedRate():建立一個週期任務。任務開始於給定初始延時。後續的任務按照給定的週期執行:後續第一個任務會在initialDelay+period時執行,後續第二個任務會在initialDelay+2*period時執行,以此類推。
- scheduleWithFixedDelay():建立並執行一個週期性任務。任務開始於初始延時時間,後續任務將會按照給定的延時進行,即上一個任務結束到下一個任務開始的時間差進行。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduleExecutorServoceDemo {
public static void main(String[] args) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
ses.scheduleAtFixedRate(new Runnable() {
public void run() {
// TODO Auto-generated method stub
try {
Thread.sleep(1000);
System.out.println(System.currentTimeMillis()/1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, 1, 2, TimeUnit.SECONDS);
}
}
執行結果:
1537947034 1537947036 1537947038 1537947040 1537947042
可以看到這裡時間間隔是2秒。
如果執行時間超過排程時間會發生什麼事情。將上述Thread.sleep(1000);改為
Thread.sleep(3000);
執行結果:
1537947606 1537947609 1537947612 1537947615
發現執行任務的執行週期為3秒。也就是說週期小於執行時間,那麼任務就會在上一個任務結束後立即被呼叫。
如果採用scheduleWithFixedDelay(),執行3秒,排程兩秒,那麼時間間隔為5秒。
1.2核心執行緒池的內部實現
首先了解一下ThreadPoolExecutor最重要的建構函式
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:指定了執行緒池中的執行緒數量。
- maximumPoolSize:制定了執行緒池中最大執行緒的數量。
- keepAliveTime:當執行緒池數量超過corePoolSize時,多餘的空閒執行緒的存活時間。即,超過corePoolSize的空閒執行緒,在多長時間內被銷燬。
- unit:keepAliveTime的單位。
-
workQueue:任務佇列,被提交但尚未被執行的任務。
workQueue,是一個BlockingQueue介面的物件,僅用於存放Runnable物件。在ThreadPoolExecutor的建構函式中可食用以下幾種BlockingQueue。
- 直接提交的佇列:該功能由SynchronousQueue物件提供。SynchronousQueue是一個特殊的BlockingQueue。它沒有容量,每一個插入操作都要等待一個相應的刪除操作,反之每一個刪除操作都要等待一個對應的插入操作。使用SynchronousQueue,提交的任務不會被真實儲存,而總是將新任務交給執行緒執行,如果沒有空閒執行緒,則建立新執行緒,如果執行緒數量達到最大值,則執行拒絕策略。
- 有界的任務佇列:該佇列可由ArrayBlockingQueue實現。它的建構函式必須帶一個容量引數。表示該佇列最大容量。有界隊列當在任務執行緒滿的時候才能將執行緒數提升到corePoolSize以上。
- 無界的任務佇列:無界的任務佇列可通過LinkedBlockingQueue類實現。與有界任務佇列相比,除非系統資源耗盡,否則不會加入失敗。
- 優先任務佇列:優先任務佇列是帶有執行優先順序的任務佇列。他通過PriorityBlockingQueue實現,可以控制任務的執行先後順序。它是一個特殊的無界佇列。
-
threadFactory:用於建立執行緒,一般用預設的即可。
-
handler:拒絕策略。當任務太多來不及處理,如何拒絕任務。
ThreadPoolExecutor的核心排程程式碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
程式碼第五行workerCountOf()函式取得了當前執行緒池的執行緒總數。當前執行緒總數小於corePoolSize時,將會通過addWorker()方法直接排程執行。否則在第十行程式碼處(workQueue.offer())進入等待佇列。如果進入等待佇列失敗,則會執行第17行,將任務直接交給執行緒池。如果當前執行緒數已達到maximumPoolSize,則提交失敗,就執行18行的拒絕策略。
上圖是ThreadPoolExecutor的任務排程邏輯
1.3拒絕策略
JDK內建的拒絕策略如下。
- AbortPolicy策略:該策略會直接丟擲異常,組織系統工作。
- CallerRunsPolicy策略:只要執行緒池關閉,該策略直接在呼叫者執行緒中,運行當前被丟棄的任務。顯然這樣做不會真的丟棄任務,但是任務提交的效能會急劇下降。
- DiscardOldestPolicy策略:該策略丟棄最老的一個請求,也就是即將被執行的任務,並嘗試再次提交當前任務。
- DiscardPolicy策略:該策略默默丟棄無法處理的任務,不予任何處理。