JDK併發包(concurrent)
多執行緒的軟體設計,確實可以最大限度的發揮現代多核處理器的計算能力,提高系統的吞吐量和效能。但是,若不加控制和管理的隨意使用執行緒,對系統的效能反而會產生不利的影響。
一、什麼是執行緒池
為了避免系統頻繁的建立和銷燬執行緒,我們可以讓建立的執行緒複用。如果你寫過資料庫方便的程式碼,那你肯定知道資料庫連線池。或者是傳送http請求時候的,http連線池。亦或者是傳送mq訊息時候的連線池。執行緒池於他們的作用是一樣的,執行緒池中,總是有幾個活躍的執行緒。當需要執行緒的時候,從執行緒池中拿一個空閒的來用,用完了再放進去。
二、JDK對執行緒池的支援
為了能夠更好的控制多執行緒,JDK提供了一套Executor框架,幫助開發人員進行有效的執行緒控制,其本質就是一個執行緒池。
Executor框架提供了各種型別的執行緒池,主要有一下幾種:
- newFixedThreadPool():該方法返回一個固定執行緒數量的執行緒池。該執行緒池中的執行緒數量始終保持不變。當有一個新的任務提交時,執行緒池中若有空閒執行緒,則立即執行,若沒有,則新的任務被暫存在一個佇列中,待有執行緒空閒時,便處理在任務佇列中的任務。
- newSingleThreadExecutor():該方法返回只有一個執行緒的執行緒池,若有多於一個的任務,被提交到執行緒池時,任務將會被暫存在一個佇列中,待執行緒池空閒時,按先入先出的順序執行佇列中的任務。
- newCachedThreadPool():該方法返回一個可根據實際情況調整執行緒數量的執行緒池。執行緒池的數量不確定,但若有空閒執行緒可以複用,則會優先使用可複用執行緒。若所有執行緒均在工作,又有新任務提交,則會建立新的執行緒處理任務。所有執行緒,在當前任務執行完畢後,將返回執行緒池進行復用。
- newSingleThreadScheduledExecutor() :該方法返回一個ScheduledExecutorService物件,執行緒池大小為1。ScheduledExecutorService介面在ExecutorService介面之上進行了擴充套件,在給定時間,執行某任務的功能,例如在固定的,某個延時之後執行,或者週期性執行某個任務。
- newScheduledThreadPool():該方法也返回一個ScheduledExecutorService物件,但該執行緒池,可以指定執行緒數量。
1. 固定大小的執行緒池
這裡以newFixedThreadPool()為例,簡單展示下用法
package com.example.thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by mazhenhua on 2017/3/9.
*/
public class ThreadPoolDemo {
public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
public static void main(String[] args) {
MyTask task = new MyTask();
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++){
es.submit(task);
}
}
}
輸出結果:
從這個結果中,我們看到,一共是5個執行緒,共執行了10次,當打出前5個執行緒的時候,停頓了1秒,然後又打印出了5個,並且全部列印完,程式並沒有結束,說明執行緒池還有活躍的執行緒。
2. 計劃任務
另外一個值得注意的方法是newScheduledThreadPool(),它返回一個ScheduledExecutorService物件,可以根據時間需要對執行緒進行排程。
ScheduledExecutorService與其他幾個執行緒池不同,它並不會立即安排執行任務。它其實是起到了定時任務的作用。它會在指定的時間,對任務進行排程。
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
schedule()方法會在指定的時間,對任務進行一次排程。scheduleAtFixedRate()和scheduleWithFixedDelay()方法,會對任務進行週期性的呼叫,兩者的不同點是,FixedRate是從上一個任務開始後的period時間啟動下一次任務,而FixedDelay則是從上一個執行緒執行結束後delay時間後啟動下一個執行緒。一個是從上一個執行緒的開始時間計時,一個是從上一個執行緒的結束時間計時。
package com.example.thread;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Created by mazhenhua on 2017/3/9.
*/
public class ScheduledExecutorServiceDemo {
public static void main(String[] args) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
ses.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(System.currentTimeMillis()/1000);
} catch (InterruptedException e){
e.printStackTrace();
}
}
}, 10, 2, TimeUnit.SECONDS);
}
}
執行上面程式碼可以發現,先停頓了10秒,然後每2秒列印一次。
上面的程式碼中,執行時sleep了1秒,所以週期為2秒一次的打印出來了,那麼如果sleep5秒會怎麼樣呢?
這裡不再囉嗦,直接給出結論,程式碼可以自己手動改,結論:會以5秒為週期的列印,任務並不會重疊而是在上次執行完畢後立即執行下一個任務。
如果是,scheduleWithFixedDelay方法就不一樣了,因為他本身就是上一個任務執行完畢才開始計時的,所以,如果是scheduleWithFixedDelay方法的話,那就會是7秒一個週期的列印,
排程程式實際上並不能保證任務會無限期的被持續呼叫。如果任務本身丟擲了異常,那麼後續的所有執行都會被中斷,因此,如果你想讓你任務持續穩定的執行,那麼一定要處理好異常。
3. 執行緒池的內部實現
對於核心的幾個執行緒池,無論是newFixedThreadPool(),newSingleThreadExecutor()還是newCachedThreadPool(),雖然看起來建立執行緒有著不同的特點,但是其內部實現均使用了,ThreadPoolExecutor:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:指定了執行緒池中的執行緒數量。
- maximumPoolSize:指定了執行緒池中的最大執行緒數量。
- keepAliveTime:當執行緒池數量超過corePoolSize時,對於的空閒執行緒的存活時間。即,超過corePoolSize的執行緒,在多長時間內會被銷燬。
- unit:keepAliveTime的時間單位。
- workQueue:任務佇列,被提交但尚未被執行的佇列。
- threadFactory:執行緒工廠,用於建立執行緒,一般用預設的即可。
- handler:拒絕策略。當任務太多來不及處理時,如何拒絕任務。
這裡詳細說明一下workQueue和handler兩個引數,
引數workQueue指被提交,但未執行的任務佇列,他是一個BlockingQueue介面物件,僅存放Runable物件。根據佇列功能分類,在ThreadPoolExecutor的建構函式中,可以使用以下幾種BlockingQueue。
直接提交佇列:該功能由SynchronousQueue物件提供。SynchronousQueue是一個特殊的BlockingQueue。SynchronousQueue沒有容量,每一個插入操作,都要等待相應的刪除操作,反之,每一個刪除操作都要等待相應的插入操作。如果使用SynchronousQueue,提交的任務不會被真正的儲存,而總是將新任務提交給執行緒執行,如果沒有空執行緒,則嘗試建立執行緒,如果執行緒數量已經達到最大值,則執行拒絕策略。因此使用SynchronousQueue佇列,通常要設定maximumPoolSize值,否則很容執行拒絕策略。
有界的任務佇列:有界的任務佇列可以使用ArrayBlockingQueue實現,ArrayBlockingQueue的建構函式必須帶一個容量引數,表示該佇列的最大容量,當使用有界的任務佇列時,若有新的任務需要執行,如果執行緒池的實際執行緒數小於corePoolSize,則會優先建立新執行緒,若大於corePoolSize,則會將新任務加入等待佇列,若等待佇列已滿,無法加入,則在匯流排程數不大於maximumPoolSize的前提下,建立新的執行緒執行任務,若大於maximumPoolSize,則執行拒絕策略。可見,有界佇列僅當任務佇列裝滿時,才能將執行緒數提升到corePoolSize之上,換句話說,除非系統非常繁忙,否則確保核心執行緒數維持在corePoolSize
無界的任務佇列:無界任務佇列可以通過LinkedBlockingQueue類來實現。與有界佇列相比,除非系統資源耗盡,否則無界的任務佇列不存在任務入隊失敗的情況。當有新的任務到來,系統的執行緒數小於corePoolSize時,執行緒池會生成新的執行緒執行任務,但當系統的執行緒數達到corePoolSize後,就不會繼續增加。若後續又有任務繼續提交,而有沒有可用的執行緒時,則任務直接進入等待。若任務的建立和處理速度相差很大,無界佇列會保持快速增長,知道耗盡系統記憶體。
優先任務佇列:優先任務佇列是帶有執行優先順序的佇列。他通過PriorityBlockingQueue實現,可以控制任務的執行先後順序。他是一個特殊的無界佇列。無論是有界佇列ArrayBlockingQueue還是LinkedBlockingQueue無界佇列,都是按照先進先出的演算法處理任務的。而PriorityBlockingQueue則可以根據自身的優先順序順序先後執行,在確保系統性能的同時,也能有很好的質量保證。
newFixedThreadPool()方法返回了一個corePoolSize和maximumPoolSize一樣大小的,並使用了LinkedBlockingQueue無邊界的任務佇列執行緒池。因為對於固定大小的執行緒池,不存線上程數量大小的變化。
newSingleThreadExecutor()與newFixedThreadPool()類似只是corePoolSize和maximumPoolSize設定為了1
newCachedThreadPool()方法返回一個corePoolSize為0,maximumPoolSize無限大的執行緒池,這意味著,在沒有任務時,執行緒池內是沒有執行緒的,當有執行緒提交時,該執行緒池會使用空閒的執行緒執行任務,若無空閒執行緒,則將任務加入SynchronousQueue佇列,SynchronousQueue佇列是直接提交的佇列,它總會破使執行緒池增加新的執行緒執行任務,當執行完畢後,由於corePoolSize為0,因此空執行緒會在指定的時間(60s)內被回收。
下面是ThreadPoolExecutor的一段執行的核心程式碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
程式碼中workerCountOf()方法獲取了當前執行緒池的執行緒總數。當執行緒總數小於corePoolSize核心執行緒數時,會將任務通過addWorker()方法直接排程執行。否則在workQueue.offer(command)方法進入等待佇列。如果等待佇列失敗(比如有界佇列達到上限,或者使用SynchronousQueue佇列)則會直接將任務提交給執行緒池,如果提交失敗,則執行拒絕策略。
4. 拒絕策略
上面多次提到了拒絕策略,那麼拒絕策略到底是什麼呢,請往下看。
拒絕策略就是當任務數量超過系統實際承載能力時,該如何處理。
JDK內建提供了四種策略:
- AbortPolicy策略:該策略會直接丟擲異常,阻止系統正常工作
- CallerRunsPolicy策略:只要執行緒池未關閉,該策略直接在呼叫執行緒中,運行當前被丟棄的任務。顯然這樣做不會真的丟棄任務,但是,任務提交執行緒的效能極有可能會急劇下降。
- DiscardPolicy策略:該策略,默默的丟棄無法處理的任務,不予任何處理。如果允許任務丟失,可以採用這個。
- DiscardOldestPolicy策略:該策略將丟棄最老的一個請求,也就是即將被執行的一個任務,並嘗試,再次提交這個任務。
以上策略都實現了RejectedExecutionHandler介面,若上述策略無法滿足需要,可以自己擴充套件RejectedExecutionHandler介面,
package com.example.thread;
import java.util.concurrent.*;
/**
* Created by mazhenhua on 2017/3/13.
*/
public class RejectedThreadPoolDemo {
public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ": Thread ID:" +
Thread.currentThread().getId());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
MyTask task = new MyTask();
ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(10),
Executors.defaultThreadFactory(),
new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + "is disCard");
}
});
for (int i = 0; i < Integer.MAX_VALUE; i ++){
es.submit(task);
Thread.sleep(10);
}
}
}
輸出結果:
上述程式碼中,執行緒池中有5個常駐執行緒,並且最大執行緒數也是5個,但是他卻擁有隻有10個容量的等待佇列。
MyTask的執行時間是100毫秒。所以當任務被源源不斷的提交時,拒絕策略就開始生效了。
5. 自定義執行緒建立:ThreadFactory
看了那麼多關於執行緒池的介紹,有沒有考慮一個基本的問題,執行緒池中的執行緒是哪裡來的?
執行緒池的作用是為了執行緒的複用,避免頻繁的建立執行緒,但是最開始的執行緒是哪裡開的呢?答案是:ThreadFactory
ThreadFactory介面只有一個方法建立執行緒Thread newThread(Runnable r),當需要建立執行緒時,就呼叫這個方法。自定義執行緒,可以幫助我們做不少事。我們可以追蹤執行緒池在何時建立了多少個執行緒,也可以定義執行緒的組,優先順序,名字等等。也可以任性的將所有執行緒都設定為守護執行緒。
package com.example.thread;
import java.util.concurrent.*;
/**
* Created by mazhenhua on 2017/3/13.
*/
public class RejectedThreadPoolDemo {
public static class MyTask implements Runnable{
@Override
public void run() {
//System.out.println(System.currentTimeMillis() + ": Thread ID:" +
// Thread.currentThread().getId());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 拒絕策略
/* public static void main(String[] args) throws InterruptedException {
MyTask task = new MyTask();
ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(10),
Executors.defaultThreadFactory(),
new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + "is disCard");
}
});
for (int i = 0; i < Integer.MAX_VALUE; i ++){
es.submit(task);
Thread.sleep(10);
}
}*/
public static void main(String[] args) throws InterruptedException{
MyTask task = new MyTask();
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
System.out.println("create : " + t);
return t;
}
});
for (int i = 0; i < 5; i ++){
es.submit(task);
}
Thread.sleep(2000);
}
}
執行結果:
從輸出結果中可以看出,建立了5個守護執行緒,並且當主執行緒main結束後,所有的執行緒都結束了。
6. 擴充套件執行緒池
雖然JDK已經幫我們實現了這個穩定的高效能執行緒池。但有的時候我們需要對這個執行緒池做一些擴充套件,比如,想要監控每個任務的執行開始時間結束時間,或者列印一些關鍵引數等等。
ThreadPoolExecutor提供了beforeExecute(),afterExecute(),terminated()三個介面對執行緒池進行控制,有點類似於spring的AOP
package com.example.thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Created by mazhenhua on 2017/3/14.
*/
public class ExtThreadPool {
public static class MyTask implements Runnable {
public String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("正在執行:Thread ID:" + Thread.currentThread().getId()
+ ", Task Name = " + name);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("正在準備執行:" + ((MyTask) r).name);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("執行完成:" + ((MyTask) r).name);
}
@Override
protected void terminated() {
System.out.println("執行緒池退出");
}
};
for (int i = 0; i < 5; i ++){
MyTask task = new MyTask("TASK-" + i);
es.execute(task);
Thread.sleep(10);
}
es.shutdown();
}
}
輸出結果:
相關推薦
JDK併發包--重入鎖
重入鎖基本使用: 使用java.util.concurrent.locks.ReentrantLock 類來實現,可以替代synchronized關鍵字。如下例,重入鎖有著顯示的操作過程,開發者可以手動指定在哪裡加鎖,在哪裡釋放(退出臨界區時必須釋放,不然其他執行緒沒有機會
JDK併發包--Condition,Semaphore,ReadWriteLock
ps:從北京到杭州可以有不同的路線,這些路線就好比多個執行緒,在路上可能因為各種原因停停走走,這就好比是執行緒的等待和通知。Condition條件: 使用sychronized關鍵字來保證執行緒的同步時,需要wait()和notify()來保證執行緒間的通訊;而使用重入鎖時
JDK併發包-執行緒複用:執行緒池
為了避免系統頻繁地建立和銷燬執行緒,我們可以讓建立的執行緒進行復用。執行緒池中,總有那麼幾個活躍執行緒。當你需要使用執行緒時,可以從池子中隨便拿一個空閒執行緒,當完成工作時,並不急著關閉執行緒,而是將整個執行緒退回到池子,方便其他人使用。 1.1 JDK對執行緒池的支援 JDK提供一
JDK併發包使用
JDK併發包簡單使用 JUC 變數與執行緒安全 併發容器 併發佇列 併發工具類 執行緒池 JUC 在 Java 5.0 提供了 java.util.concurrent(JUC)併發包,提供併發程式設計中很常用的工
Java高併發程式設計 JDK併發包(上)
1. 重入鎖 重入鎖可以完全替代synchronized關鍵字。使用java.util.concurrent.locks.ReentrantLock類實現,下面是一個重入鎖的簡單例子: package cn.net.bysoft.java.concurrency.desi
java執行緒學習(六):JDK併發包之重入鎖-ReentrantLock
本章將學習重入鎖的以下方法: lock(): 獲得鎖,如果鎖已被佔用,則等待; lockInterruptbly(): 獲得鎖,但優先響應中斷; tryLock(): 嘗試獲得鎖,如果成功,返回true,反之返回false,該方法不會等待,執行則立即返回,可用tryL
JDK併發包溫故而知新系列(一)—— 競態條件與記憶體可見性
併發問題 併發問題是指多執行緒讀寫共享記憶體的時候,產生結果不可預期的情況,併發問題的產生的原因可以歸結為兩種,一是因為競態條件,二是因為記憶體可見性、 競態條件 什麼是競態條件 競態條件,官方解釋是如果程式執行順序的改變會影響最終結果,這就是一個競態條件。 這句話有點抽象,描述的有點抽象,我個人對競態條件
JDK併發包溫故知新系列(四)—— CAS原理與JDK8的優化
public final int incrementAndGet() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next))
JDK併發包溫故知新系列(五)—— 顯式鎖與顯式條件
ReentrantReadWriteLock 兩把鎖共享一個等待佇列,兩把鎖的狀態都由一個原子變量表示,特有的獲取鎖和釋放鎖邏輯。 ReentrantReadWriteLock的基本原理: 讀鎖的獲取,只要求寫鎖沒有被執行緒持有就可以獲取,檢查等待佇列,逐個喚醒等待讀鎖執行緒,遇到等待寫鎖執行緒則停止.
JDK併發包溫故知新系列(三)—— 執行緒的中斷
RUNNABLE狀態-執行緒呼叫了start()方法,處於等待系統排程或在執行中 這種情況下只設置中斷標誌位。 WAITING/TIMED_WAITING-等待狀態 當呼叫如下方法時進入等待狀態,包括的方法有: WATING:呼叫了鎖資源的wait方法,或呼叫了join方法。 TIMED_WAITING:w
java.util.concurrent java併發包中部分工具的使用
java.util.concurrent.Semaphore(訊號量) https://www.cnblogs.com/nullzx/p/5270233.html https://my.oschina.net/cloudcoder/blog/362974
集合及concurrent併發包總結
1.集合包 集合包最常用的有Collection和Map兩個介面的實現類,Colleciton用於存放多個單物件,Map用於存放Key-Value形式的鍵值對。 Collection中最常用的又分為兩種型別的介面:List和Set,兩者最明顯的差別為List支
Java多執行緒(二)Java併發工具包concurrent例項簡述
傳統的多執行緒並沒有提供高階特性,例如:訊號量、執行緒池和執行管理器等,而這些特性恰恰有助於建立強大的併發程式。新的Fork/Join框架針對當前的多核系統,也提供了並行程式設計的可能。這塊的內容是java多執行緒資訊量最大的一部分內容,本篇部落格循序漸進的,首
Java7 java.util.concurrent 併發包計劃
原文連結,譯文連結,譯者:DOM,校對:鄭旭東 Java7釋出版本的具體計劃應該開始了,之前經常有人問我們關於JSR166的後續計劃包含哪些內容。下面是目前暫定的內容,歡迎提出你們的見 解和意見。 1.Phasers 一個通用的記憶體屏障實現。 2.LinkedTransferQue
深入淺出Java Concurrent,Java併發包整理
最近整理了一下java.util.concurrrent包下的相關類和功能實現。把相關比較好的部落格推薦一下給大家 先看一下JUC的大體結構 ReentrantLock實現原理深入探究 ConcurrentSkipList實現原理 :SkipList 跳錶 Conc
java併發包學習系列:jdk併發容器
同步容器 同步容器可以簡單地理解為通過synchronized來實現同步的容器,如果有多個執行緒呼叫同步容器的方法,它們將會序列執行。 同步容器將它們的狀態封裝起來,並對每一個公有方法進行同步。主要包括: Vector Stack HashTable C
Java --- 併發庫concurrent包學習總結
下面的例子演示了這個阻塞佇列的基本功能。 import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concur
JAVA的synchronized關鍵字與concurrent併發包的效能比較。
private static AtomicInteger mySessionID= new AtomicInteger(-2); public static int getMySessionID() { return mySessionID
jdk基礎設施回顧:jdk中的concurrent包
public class Test { public static void main(String[] args) { //第一種方式 ExecutorService executor = Executors.newCachedThreadPool();
知識梳理之Java併發包相關(java.util.concurrent/Blocking/Concurrent/ThreadPoolExecutors/CopyOnWrite)
Java併發包提供了哪些併發工具類? 我們通常所說的併發包也就是 java.util.concurrent 及其子包,集中了 Java 併發的各種基礎工具類,具體主要包括幾個方面: 提供了比 synchronized 更加高階的各種同步結構,包括 Coun