LINUX檔案基本操作
高階技術之 JUC 高併發程式設計
內容概覽
- 1 、什麼是JUC
- 2 、Lock介面
- 3 、執行緒間通訊
- 4 、集合的執行緒安全
- 5 、多執行緒鎖
- 6 、Callable介面
- 7 、JUC三大輔助類: CountDownLatch CyclicBarrier Semaphore
- 8 、讀寫鎖: ReentrantReadWriteLock
- 9 、阻塞佇列
- 10 、ThreadPool執行緒池
- 11 、Fork/Join框架
- 12 、CompletableFuture
1 什麼是 JUC
1.1 JUC簡介
在Java中,執行緒部分是一個重點,本篇文章說的JUC也是關於執行緒的。JUC就是java.util.concurrent工具包的簡稱。這是一個處理執行緒的工具包,JDK1.5開始出現的。
1.2 程序與執行緒
程序(Process) 是計算機中的程式關於某資料集合上的一次執行活動,是系統進行資源分配和排程的基本單位,是作業系統結構的基礎。 在當代面向執行緒設計的計算機結構中,程序是執行緒的容器。程式是指令、資料及其組織形式的描述,程序是程式的實體。是計算機中的程式關於某資料集合上的一次執行活動,是系統進行資源分配和排程的基本單位,是作業系統結構的基礎。程式是指令、資料及其組織形式的描述,程序是程式的實體。
執行緒(thread) 是作業系統能夠進行運算排程的最小單位。它被包含在程序之中,是程序中的實際運作單位。一條執行緒指的是程序中一個單一順序的控制流,一個程序中可以併發多個執行緒,每條執行緒並行執行不同的任務。
總結來說:
程序:指在系統中正在執行的一個應用程式;程式一旦執行就是程序;
程序——資源分配的最小單位。
執行緒:系統分配處理器時間資源的基本單元,或者說程序之內獨立執行的一個單元執行流。
執行緒——程式執行的最小單位。
1.3 執行緒的狀態
1.3.1 執行緒狀態列舉類
Thread.State
public enum State { /** * Thread state for a thread which has not yet started. */ NEW,(新建) /** * Thread state for a runnable thread. A thread in the runnable * state is executing in the Java virtual machine but it may * be waiting for other resources from the operating system * such as processor. */ RUNNABLE,(準備就緒) /** * Thread state for a thread blocked waiting for a monitor lock. * A thread in the blocked state is waiting for a monitor lock * to enter a synchronized block/method or * reenter a synchronized block/method after calling * {@link Object#wait() Object.wait}. */ BLOCKED,(阻塞) /** * Thread state for a waiting thread. * A thread is in the waiting state due to calling one of the * following methods: * <ul> * <li>{@link Object#wait() Object.wait} with no timeout</li> * <li>{@link #join() Thread.join} with no timeout</li> * <li>{@link LockSupport#park() LockSupport.park}</li> * </ul> * * <p>A thread in the waiting state is waiting for another thread to * perform a particular action. * * For example, a thread that has called {@code Object.wait()} * on an object is waiting for another thread to call * {@code Object.notify()} or {@code Object.notifyAll()} on * that object. A thread that has called {@code Thread.join()} * is waiting for a specified thread to terminate. */ WAITING,(不見不散) /** * Thread state for a waiting thread with a specified waiting time. * A thread is in the timed waiting state due to calling one of * the following methods with a specified positive waiting time: * <ul> * <li>{@link #sleep Thread.sleep}</li> * <li>{@link Object#wait(long) Object.wait} with timeout</li> * <li>{@link #join(long) Thread.join} with timeout</li> * <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li> * <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li> * </ul> */ TIMED_WAITING,(過時不候) /** * Thread state for a terminated thread. * The thread has completed execution. */ TERMINATED;(終結) }
1.3.2 wait/sleep的區別
( 1 )sleep是Thread的靜態方法,wait是Object的方法,任何物件例項都能呼叫。
( 2 )sleep不會釋放鎖,它也不需要佔用鎖。wait會釋放鎖,但呼叫它的前提是當前執行緒佔有鎖(即程式碼要在synchronized中)。
( 3 )它們都可以被interrupted方法中斷。
1.4 併發與並行
1.4.1 序列模式
序列表示所有任務都一一按先後順序進行。序列意味著必須先裝完一車柴才能運送這車柴,只有運送到了,才能卸下這車柴,並且只有完成了這整個三個步驟,才能進行下一個步驟。
序列是一次只能取得一個任務,並執行這個任務 。
1.4.2 並行模式
並行意味著可以同時取得多個任務,並同時去執行所取得的這些任務。並行模式相當於將長長的一條佇列,劃分成了多條短佇列,所以並行縮短了任務佇列的長度。並行的效率從程式碼層次上強依賴於多程序/多執行緒程式碼,從硬體角度上則依賴於多核CPU。
1.4.3 併發
併發(concurrent)指的是多個程式可以同時執行的現象,更細化的是多程序可以同時執行或者多指令可以同時執行 。但這不是重點,在描述併發的時候也不會去扣這種字眼是否精確,併發的重點在於它是一種現象, 併發描述的是多程序同時執行的現象。但實際上,對於單核心CPU來說,同一時刻只能執行一個執行緒。所以,這裡的"同時執行"表示的不是真的同一時刻有多個執行緒執行的現象,這是並行的概念,而是提供一種功能讓使用者看來多個程式同時執行起來了,但實際上這些程式中的程序不是一直霸佔CPU的,而是執行一會停一會。
要解決大併發問題,通常是將大任務分解成多個小任務 , 由於作業系統對程序的排程是隨機的,所以切分成多個小任務後,可能會從任一小任務處執行。
這可能會出現一些現象:
-
可能出現一個小任務執行了多次,還沒開始下個任務的情況。這時一般會採用佇列或類似的資料結構來存放各個小任務的成果
-
可能出現還沒準備好第一步就執行第二步的可能。這時,一般採用多路複用或非同步的方式,比如只有準備好產生了事件通知才執行某個任務。
-
可以多程序/多執行緒的方式並行執行這些小任務。也可以單程序/單執行緒執行這些小任務,這時很可能要配合多路複用才能達到較高的效率
1.4.4 小結(重點)
併發: 同一時刻多個執行緒在訪問同一個資源,多個執行緒對一個點
例子:春運搶票 電商秒殺...
並行: 多項工作一起執行,之後再彙總
例子:泡方便麵,電水壺燒水,一邊撕調料倒入桶中
1.5管程
管程(monitor)是保證了同一時刻只有一個程序在管程內活動,即管程內定義的操作在同一時刻只被一個程序呼叫(由編譯器實現).但是這樣並不能保證程序以設計的順序執行
JVM中同步是基於進入和退出管程(monitor)物件實現的,每個物件都會有一個管程(monitor)物件,管程(monitor)會隨著java物件一同建立和銷燬
執行執行緒首先要持有管程物件,然後才能執行方法,當方法完成之後會釋放管程,方法在執行時候會持有管程,其他執行緒無法再獲取同一個管程
1.6使用者執行緒和守護執行緒
使用者執行緒: 平時用到的普通執行緒,自定義執行緒
守護執行緒: 執行在後臺,是一種特殊的執行緒,比如垃圾回收
當主執行緒結束後,使用者執行緒還在執行,JVM存活
如果沒有使用者執行緒,都是守護執行緒,JVM結束
2 Lock 介面
2.1 Synchronized
2.1.1 Synchronized關鍵字回顧
synchronized是Java中的關鍵字,是一種同步鎖。它修飾的物件有以下幾種:
-
修飾一個程式碼塊,被修飾的程式碼塊稱為同步語句塊,其作用的範圍是大括號{}括起來的程式碼,作用的物件是呼叫這個程式碼塊的物件;
-
修飾一個方法,被修飾的方法稱為同步方法,其作用的範圍是整個方法,作用的物件是呼叫這個方法的物件;
- 雖然可以使用synchronized來定義方法,但synchronized並不屬於方法定義的一部分,因此,synchronized關鍵字不能被繼承。如果在父類中的某個方法使用了synchronized關鍵字,而在子類中覆蓋了這個方法,在子類中的這個方法預設情況下並不是同步的,而必須顯式地在子類的這個方法中加上synchronized關鍵字才可以。當然,還可以在子類方法中呼叫父類中相應的方法,這樣雖然子類中的方法不是同步的,但子類呼叫了父類的同步方法,因此,子類的方法也就相當於同步了。
-
修改一個靜態的方法,其作用的範圍是整個靜態方法,作用的物件是這個類的所有物件;
-
修改一個類,其作用的範圍是synchronized後面括號括起來的部分,作用主的物件是這個類的所有物件。
2.1.2 售票案例
public class SaleTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}
},"aa").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}
},"bb").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}
},"cc").start();
}
}
class Ticket{
//票數
private int number = 30;
//操作方法:賣票
public synchronized void sale() {
//判讀:是否有票
if (number > 0) {
System.out.println(Thread.currentThread().getName() + ":賣出:" + (number--) + "剩下:" + number);
}
}
}
如果一個程式碼塊被synchronized修飾了,當一個執行緒獲取了對應的鎖,並執行該程式碼塊時,其他執行緒便只能一直等待,等待獲取鎖的執行緒釋放鎖,而這裡獲取鎖的執行緒釋放鎖只會有兩種情況:
1 )獲取鎖的執行緒執行完了該程式碼塊,然後執行緒釋放對鎖的佔有;
2 )執行緒執行發生異常,此時JVM會讓執行緒自動釋放鎖。
那麼如果這個獲取鎖的執行緒由於要等待IO或者其他原因(比如呼叫sleep方法)被阻塞了,但是又沒有釋放鎖,其他執行緒便只能乾巴巴地等待,試想一下,這多麼影響程式執行效率。
因此就需要有一種機制可以不讓等待的執行緒一直無期限地等待下去(比如只等待一定的時間或者能夠響應中斷),通過Lock就可以辦到。
2.2 什麼是Lock
Lock鎖實現提供了比使用同步方法和語句可以獲得的更廣泛的鎖操作。它們允許更靈活的結構,可能具有非常不同的屬性,並且可能支援多個關聯的條件物件。Lock提供了比synchronized更多的功能。Lock與的Synchronized區別
-
Lock不是Java語言內建的,synchronized是Java語言的關鍵字,因此是內建特性。Lock是一個類,通過這個類可以實現同步訪問;
-
Lock和synchronized有一點非常大的不同,採用synchronized不需要使用者去手動釋放鎖,當 synchronized 方法或者 synchronized 程式碼塊執行完之後,系統會自動讓執行緒釋放對鎖的佔用;而Lock則必須要使用者去手動釋放鎖,如果沒有主動釋放鎖,就有可能導致出現死鎖現象。
死鎖:
2.2.1 Lock介面
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
下面來逐個講述Lock介面中每個方法的使用
2.2.2 lock
lock()方法是平常使用得最多的一個方法,就是用來獲取鎖。如果鎖已被其他執行緒獲取,則進行等待。
採用Lock,必須主動去釋放鎖,並且在發生異常時,不會自動釋放鎖。因此一般來說,使用Lock必須在try{}catch{}塊中進行,並且將釋放鎖的操作放在finally塊中進行,以保證鎖一定被被釋放,防止死鎖的發生。通常使用Lock來進行同步的話,是以下面這種形式去使用的:
Lock lock = ...;
lock.lock();
try{
//處理任務
}catch(Exception ex){
}finally{
lock.unlock(); //釋放鎖
}
賣票示例
public class LockSaleTicket {
public static void main(String[] args) {
LockTicket ticket = new LockTicket();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "aa").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "bb").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "cc").start();
}
}
class LockTicket {
//票數
private int number = 30;
//建立可重入鎖
private final ReentrantLock lock = new ReentrantLock();
//操作方法:賣票
public void sale() {
//上鎖
lock.lock();
try {
//判讀:是否有票
if (number > 0) {
System.out.println(Thread.currentThread().getName() + ":賣出:" + (number--) + "剩下:" + number);
}
} finally {
//解鎖
lock.unlock();
}
}
}
2.2.3 newCondition
關鍵字synchronized與wait()/notify()這兩個方法一起使用可以實現等待/通知模式,
Lock鎖的newContition()方法返回Condition物件,Condition類也可以實現等待/通知模式。
用notify()通知時,JVM會隨機喚醒某個等待的執行緒, 使用Condition類可以進行選擇性通知, Condition比較常用的兩個方法:
-
await()會使當前執行緒等待,同時會釋放鎖,當其他執行緒呼叫signal()時,執行緒會重新獲得鎖並繼續執行。
-
signal()用於喚醒一個等待的執行緒。
注意:在呼叫Condition的await()/signal()方法前,也需要執行緒持有相關的Lock鎖,呼叫await()後執行緒會釋放這個鎖,在singal()呼叫後會從當前Condition物件的等待佇列中,喚醒 一個執行緒,喚醒的執行緒嘗試獲得鎖, 一旦獲得鎖成功就繼續執行。
2.3 ReentrantLock
ReentrantLock,意思是“可重入鎖”,關於可重入鎖的概念將在後面講述。
ReentrantLock是唯一實現了Lock介面的類,並且ReentrantLock提供了更多的方法。下面通過一些例項看具體看一下如何使用。
public class Test {
private ArrayList<Integer> arrayList = new ArrayList<Integer>();
public static void main(String[] args) {
final Test test = new Test();
new Thread() {
public void run() {
test.insert(Thread.currentThread());
};
}.start();
new Thread() {
public void run() {
test.insert(Thread.currentThread());
};
}.start();
}
public void insert(Thread thread) {
Lock lock = new ReentrantLock(); //注意這個地方
lock.lock();
try {
System.out.println(thread.getName() + "得到了鎖");
for (int i = 0; i < 5; i++) {
arrayList.add(i);
}
} catch (Exception e) {
// TODO: handle exception
} finally {
System.out.println(thread.getName() + "釋放了鎖");
lock.unlock();
}
}
}
2.4 ReadWriteLock
ReadWriteLock也是一個介面,在它裡面只定義了兩個方法:
public interface ReadWriteLock {
/**
- Returns the lock used for reading.
-
- @return the lock used for reading.
*/
Lock readLock();
##### /**
- Returns the lock used for writing.
-
- @return the lock used for writing.
*/
Lock writeLock();
}
一個用來獲取讀鎖,一個用來獲取寫鎖。也就是說將檔案的讀寫操作分開,分成 2 個鎖來分配給執行緒,從而使得多個執行緒可以同時進行讀操作。下面的ReentrantReadWriteLock 實現了ReadWriteLock介面。ReentrantReadWriteLock裡面提供了很多豐富的方法,不過最主要的有兩個方法:readLock()和writeLock()用來獲取讀鎖和寫鎖。
下面通過幾個例子來看一下ReentrantReadWriteLock具體用法。
假如有多個執行緒要同時進行讀操作的話,先看一下 synchronized 達到的效果:
public class Test {
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
public static void main(String[] args) {
final Test test = new Test();
new Thread() {
public void run() {
test.get(Thread.currentThread());
};
}.start();
new Thread() {
public void run() {
test.get(Thread.currentThread());
};
}.start();
}
public synchronized void get(Thread thread) {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName() + "正在進行讀操作");
}
System.out.println(thread.getName() + "讀操作完畢");
}
}
而改成用讀寫鎖的話:
public class Test {
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
public static void main(String[] args) {
final Test test = new Test();
new Thread() {
public void run() {
test.get(Thread.currentThread());
};
}.start();
new Thread() {
public void run() {
test.get(Thread.currentThread());
};
}.start();
}
public void get(Thread thread) {
rwl.readLock().lock();
try {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName() + "正在進行讀操作");
}
System.out.println(thread.getName() + "讀操作完畢");
} finally {
rwl.readLock().unlock();
}
}
}
說明thread1和thread2在同時進行讀操作。這樣就大大提升了讀操作的效率。
== 注意: ==
-
如果有一個執行緒已經佔用了讀鎖,則此時其他執行緒如果要申請寫鎖,則申請寫鎖的執行緒會一直等待釋放讀鎖。
-
如果有一個執行緒已經佔用了寫鎖,則此時其他執行緒如果申請寫鎖或者讀鎖,則申請的執行緒會一直等待釋放寫鎖。
2.5 小結(重點)
Lock和synchronized有以下幾點不同:
-
Lock是一個介面,而synchronized是Java中的關鍵字,synchronized是內建的語言實現;
-
synchronized在發生異常時,會自動釋放執行緒佔有的鎖,因此不會導致死鎖現象發生;而Lock在發生異常時,如果沒有主動通過unLock()去釋放鎖,則很可能造成死鎖現象,因此使用Lock時需要在finally塊中釋放鎖;
-
Lock可以讓等待鎖的執行緒響應中斷,而synchronized卻不行,使用synchronized時,等待的執行緒會一直等待下去,不能夠響應中斷;
-
通過Lock可以知道有沒有成功獲取鎖,而synchronized卻無法辦到。
-
Lock可以提高多個執行緒進行讀操作的效率。在效能上來說,如果競爭資源不激烈,兩者的效能是差不多的,而當競爭資源非常激烈時(即有大量執行緒同時競爭),此時Lock的效能要遠遠優於synchronized。
3 執行緒間通訊
執行緒間通訊的模型有兩種:共享記憶體和訊息傳遞,以下方式都是基本這兩種模型來實現的。我們來基本一道面試常見的題目來分析
場景---兩個執行緒,一個執行緒對當前數值加 1 ,另一個執行緒對當前數值減1,要求用執行緒間通訊
3. 1 synchronized方案
public class Test {
/**
* 交替加減
*
* @param args
*/
public static void main(String[] args) {
DemoClass demoClass = new DemoClass();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
demoClass.increment();
}
}, "執行緒A").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
demoClass.decrement();
}
}, "執行緒B").start();
}
}
class DemoClass {
//加減物件
private int number = 0;
/**
* 加 1
*/
public synchronized void increment() {
try {
while (number != 0) {
this.wait();
}
number++;
System.out.println("--------" + Thread.currentThread().getName() + "加一成功----------,值為:" + number);
notifyAll();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 減一
*/
public synchronized void decrement() {
try {
while (number == 0) {
this.wait();
}
number--;
System.out.println("--------" + Thread.currentThread().getName() + "減一成功----------,值為:" + number);
notifyAll();
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. 2 Lock方案
public class Test {
/**
* 交替加減
*
* @param args
*/
public static void main(String[] args) {
DemoClass demoClass = new DemoClass();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
demoClass.increment();
}
}, "執行緒A").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
demoClass.decrement();
}
}, "執行緒B").start();
}
}
class DemoClass {
//加減物件
private int number = 0;
//宣告鎖
private Lock lock = new ReentrantLock();
//宣告鑰匙
private Condition condition = lock.newCondition();
/**
* 加 1
*/
public void increment() {
try {
lock.lock();
while (number != 0) {
condition.await();
}
number++;
System.out.println("--------" + Thread.currentThread().getName() + "加一成 功----------,值為:" + number);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 減一
*/
public void decrement() {
try {
lock.lock();
while (number == 0) {
condition.await();
}
number--;
System.out.println("--------" + Thread.currentThread().getName() + "減一成 功----------,值為:" + number);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
如果wait寫在if中,多個執行緒搶佔出現wait虛假喚醒(在哪裡睡就在哪裡醒),所以if判斷完了後睡著在搶佔到的時候醒了就直接往下執行了
解決:把wait寫在while迴圈中
3 .3 執行緒間定製化通訊
3.3.1 案例介紹
== 問題: A執行緒列印 5 次A,B執行緒列印 10 次B,C執行緒列印 15 次C,按照此順序迴圈 10 輪 ==
3.3.2 實現流程
public class TestVolatile {
public static void main(String[] args) {
DemoClass demoClass = new DemoClass();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
demoClass.printA(i);
}
}, "A執行緒").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
demoClass.printB(i);
}
}, "B執行緒").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
demoClass.printC(i);
}
}, "C執行緒").start();
}
}
class DemoClass {
//通訊物件: 0--列印A 1---列印B 2----列印C
private int number = 0;
//宣告鎖
private Lock lock = new ReentrantLock();
//宣告鑰匙A
private Condition conditionA = lock.newCondition();
//宣告鑰匙B
private Condition conditionB = lock.newCondition();
//宣告鑰匙C
private Condition conditionC = lock.newCondition();
/**
* A列印 5 次
*/
public void printA(int j) {
try {
lock.lock();
while (number != 0) {
conditionA.await();
}
System.out.println(Thread.currentThread().getName() + "輸出A,第" + j + "輪開始");
//輸出 5 次A
for (int i = 0; i < 5; i++) {
System.out.println("A");
}
//開始列印B
number = 1;
//喚醒B
conditionB.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* B列印 10 次
*/
public void printB(int j) {
try {
lock.lock();
while (number != 1) {
conditionB.await();
}
System.out.println(Thread.currentThread().getName() + "輸出B,第" + j + "輪開始");
//輸出 10 次B
for (int i = 0; i < 10; i++) {
System.out.println("B");
}
//開始列印C
number = 2;
//喚醒C
conditionC.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* C列印 15 次
*/
public void printC(int j) {
try {
lock.lock();
while (number != 2) {
conditionC.await();
}
System.out.println(Thread.currentThread().getName() + "輸出C,第" + j + "輪開始");
//輸出 15 次C
for (int i = 0; i < 15; i++) {
System.out.println("C");
}
System.out.println("-----------------------------------------");
//開始列印A
number = 0;
//喚醒A
conditionA.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
synchronized實現同步的基礎:Java中的每一個物件都可以作為鎖
具體表現為以下3種形式
對於普通同步方法,鎖是當前例項的物件
對於靜態方法,鎖是當前類的Class物件
對於同步方法快,鎖是Synchronized括號裡配置的物件
4 集合的執行緒安全
4 .1 集合操作Demo
NotSafeDemo
/**
* 集合執行緒安全案例
*/
public class NotSafeDemo {
/**
- 多個執行緒同時對集合進行修改
- @param args
*/
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString());
System.out.println(list);
}, "執行緒" + i).start();
}
}
}
異常內容
java.util.ConcurrentModificationException
問題: 為什麼會出現併發修改異常?
檢視ArrayList的add方法原始碼
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
modCount++;
add(e, elementData, size);
return true;
}
== 那麼我們如何去解決List型別的執行緒安全問題? ==
使用以下三種方案,Vector,Collections,CopyOnWriteArrayList
4 .2 Vector
Vector 是 向量佇列 ,它是JDK1.0版本新增的類。繼承於AbstractList,實現了List, RandomAccess, Cloneable這些介面。 Vector 繼承了AbstractList,實現了List;所以, 它是一個佇列,支援相關的新增、刪除、修改、遍歷等功能 。 Vector 實現了RandmoAccess介面,即 提供了隨機訪問功能 。
RandmoAccess是java中用來被List實現,為List提供快速訪問功能的。在Vector中,我們即可以通過元素的序號快速獲取元素物件;這就是快速隨機訪問。 Vector 實現了Cloneable介面,即實現clone()函式。它能被克隆。
和ArrayList不同,Vector中的操作是執行緒安全的。
NotSafeDemo程式碼修改
/**
- 集合執行緒安全案例
*/
public class NotSafeDemo {
/**
- 多個執行緒同時對集合進行修改
*/
public static void main(String[] args) {
List list = new Vector();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString());
System.out.println(list);
}, "執行緒" + i).start();
}
}
}
現在沒有執行出現併發異常,為什麼?
檢視Vector的add方法
/**
* Appends the specified element to the end of this Vector.
*
* @param e element to be appended to this Vector
* @return {@code true} (as specified by {@link Collection#add})
* @since 1.2
*/
public synchronized boolean add(E e) {
modCount++;
add(e, elementData, elementCount);
return true;
}
add方法被synchronized同步修飾,執行緒安全!因此沒有併發異常
4 .3 Collections
Collections提供了方法synchronizedList保證list是同步執行緒安全的
NotSafeDemo程式碼修改
public class NotSafeDemo {
/**
- 多個執行緒同時對集合進行修改
- @param args
*/
public static void main(String[] args) {
List list = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < 100; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString());
System.out.println(list);
}, "執行緒" + i).start();
}
}
}
沒有併發修改異常
檢視方法原始碼
/**
* Returns a synchronized (thread-safe) list backed by the specified
* list. In order to guarantee serial access, it is critical that
* <strong>all</strong> access to the backing list is accomplished
* through the returned list.<p>
*
* It is imperative that the user manually synchronize on the returned
* list when traversing it via {@link Iterator}, {@link Spliterator}
* or {@link Stream}:
* <pre>
* List list = Collections.synchronizedList(new ArrayList());
* ...
* synchronized (list) {
* Iterator i = list.iterator(); // Must be in synchronized block
* while (i.hasNext())
* foo(i.next());
* }
* </pre>
* Failure to follow this advice may result in non-deterministic behavior.
*
* <p>The returned list will be serializable if the specified list is
* serializable.
*
* @param <T> the class of the objects in the list
* @param list the list to be "wrapped" in a synchronized list.
* @return a synchronized view of the specified list.
*/
public static <T> List<T> synchronizedList(List<T> list) {
return (list instanceof RandomAccess ?
new SynchronizedRandomAccessList<>(list) :
new SynchronizedList<>(list));
}
4 .4 CopyOnWriteArrayList(重點)
首先我們對CopyOnWriteArrayList進行學習,其特點如下:
它相當於執行緒安全的ArrayList。和ArrayList一樣,它是個可變陣列;但是和ArrayList不同的時,它具有以下特性:
-
它最適合於具有以下特徵的應用程式:List 大小通常保持很小,只讀操作遠多於可變操作,需要在遍歷期間防止執行緒間的衝突。
-
它是執行緒安全的。
-
因為通常需要複製整個基礎陣列,所以可變操作(add()、set() 和 remove()等等)的開銷很大。
-
迭代器支援 hasNext(), next()等不可變操作,但不支援可變 remove()等操作。
-
使用迭代器進行遍歷的速度很快,並且不會與其他執行緒發生衝突。在構造迭代器時,迭代器依賴於不變的陣列快照。
-
獨佔鎖效率低:採用讀寫分離思想解決、
-
寫執行緒獲取到鎖,其他寫執行緒阻塞
-
複製思想:
當我們往一個容器新增元素的時候,不直接往當前容器新增,而是先將當前容器進行 Copy,複製出一個新的容器,然後新的容器裡新增元素,新增完元素之後,再將原容器的引用指向新的容器。
這時候會丟擲來一個新的問題,也就是資料不一致的問題。如果寫執行緒還沒來得及寫會記憶體,其他的執行緒就會讀到了髒資料。
== 這就是CopyOnWriteArrayList 的思想和原理。就是拷貝一份。 ==
-
NotSafeDemo程式碼修改
public class NotSafeDemo {
/**
* 多個執行緒同時對集合進行修改
*/
public static void main(String[] args) {
List list = new CopyOnWriteArrayList();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString());
System.out.println(list);
}, "執行緒" + i).start();
}
}
}
沒有執行緒安全問題
原因分析 ( 重點 ):== 動態陣列與執行緒安全 ==
下面從“動態陣列”和“執行緒安全”兩個方面進一步對CopyOnWriteArrayList的原理進行說明。
-
“動態陣列”機制
- 它內部有個“volatile陣列”(array)來保持資料。在“新增/修改/刪除”資料時,都會新建一個數組,並將更新後的資料拷貝到新建的陣列中,最後再將該陣列賦值給“volatile陣列”, 這就是它叫做CopyOnWriteArrayList的原因
- 由於它在“新增/修改/刪除”資料時,都會新建陣列,所以涉及到修改資料的操作,CopyOnWriteArrayList效率很低;但是單單只是進行遍歷查詢的話,效率比較高。
-
“執行緒安全”機制
- 通過volatile和互斥鎖來實現的。
- 通過“volatile陣列”來儲存資料的。一個執行緒讀取volatile陣列時,總能看到其它執行緒對該volatile變數最後的寫入;就這樣,通過volatile提供了“讀取到的資料總是最新的”這個機制的保證。
- 通過互斥鎖來保護資料。在“新增/修改/刪除”資料時,會先“獲取互斥鎖”,再修改完畢之後,先將資料更新到“volatile陣列”中,然後再“釋放互斥鎖”,就達到了保護資料的目的。
4 .5 小結(重點)
1.執行緒安全與執行緒不安全集合
集合型別中存線上程安全與執行緒不安全的兩種,常見例如:
ArrayList ----- Vector
HashMap -----HashTable
但是以上都是通過synchronized關鍵字實現,效率較低
2.Collections構建的執行緒安全集合
3.java.util.concurrent併發包下
CopyOnWriteArrayList,CopyOnWriteArraySet,ConcurrentHashMap型別,通過動態陣列與執行緒安全各個方面保證執行緒安全
5 多執行緒鎖
5 .1 鎖的八個問題演示
class Phone {
public static synchronized void sendSMS() throws Exception {
// 停留 4 秒
//TimeUnit.SECONDS.sleep(4);
System.out.println("------sendSMS");
}
public synchronized void sendEmail() throws Exception {
System.out.println("------sendEmail");
}
public void getHello() {
System.out.println("------getHello");
}
}
1 標準訪問,先列印簡訊還是郵件
------sendSMS
------sendEmail
2 停 4 秒在簡訊方法內,先列印簡訊還是郵件
------sendSMS
------sendEmail
3 新增普通的hello方法,是先打簡訊還是hello
------getHello
------sendSMS
4 現在有兩部手機,先列印簡訊還是郵件
------sendEmail
------sendSMS
5 兩個靜態同步方法, 1 部手機,先列印簡訊還是郵件
------sendSMS
------sendEmail
6 兩個靜態同步方法, 2 部手機,先列印簡訊還是郵件
------sendSMS
------sendEmail
7 1個靜態同步方法,1個普通同步方法, 1 部手機,先列印簡訊還是郵件
------sendEmail
------sendSMS
8 1個靜態同步方法,1個普通同步方法, 2 部手機,先列印簡訊還是郵件
------sendEmail
------sendSMS
結論:
一個物件裡面如果有多個synchronized方法,某一個時刻內,只要一個執行緒去呼叫其中的一個synchronized方法了,
其它的執行緒都只能等待,換句話說,某一個時刻內,只能有唯一一個執行緒去訪問這些
synchronized方法
鎖的是當前物件this,被鎖定後,其它的執行緒都不能進入到當前物件的其它的synchronized方法
加個普通方法後發現和同步鎖無關
換成兩個物件後,不是同一把鎖了,情況立刻變化。
synchronized實現同步的基礎:Java中的每一個物件都可以作為鎖。
具體表現為以下 3 種形式。
對於普通同步方法,鎖是當前例項物件。
對於靜態同步方法,鎖是當前類的Class物件。
對於同步方法塊,鎖是Synchonized括號裡配置的物件
當一個執行緒試圖訪問同步程式碼塊時,它首先必須得到鎖,退出或丟擲異常時必須釋放鎖。也就是說如果一個例項物件的非靜態同步方法獲取鎖後,該例項物件的其他非靜態同步方法必須等待獲取鎖的方法釋放鎖後才能獲取鎖,
可是別的例項物件的非靜態同步方法因為跟該例項物件的非靜態同步方法用的是不同的鎖,所以毋須等待該例項物件已獲取鎖的非靜態同步方法釋放鎖就可以獲取他們自己的鎖。所有的靜態同步方法用的也是同一把鎖——類物件本身,這兩把鎖是兩個不同的物件,所以靜態同步方法與非靜態同步方法之間是不會有競態條件的。
但是一旦一個靜態同步方法獲取鎖後,其他的靜態同步方法都必須等待該方法釋放鎖後才能獲取鎖,而不管是同一個例項物件的靜態同步方法之間,還是不同的例項物件的靜態同步方法之間,只要它們同一個類的例項物件!
6 Callable&Future 介面
6 .1 Callable介面
目前我們學習了有兩種建立執行緒的方法-一種是通過建立Thread類,另一種是通過使用Runnable建立執行緒。但是,Runnable缺少的一項功能是,當執行緒終止時(即run()完成時),我們無法使執行緒返回結果。為了支援此功能,
Java中提供了Callable介面。
== 現在我們學習的是建立執行緒的第三種方案---Callable介面 ==
Callable介面的特點如下(重點)
-
為了實現Runnable,需要實現不返回任何內容的run()方法,而對於Callable,需要實現在完成時返回結果的call()方法。
-
call()方法可以引發異常,而run()則不能。
-
為實現Callable而必須重寫call方法
-
不能直接替換runnable,因為Thread類的構造方法根本沒有Callable
public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//runnable
new Thread(new MyThread(), "AA").start();
//callable
FutureTask<Integer> integerFutureTask = new FutureTask<>(new MyThread2());
new Thread(integerFutureTask, "BB").start();
System.out.println(integerFutureTask.get());
}
}
class MyThread implements Runnable {
@Override
public void run() {
System.out.println("Runnable");
}
}
// 新類MyThread2實現callable介面
class MyThread2 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return 200;
}
}
6 .2 Future介面
當call()方法完成時,結果必須儲存在主執行緒已知的物件中,以便主執行緒可以知道該執行緒返回的結果。為此,可以使用Future物件。
將Future視為儲存結果的物件–它可能暫時不儲存結果,但將來會儲存(一旦Callable返回)。Future基本上是主執行緒可以跟蹤進度以及其他執行緒的結果的一種方式。要實現此介面,必須重寫 5 種方法,這裡列出了重要的方法,如下:
-
public boolean cancel(boolean mayInterrupt):
用於停止任務。
如果尚未啟動,它將停止任務。如果已啟動,則僅在mayInterrupt為true時才會中斷任務。
-
public Object get()丟擲InterruptedException,ExecutionException:
用於獲取任務的結果。
如果任務完成,它將立即返回結果,否則將等待任務完成,然後返回結果。
-
public boolean isDone():
如果任務完成,則返回true,否則返回false
可以看到Callable和Future做兩件事-Callable與Runnable類似,因為它封裝了要在另一個執行緒上執行的任務,而Future用於儲存從另一個執行緒獲得的結果。實際上,future也可以與Runnable一起使用。要建立執行緒,需要Runnable。為了獲得結果,需要future。
6. 3 FutureTask
Java庫具有具體的FutureTask型別,該型別實現Runnable和Future,並方便地將兩種功能組合在一起。 可以通過為其建構函式提供Callable來建立FutureTask。然後,將FutureTask物件提供給Thread的建構函式以建立Thread物件。因此,間接地使用Callable建立執行緒。
核心原理:(重點)
在主執行緒中需要執行比較耗時的操作時,但又不想阻塞主執行緒時,可以把這些作業交給Future物件在後臺完成
-
當主執行緒將來需要時,就可以通過Future物件獲得後臺作業的計算結果或者執行狀態
-
一般FutureTask多用於耗時的計算,主執行緒可以在完成自己的任務後,再去獲取結果。
-
僅在計算完成時才能檢索結果;如果計算尚未完成,則阻塞 get 方法
-
一旦計算完成,就不能再重新開始或取消計算
-
get方法而獲取結果只有在計算完成時獲取,否則會一直阻塞直到任務轉入完成狀態,然後會返回結果或者丟擲異常
-
get只計算一次,因此get方法放到最後
6. 4 使用Callable和Future
如上述示例程式碼
6. 5 小結(重點)
-
在主執行緒中需要執行比較耗時的操作時,但又不想阻塞主執行緒時,可以把這些作業交給Future物件在後臺完成, 當主執行緒將來需要時,就可以通過Future物件獲得後臺作業的計算結果或者執行狀態
-
一般FutureTask多用於耗時的計算,主執行緒可以在完成自己的任務後,再去獲取結果
-
僅在計算完成時才能檢索結果;如果計算尚未完成,則阻塞 get 方法。一旦計算完成,就不能再重新開始或取消計算。get方法而獲取結果只有在計算完成時獲取,否則會一直阻塞直到任務轉入完成狀態,然後會返回結果或者丟擲異常。
-
只計算一次
7 JUC 三大輔助類
JUC中提供了三種常用的輔助類,通過這些輔助類可以很好的解決執行緒數量過多時Lock鎖的頻繁操作。這三種輔助類為:
- CountDownLatch: 減少計數
- CyclicBarrier: 迴圈柵欄
- Semaphore: 訊號燈
下面我們分別進行詳細的介紹和學習
7 .1 減少計數CountDownLatch
CountDownLatch類可以設定一個計數器,然後通過countDown方法來進行減 1 的操作,使用await方法等待計數器不大於 0 ,然後繼續執行await方法之後的語句。
-
CountDownLatch主要有兩個方法,當一個或多個執行緒呼叫await方法時,這些執行緒會阻塞
-
其它執行緒呼叫countDown方法會將計數器減1(呼叫countDown方法的執行緒不會阻塞)
-
當計數器的值變為 0 時,因await方法阻塞的執行緒會被喚醒,繼續執行
場景: 6個同學陸續離開教室後值班同學才可以關門。
CountDownLatchDemo
public class CountDownLatchDemo {
/**
* 6個同學陸續離開教室後值班同學才可以關門
*/
public static void main(String[] args) throws Exception {
//定義一個數值為 6 的計數器
CountDownLatch countDownLatch = new CountDownLatch(6);
//建立 6 個同學
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
if (Thread.currentThread().getName().equals("同學6")) {
Thread.sleep(2000);
}
System.out.println(Thread.currentThread().getName() + "離開了");
//計數器減一,不會阻塞
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}, "同學" + i).start();
}
//主執行緒await休息
System.out.println("主執行緒睡覺");
countDownLatch.await();
//全部離開後自動喚醒主執行緒
System.out.println("全部離開了,現在的計數器為" + countDownLatch.getCount());
}
}
7 .2 迴圈柵欄CyclicBarrier
CyclicBarrier看英文單詞可以看出大概就是迴圈阻塞的意思,在使用中CyclicBarrier0的構造方法第一個引數是目標障礙數,每次執行CyclicBarrier一次障礙數會加一,如果達到了目標障礙數,才會執行cyclicBarrier.await()之後的語句。可以將CyclicBarrier理解為加 1 操作
場景: 集齊 7 顆龍珠就可以召喚神龍
CyclicBarrierDemo
public class CyclicBarrierDemo {
//定義神龍召喚需要的龍珠總數
private final static int NUMBER = 7;
/**
* 集齊 7 顆龍珠就可以召喚神龍
*/
public static void main(String[] args) {
//定義迴圈柵欄
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> {
System.out.println("集齊" + NUMBER + "顆龍珠,現在召喚神龍!!!!!!!!!");
});
//定義 7 個執行緒分別去收集龍珠
for (int i = 1; i <= 7; i++)
new Thread(() -> {
try {
if (Thread.currentThread().getName().equals("龍珠3號")) {
System.out.println("龍珠 3 號搶奪戰開始,孫悟空開啟超級賽亞人模式!");
Thread.sleep(5000);
System.out.println("龍珠 3 號搶奪戰結束,孫悟空打贏了,拿到了龍珠 3 號!");
} else {
System.out.println(Thread.currentThread().getName() + "收集到 了!!!!");
}
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}, "龍珠" + i + "號").start();
}
}
7 .3 訊號燈Semaphore
Semaphore的構造方法中傳入的第一個引數是最大訊號量(可以看成最大執行緒池),每個訊號量初始化為一個最多隻能分發一個許可證。使用acquire方法獲得許可證,release方法釋放許可
場景: 搶車位, 6部汽車 3 個停車位
SemaphoreDemo
public class SemaphoreDemo {
/**
* 搶車位, 10部汽車 1 個停車位
*/
public static void main(String[] args) throws Exception {
//定義 3 個停車位
Semaphore semaphore = new Semaphore(3);
//模擬 6 輛汽車停車
for (int i = 1; i <= 6; i++) {
Thread.sleep(100);
//停車
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "找車位ing");
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "汽車停車成功!");
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "溜了溜了");
semaphore.release();
}
}, "汽車" + i).start();
}
}
}
8 讀寫鎖
8 .1 讀寫鎖介紹
讀寫鎖的演變
現實中有這樣一種場景:對共享資源有讀和寫的操作,且寫操作沒有讀操作那麼頻繁。在沒有寫操作的時候,多個執行緒同時讀一個資源沒有任何問題,所以應該允許多個執行緒同時讀取共享資源;但是如果一個執行緒想去寫這些共享資源,就不應該允許其他執行緒對該資源進行讀和寫的操作了。
針對這種場景, JAVA的併發包提供了讀寫鎖ReentrantReadWriteLock,它表示兩個鎖,一個是讀操作相關的鎖,稱為共享鎖;一個是寫相關的鎖,稱為排他鎖
- 執行緒進入讀鎖的前提條件:
- 沒有其他執行緒的寫鎖
- 沒有寫請求, 或者有寫請求,但呼叫執行緒和持有鎖的執行緒是同一個(可重入鎖)。
- 執行緒進入寫鎖的前提條件:
- 沒有其他執行緒的讀鎖
- 沒有其他執行緒的寫鎖
而讀寫鎖有以下三個重要的特性:
( 1 )公平選擇性:支援非公平(預設)和公平的鎖獲取方式,吞吐量還是非公平優於公平。
( 2 )重進入:讀鎖和寫鎖都支援執行緒重進入。
( 3 )鎖降級:遵循獲取寫鎖、獲取讀鎖再釋放寫鎖的次序,寫鎖能夠降級成為讀鎖。
讀寫鎖的降級
8 .2 ReentrantReadWriteLock
ReentrantReadWriteLock 類的整體結構
public class Demo02 {
public static void main(String[] args) {
//可重入讀寫鎖物件
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
//讀鎖
ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
//寫鎖
ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
//鎖降級
//1.獲取寫鎖
writeLock.lock();
System.out.println("---write");
//2獲取讀鎖
readLock.lock();
System.out.println("---read");
// 3釋放寫鎖
writeLock.unlock();
// 4釋放讀鎖
readLock.unlock();
}
}
可以看到,ReentrantReadWriteLock實現了ReadWriteLock介面,ReadWriteLock介面定義了獲取讀鎖和寫鎖的規範,具體需要實現類去實現;
同時其還實現了Serializable介面,表示可以進行序列化,在原始碼中可以看到ReentrantReadWriteLock實現了自己的序列化邏輯。
8 .3 入門案例
場景: 使用ReentrantReadWriteLock 對一個hashmap進行讀和寫操作
8 .3.1 實現案例
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//建立執行緒放資料
for (int i = 0; i < 5; i++) {
final int num = i;
new Thread(() -> {
myCache.put(num + "", num + "");
}, String.valueOf(i)).start();
}
//建立執行緒取資料
for (int i = 0; i < 5; i++) {
final int num = i;
new Thread(() -> {
myCache.get(num + "");
}, String.valueOf(i)).start();
}
}
}
//資源類
class MyCache {
//建立map集合
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//放資料
public void put(String key, Object value) {
//新增寫鎖
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "正在寫操作" + key);
//暫停一會
TimeUnit.MILLISECONDS.sleep(300);
//放資料
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "寫完了" + key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//釋放寫鎖
readWriteLock.writeLock().unlock();
}
}
//取資料
public Object get(String key) {
//新增讀鎖
readWriteLock.readLock().lock();
Object result = null;
try {
System.out.println(Thread.currentThread().getName() + "正在讀取操作" + key);
//暫停一會
TimeUnit.MILLISECONDS.sleep(300);
//取資料
result = map.get(key);
System.out.println(Thread.currentThread().getName() + "取完了" + key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//釋放讀鎖
readWriteLock.readLock().unlock();
}
return result;
}
}
8. 4 小結(重要)
-
線上程持有讀鎖的情況下,該執行緒不能取得寫鎖(因為獲取寫鎖的時候,如果發現當前的讀鎖被佔用,就馬上獲取失敗,不管讀鎖是不是被當前執行緒持有)。
-
線上程持有寫鎖的情況下,該執行緒可以繼續獲取讀鎖(獲取讀鎖時如果發現寫鎖被佔用,只有寫鎖沒有被當前執行緒佔用的情況才會獲取失敗)。
原因: 當執行緒獲取讀鎖的時候,可能有其他執行緒同時也在持有讀鎖,因此不能把獲取讀鎖的執行緒“升級”為寫鎖;而對於獲得寫鎖的執行緒,它一定獨佔了讀寫鎖,因此可以繼續讓它獲取讀鎖,當它同時獲取了寫鎖和讀鎖後,還可以先釋放寫鎖繼續持有讀鎖,這樣一個寫鎖就“降級”為了讀鎖。
9 阻塞佇列
9 .1 BlockingQueue簡介
Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。
阻塞佇列,顧名思義,首先它是一個佇列, 通過一個共享的佇列,可以使得資料由佇列的一端輸入,從另外一端輸出;
當佇列是空的,從佇列中獲取元素的操作將會被阻塞
當佇列是滿的,從佇列中新增元素的操作將會被阻塞
試圖從空的佇列中獲取元素的執行緒將會被阻塞,直到其他執行緒往空的佇列插入新的元素
試圖向已滿的佇列中新增新元素的執行緒將會被阻塞,直到其他執行緒從佇列中移除一個或多個元素或者完全清空,使佇列變得空閒起來並後續新增
常用的佇列主要有以下兩種:
-
先進先出(FIFO):先插入的佇列的元素也最先出佇列,類似於排隊的功能。從某種程度上來說這種佇列也體現了一種公平性
-
後進先出(LIFO):後插入佇列的元素最先出佇列,這種佇列優先處理最近發生的事件(棧)
在多執行緒領域:所謂阻塞,在某些情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起
的執行緒又會自動被喚起
為什麼需要BlockingQueue
好處是我們不需要關心什麼時候需要阻塞執行緒,什麼時候需要喚醒執行緒,因為這一切
BlockingQueue都給你一手包辦了
在concurrent包釋出以前,在多執行緒環境下,我們每個程式設計師都必須去自己控制這些細節,尤其還要兼顧效率和執行緒安全,而這會給我們的程式帶來不小的複雜度。
多執行緒環境中,通過佇列可以很容易實現資料共享,比如經典的“生產者”和“消費者”模型中,通過佇列可以很便利地實現兩者之間的資料共享。假設我們有若干生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的資料共享問題。但如果生產者和消費者在某個時間段內,萬一發生資料處理速度不匹配的情況呢?理想情況下,如果生產者產出資料的速度大於消費者消費的速度,並且當生產出來的資料累積到一定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者執行緒),以便等待消費者執行緒把累積的資料處理完畢,反之亦然。
-
當佇列中沒有資料的情況下,消費者端的所有執行緒都會被自動阻塞(掛起),直到有資料放入佇列
-
當佇列中填滿資料的情況下,生產者端的所有執行緒都會被自動阻塞(掛起),直到佇列中有空的位置,執行緒被自動喚醒
9 .2 BlockingQueue核心方法
BlockingQueue的核心方法 :
1.放入資料
-
offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false. (本方法不阻塞當前執行方法的執行緒)
-
offer(E o, long timeout, TimeUnit unit):可以設定等待的時間,如果在指定的時間內,還不能往佇列中加入BlockingQueue,則返回失敗
-
put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.
2.獲取資料
-
poll(time): 取走BlockingQueue裡排在首位的物件,若不能立即取出, 則可以等time引數規定的時間,取不到時返回null
-
poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。
-
take(): 取走BlockingQueue裡排在首位的物件,若BlockingQueue為空, 阻斷進入等待狀態直到BlockingQueue有新的資料被加入 ;
-
drainTo(): 一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。
9 .3 入門案例
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// List list = new ArrayList();
BlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//第一組
// System.out.println(blockingQueue.add("a"));
// System.out.println(blockingQueue.add("b"));
// System.out.println(blockingQueue.add("c"));
// System.out.println(blockingQueue.element());
// System.out.println(blockingQueue.add("x"));
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// 第二組
// System.out.println(blockingQueue.offer("a"));
// System.out.println(blockingQueue.offer("b"));
// System.out.println(blockingQueue.offer("c"));
// System.out.println(blockingQueue.offer("x"));
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// 第三組
// blockingQueue.put("a");
// blockingQueue.put("b");
// blockingQueue.put("c");
// blockingQueue.put("x");
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// 第四組
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("a", 3L, TimeUnit.SECONDS));
}
}
9 .4 常見的BlockingQueue
9 .4.1 ArrayBlockingQueue(常用)
基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,這是一個常用的阻塞佇列,除了一個定長陣列外,ArrayBlockingQueue內部還儲存著兩個整形變數,分別標識著佇列的頭部和尾部在陣列中的位置。
ArrayBlockingQueue在生產者放入資料和消費者獲取資料,都是共用同一個鎖物件,由此也意味著兩者無法真正並行執行,這點尤其不同於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以採用分離鎖,從而實現生產者和消費者操作的完全並行執行。Doug Lea之所以沒這樣去做,也許是為ArrayBlockingQueue的資料寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給程式碼帶來額外的複雜性外,其在效能上完全佔不到任何便宜。 ArrayBlockingQueue和
LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而後者則會生成一個額外的Node物件。這在長時間內需要高效併發地處理大批量資料的系統中,其對於GC的影響還是存在一定的區別。而在建立ArrayBlockingQueue時,我們還可以控制物件的內部鎖是否採用公平鎖,預設採用非公平鎖。
== 一句話總結: 由陣列結構組成的有界阻塞佇列。 ==
9 .4.2 LinkedBlockingQueue(常用)
基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。
ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞佇列,一般情況下,在處理多執行緒間的生產者消費者問題,使用這兩個類足以。
== 一句話總結: 由連結串列結構組成的有界(但大小預設值為integer.MAX_VALUE)阻塞佇列。 ==
9 .4.3 DelayQueue
DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue是一個沒有大小限制的佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞,而只有獲取資料的操作(消費者)才會被阻塞。
== 一句話總結: 使用優先順序佇列實現的延遲無界阻塞佇列。 ==
9 .4.4 PriorityBlockingQueue
基於優先順序的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定),但需要注意的是PriorityBlockingQueue並 不會阻塞資料生產者,而只會在沒有可消費的資料時,阻塞資料的消費者 。
因此使用的時候要特別注意, 生產者生產資料的速度絕對不能快於消費者消費資料的速度 ,否則時間一長,會最終耗盡所有的可用堆記憶體空間。在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是 公平鎖 。
== 一句話總結: 支援優先順序排序的無界阻塞佇列。 ==
9 .4.5 SynchronousQueue
一種無緩衝的等待佇列,類似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿著產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那麼對不起,大家都在集市等待。相對於有緩衝的BlockingQueue來說,少了一箇中間經銷商的環節(緩衝區),如果有經銷商,生產者直接把產品批發給經銷商,而無需在意經銷商最終會將這些產品賣給那些消費者,由於經銷商可以庫存一部分商品,因此相對於直接交易模式,總體來說採用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應效能可能會降低。
宣告一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。
公平模式和非公平模式的區別:
-
公平模式:SynchronousQueue會採用公平鎖,並配合一個FIFO佇列來阻塞多餘的生產者和消費者,從而體系整體的公平策略;
-
非公平模式(SynchronousQueue預設):SynchronousQueue採用非公平鎖,同時配合一個LIFO佇列來管理多餘的生產者和消費者,而後一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的資料永遠都得不到處理。
== 一句話總結: 不儲存元素的阻塞佇列,也即單個元素的佇列。 ==
9 .4.6 LinkedTransferQueue
LinkedTransferQueue是一個由連結串列結構組成的無界阻塞TransferQueue佇列。相對於其他阻塞佇列,LinkedTransferQueue多了tryTransfer和transfer方法。
LinkedTransferQueue採用一種預佔模式。意思就是消費者執行緒取元素時,如果佇列不為空,則直接取走資料,若佇列為空,那就生成一個節點(節點元素為null)入隊,然後消費者執行緒被等待在這個節點上,後面生產者執行緒入隊時發現有一個元素為null的節點,生產者執行緒就不入隊了,直接就將元素填充到該節點,並喚醒該節點等待的執行緒,被喚醒的消費者執行緒取走元素,從呼叫的方法返回。
== 一句話總結: 由連結串列組成的無界阻塞佇列。 ==
9 .4.7 LinkedBlockingDeque
LinkedBlockingDeque是一個由連結串列結構組成的雙向阻塞佇列,即可以從佇列的兩端插入和移除元素。
對於一些指定的操作,在插入或者獲取佇列元素時如果佇列狀態不允許該操作可能會阻塞住該執行緒直到佇列狀態變更為允許操作,這裡的阻塞一般有兩種情況
-
插入元素時: 如果當前佇列已滿將會進入阻塞狀態,一直等到佇列有空的位置時再講該元素插入,該操作可以通過設定超時引數,超時後返回 false 表示操作失敗,也可以不設定超時引數一直阻塞,中斷後丟擲InterruptedException異常
-
讀取元素時: 如果當前佇列為空會阻塞住直到佇列不為空然後返回元素,同樣可以通過設定超時引數
== 一句話總結: 由連結串列組成的雙向阻塞佇列 ==
9 .5 小結
-
在多執行緒領域:所謂阻塞,在某些情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起的執行緒又會自動被喚起
-
為什麼需要BlockingQueue? 在concurrent包釋出以前,在多執行緒環境下,我們每個程式設計師都必須去自己控制這些細節,尤其還要兼顧效率和執行緒安全,而這會給我們的程式帶來不小的複雜度。使用後我們不需要關心什麼時候需要阻塞執行緒,什麼時候需要喚醒執行緒,因為這一切BlockingQueue都給你一手包辦了
10 ThreadPool 執行緒池
10 .1 執行緒池簡介
執行緒池(英語:thread pool):一種執行緒使用模式。執行緒過多會帶來排程開銷,進而影響快取區域性性和整體效能。而執行緒池維護著多個執行緒,等待著監督管理者分配可併發執行的任務。這避免了在處理短時間任務時建立與銷燬執行緒的代價。執行緒池不僅能夠保證核心的充分利用,還能防止過分排程。
例子: 10 年前單核CPU電腦,假的多執行緒,像馬戲團小丑玩多個球,CPU需要來回切換。 現在是多核電腦,多個執行緒各自跑在獨立的CPU上,不用切換效率高。
執行緒池的優勢: 執行緒池做的工作只要是控制執行的執行緒數量,處理過程中將任務放入佇列,然後線上程建立後啟動這些任務,如果執行緒數量超過了最大數量,超出數量的執行緒排隊等候,等其他執行緒執行完畢,再從佇列中取出任務來執行。
它的主要特點為:
-
降低資源消耗: 通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的銷耗。
-
提高響應速度: 當任務到達時,任務可以不需要等待執行緒建立就能立即執行。
-
提高執行緒的可管理性: 執行緒是稀缺資源,如果無限制的建立,不僅會銷耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。
-
Java中的執行緒池是通過Executor框架實現的,該框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor這幾個類
10 .2 執行緒池引數說明
本次介紹 5 種類型的執行緒池
10 .2.1常用引數(重點)
- corePoolSize執行緒池的核心執行緒數
- maximumPoolSize能容納的最大執行緒數
- keepAliveTime空閒執行緒存活時間
- unit 存活的時間單位
- workQueue 存放提交但未執行任務的佇列
- threadFactory 建立執行緒的工廠類
- handler 等待佇列滿後的拒絕策略
執行緒池中,有三個重要的引數,決定影響了拒絕策略:
corePoolSize - 核心執行緒數,也即最小的執行緒數。
workQueue - 阻塞佇列 。
maximumPoolSize -最大執行緒數
當提交任務數大於 corePoolSize 的時候,會優先將任務放到 workQueue 阻塞佇列中。當阻塞佇列飽和後,會擴充執行緒池中執行緒數,直到達到maximumPoolSize 最大執行緒數配置。此時,再多餘的任務,則會觸發執行緒池的拒絕策略了。
總結起來,也就是一句話, 當提交的任務數大於(workQueue.size() +maximumPoolSize ),就會觸發執行緒池的拒絕策略 。
10 .2.2 拒絕策略(重點)
CallerRunsPolicy : 當觸發拒絕策略,只要執行緒池沒有關閉的話,則使用呼叫執行緒直接執行任務。一般併發比較小,效能要求不高,不允許失敗。但是,由於呼叫者自己執行任務,如果任務提交速度過快,可能導致程式阻塞,效能效率上必然的損失較大
AbortPolicy : 丟棄任務,並丟擲拒絕執行 RejectedExecutionException 異常資訊。執行緒池預設的拒絕策略。必須處理好丟擲的異常,否則會打斷當前的執行流程,影響後續的任務執行。
DiscardPolicy : 直接丟棄,其他啥都沒有
DiscardOldestPolicy : 當觸發拒絕策略,只要執行緒池沒有關閉的話,丟棄阻塞佇列 workQueue 中最老的一個任務,並將新任務加入
10 .3 執行緒池的種類與建立
10 .3.1 newCachedThreadPool(常用)
作用 :建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒.
特點 :
- 執行緒池中數量沒有固定,可達到最大值(Interger. MAX_VALUE)
- 執行緒池中的執行緒可進行快取重複利用和回收(回收預設時間為 1 分鐘)
- 當執行緒池中,沒有可用執行緒,會重新建立一個執行緒
建立方式:
ExecutorService threadPool3 = Executors.newCachedThreadPool();
場景: 適用於建立一個可無限擴大的執行緒池,伺服器負載壓力較輕,執行時間較短,任務多的場景
10 .3.2 newFixedThreadPool(常用)
作用 :建立一個可重用固定執行緒數的執行緒池,以共享的無界佇列方式來執行這些執行緒。在任意點,在大多數執行緒會處於處理任務的活動狀態。如果在所有執行緒處於活動狀態時提交附加任務,則在有可用執行緒之前,附加任務將在佇列中等待。如果在關閉前的執行期間由於失敗而導致任何執行緒終止,那麼一個新執行緒將代替它執行後續的任務(如果需要)。在某個執行緒被顯式地關閉之前,池中的執行緒將一直存在。
特徵:
- 執行緒池中的執行緒處於一定的量,可以很好的控制執行緒的併發量
- 執行緒可以重複被使用,在顯示關閉之前,都將一直存在
- 超出一定量的執行緒被提交時候需在佇列中等待
建立方式 :
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
場景: 適用於可以預測執行緒數量的業務中,或者伺服器負載較重,對執行緒數有嚴格限制的場景
10 .3.3 newSingleThreadExecutor(常用)
作用 :建立一個使用單個 worker 執行緒的 Executor,以無界佇列方式來執行該執行緒。(注意,如果因為在關閉前的執行期間出現失敗而終止了此單個執行緒,那麼如果需要,一個新執行緒將代替它執行後續的任務)。可保證順序地執行各個任務,並且在任意給定的時間不會有多個執行緒是活動的。與其他等效的newFixedThreadPool不同,可保證無需重新配置此方法所返回的執行程式即可使用其他的執行緒。
特徵: 執行緒池中最多執行 1 個執行緒,之後提交的執行緒活動將會排在佇列中以此執行
建立方式:
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
場景: 適用於需要保證順序執行各個任務,並且在任意時間點,不會同時有多個執行緒的場景
10 .3.4 newScheduleThreadPool(瞭解)
作用: 執行緒池支援定時以及週期性執行任務,建立一個corePoolSize為傳入引數,最大執行緒數為整形的最大數的執行緒池
特徵:
( 1 )執行緒池中具有指定數量的執行緒,即便是空執行緒也將保留
( 2 )可定時或者延遲執行執行緒活動
建立方式:
ExecutorService threadPool4 = Executors.newScheduledThreadPool(5);
場景: 適用於需要多個後臺執行緒執行週期任務的場景
10 .3.5 newWorkStealingPool
jdk1.8提供的執行緒池,底層使用的是ForkJoinPool實現,建立一個擁有多個任務佇列的執行緒池,可以減少連線數,建立當前可用cpu核數的執行緒來並行執行任務
建立方式:
ExecutorService executorService = Executors.newWorkStealingPool();
場景: 適用於大耗時,可並行執行的場景
10 .4 執行緒池入門案例
場景: 火車站 3 個售票口, 10個使用者買票
public class ThreadPoolDemo1 {
/**
* 火車站 3 個售票口, 10個使用者買票
*/
public static void main(String[] args) {
//定時執行緒次:執行緒數量為 3 ---視窗數為 3
ExecutorService threadService = new ThreadPoolExecutor(3,
3,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
try {
//10個人買票
for (int i = 1; i <= 10; i++) {
threadService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + "視窗,開始賣票");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "視窗買票結束");
} catch (Exception e) {
e.printStackTrace();
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//完成後結束
threadService.shutdown();
}
}
}
10 .5 執行緒池底層工作原理(重要)
-
在建立了執行緒池後,執行緒池中的執行緒數為零
-
當呼叫execute()方法新增一個請求任務時,執行緒池會做出如下判斷:
2.1 如果正在執行的執行緒數量小於corePoolSize,那麼馬上建立執行緒執行這個任務;
2.2 如果正在執行的執行緒數量大於或等於corePoolSize,那麼將這個任務放入佇列;
2.3 如果這個時候佇列滿了且正在執行的執行緒數量還小於maximumPoolSize,那麼還是要建立非核心執行緒立刻執行這個任務;
2.4 如果佇列滿了且正在執行的執行緒數量大於或等於maximumPoolSize,那麼執行緒池會啟動飽和拒絕策略來執行。
-
當一個執行緒完成任務時,它會從佇列中取下一個任務來執行
-
當一個執行緒無事可做超過一定的時間(keepAliveTime)時,執行緒會判斷:
4.1 如果當前執行的執行緒數大於corePoolSize,那麼這個執行緒就被停掉。
4.2所以執行緒池的所有任務完成後,它最終會收縮到corePoolSize的大小。
10 .6 注意事項(重要)
-
專案中建立多執行緒時,使用常見的三種執行緒池建立方式,單一、可變、定長都有一定問題,原因是FixedThreadPool和SingleThreadExecutor底層都是用LinkedBlockingQueue實現的,這個佇列最大長度為Integer.MAX_VALUE,容易導致OOM。所以實際生產一般自己通過ThreadPoolExecutor的 7 個引數,自定義執行緒池
-
建立執行緒池推薦適用ThreadPoolExecutor及其 7 個引數手動建立
- corePoolSize執行緒池的核心執行緒數
- maximumPoolSize能容納的最大執行緒數
- keepAliveTime空閒執行緒存活時間
- unit 存活的時間單位
- workQueue 存放提交但未執行任務的隊
- threadFactory 建立執行緒的工廠類
- handler 等待佇列滿後的拒絕策略
-
為什麼不允許適用不允許Executors.的方式手動建立執行緒池,如下圖
11 Fork/Join
11 .1 Fork/Join框架簡介
Fork/Join它可以將一個大的任務拆分成多個子任務進行並行處理,最後將子任務結果合併成最後的計算結果,並進行輸出。Fork/Join框架要完成兩件事情:
Fork:把一個複雜任務進行分拆,大事化小
Join:把分拆任務的結果進行合併
-
任務分割 :首先Fork/Join框架需要把大的任務分割成足夠小的子任務,如果子任務比較大的話還要對子任務進行繼續分割
-
執行任務併合並結果 :分割的子任務分別放到雙端佇列裡,然後幾個啟動執行緒分別從雙端佇列裡獲取任務執行。子任務執行完的結果都放在另外一個佇列裡,啟動一個執行緒從佇列裡取資料,然後合併這些資料。
在Java的Fork/Join框架中,使用兩個類完成上述操作
-
ForkJoinTask :我們要使用Fork/Join框架,首先需要建立一個ForkJoin任務。該類提供了在任務中執行fork和join的機制。通常情況下我們不需要直接整合ForkJoinTask類,只需要繼承它的子類,Fork/Join框架提供了兩個子類:
- a.RecursiveAction:用於沒有返回結果的任務
- b.RecursiveTask:用於有返回結果的任務
-
ForkJoinPool :ForkJoinTask需要通過ForkJoinPool來執行
-
RecursiveTask : 繼承後可以實現遞迴(自己調自己)呼叫的任務
Fork/Join框架的實現原理
ForkJoinPool由ForkJoinTask陣列和ForkJoinWorkerThread陣列組成,ForkJoinTask陣列負責將存放以及將程式提交給ForkJoinPool,而ForkJoinWorkerThread負責執行這些任務。
11 .2 Fork方法
Fork方法的實現原理: 當我們呼叫ForkJoinTask的fork方法時,程式會把任務放在ForkJoinWorkerThread的pushTask的 workQueue 中,非同步地執行這個任務,然後立即返回結果
public final ForkJoinTask fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread) t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
pushTask方法把當前任務存放在ForkJoinTask陣列佇列裡。然後再呼叫ForkJoinPool的signalWork()方法喚醒或建立一個工作執行緒來執行任務。程式碼如下:
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a;
ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);//執行
} else if (n >= m)
growArray();
}
}
11 .3 join方法
Join方法的主要作用是阻塞當前執行緒並等待獲取結果。讓我們一起看看ForkJoinTask的join方法的實現,程式碼如下:
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
它首先呼叫doJoin方法,通過doJoin()方法得到當前任務的狀態來判斷返回
什麼結果,任務狀態有 4 種:
已完成(NORMAL)、被取消(CANCELLED)、訊號(SIGNAL)和出現異常(EXCEPTIONAL)
- 如果任務狀態是已完成,則直接返回任務結果。
- 如果任務狀態是被取消,則直接丟擲CancellationException
- 如果任務狀態是丟擲異常,則直接丟擲對應的異常
讓我們分析一下doJoin方法的實現
final int doExec() {
int s;
boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
在doJoin()方法流程如下:
-
首先通過檢視任務的狀態,看任務是否已經執行完成,如果執行完成,則直接返回任務狀態;
-
如果沒有執行完,則從任務數組裡取出任務並執行。
-
如果任務順利執行完成,則設定任務狀態為NORMAL,如果出現異常,則記錄異常,並將任務狀態設定為EXCEPTIONAL。
11 .4 Fork/Join框架的異常處理
ForkJoinTask在執行的時候可能會丟擲異常,但是我們沒辦法在主執行緒裡直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經丟擲異常或已經被取消了,並且可以通過ForkJoinTask的getException方法獲取異常。
getException方法返回Throwable物件,如果任務被取消了則返回CancellationException。如果任務沒有完成或者沒有丟擲異常則返回null。
11 .5 入門案例
場景: 生成一個計算任務,計算1+2+3.........+1000 , 每 100 個數切分一個子任務
/**
- 遞迴累加
*/
class TaskExample extends RecursiveTask<Long> {
private int start;
private int end;
private long sum;
/**
- 建構函式
- @param start
- @param end
*/
public TaskExample(int start, int end) {
this.start = start;
this.end = end;
}
/**
- The main computation performed by this task.
-
- @return the result of the computation
*/
@Override
protected Long compute() {
System.out.println("任務" + start + "=========" + end + "累加開始");
//大於 100 個數相加切分,小於直接加
if (end - start <= 100) {
for (int i = start; i <= end; i++) {
//累加
sum += i;
}
} else {
//切分為 2 塊
int middle = start + 100;
//遞迴呼叫,切分為 2 個小任務
TaskExample taskExample1 = new TaskExample(start, middle);
TaskExample taskExample2 = new TaskExample(middle + 1, end);
//執行:非同步
taskExample1.fork();
taskExample2.fork();
//同步阻塞獲取執行結果
sum = taskExample1.join()+taskExample2.join();
}
//加完返回
return sum;
}
}
/**
* 分支合併案例
*/
public class ForkJoinDemo {
/**
* 生成一個計算任務,計算1+2+3.........+1000
*/
public static void main(String[] args) {
//定義任務
TaskExample taskExample = new TaskExample(1, 1000);
//定義執行物件
ForkJoinPool forkJoinPool = new ForkJoinPool();
//加入任務執行
ForkJoinTask result = forkJoinPool.submit(taskExample);
//輸出結果
try {
System.out.println(result.get());
} catch (Exception e) {
e.printStackTrace();
} finally {
forkJoinPool.shutdown();
}
}
}
12 CompletableFuture
12 .1 CompletableFuture簡介
CompletableFuture在Java裡面被用於非同步程式設計,非同步通常意味著非阻塞,可以使得我們的任務單獨執行在與主執行緒分離的其他執行緒中,並且通過回撥可以在主執行緒中得到非同步任務的執行狀態,是否完成,和是否異常等資訊。
CompletableFuture實現了Future, CompletionStage介面,實現了Future介面就可以相容現在有執行緒池框架,而CompletionStage接口才是非同步程式設計的介面抽象,裡面定義多種非同步方法,通過這兩者集合,從而打造出了強大的CompletableFuture類。
12 .2 Future與CompletableFuture
Futrue在Java裡面,通常用來表示一個非同步任務的引用,比如我們將任務提交到執行緒池裡面,然後我們會得到一個Futrue,在Future裡面有isDone方法來 判斷任務是否處理結束,還有get方法可以一直阻塞直到任務結束然後獲取結果,但整體來說這種方式,還是同步的,因為需要客戶端不斷阻塞等待或者不斷輪詢才能知道任務是否完成。
Future的主要缺點如下:
( 1 )不支援手動完成
我提交了一個任務,但是執行太慢了,我通過其他路徑已經獲取到了任務結果,現在沒法把這個任務結果通知到正在執行的執行緒,所以必須主動取消或者一直等待它執行完成
( 2 )不支援進一步的非阻塞呼叫
通過Future的get方法會一直阻塞到任務完成,但是想在獲取任務之後執行額外的任務,因為Future不支援回撥函式,所以無法實現這個功能
( 3 )不支援鏈式呼叫
對於Future的執行結果,我們想繼續傳到下一個Future處理使用,從而形成一個鏈式的pipline呼叫,這在Future中是沒法實現的。
( 4 )不支援多個Future合併
比如我們有 10 個Future並行執行,我們想在所有的Future執行完畢之後,執行某些函式,是沒法通過Future實現的。
( 5 )不支援異常處理
Future的API沒有任何的異常處理的api,所以在非同步執行時,如果出了問題是不好定位的。
12 .3 CompletableFuture入門
12 .3.1 使用CompletableFuture
場景:主執行緒裡面建立一個CompletableFuture,然後主執行緒呼叫get方法會阻塞,最後我們在一個子執行緒中使其終止。
/**
* - 主執行緒裡面建立一個CompletableFuture,然後主執行緒呼叫get方法會阻塞,最後我們
* 在一個子執行緒中使其終止
*/
public static void main(String[] args) throws Exception {
CompletableFuture future = new CompletableFuture<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "子執行緒開始幹活");
//子執行緒睡 5 秒
Thread.sleep(5000);
//在子執行緒中完成主執行緒
future.complete("success");
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
//主執行緒呼叫get方法阻塞
System.out.println("主執行緒呼叫get方法獲取結果為: " + future.get());
System.out.println("主執行緒完成,阻塞結束!!!!!!");
}
12 .3.2 沒有返回值的非同步任務
/**
* 沒有返回值的非同步任務
*/
public static void main(String[] args) throws Exception {
System.out.println("主執行緒開始");
//執行一個沒有返回值的非同步任務
CompletableFuture future = CompletableFuture.runAsync(() -> {
try {
System.out.println("子執行緒啟動幹活");
Thread.sleep(5000);
System.out.println("子執行緒完成");
} catch (Exception e) {
e.printStackTrace();
}
});
//主執行緒阻塞
future.get();
System.out.println("主執行緒結束");
}
12 .3.3 有返回值的非同步任務
/**
* 沒有返回值的非同步任務
*/
public static void main(String[] args) throws Exception {
System.out.println("主執行緒開始");
//執行一個有返回值的非同步任務
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("子執行緒開始任務");
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
return "子執行緒完成了!";
});
//主執行緒阻塞
Object o = future.get();
System.out.println("主執行緒結束, 子執行緒的結果為:" + o);
}
12 .3.4 執行緒依賴
當一個執行緒依賴另一個執行緒時,可以使用 thenApply 方法來把這兩個執行緒序列化。
private static Integer num = 10;
/**
* 先對一個數加10,然後取平方
*/
public static void main(String[] args) throws Exception{
System.out.println("主執行緒開始");
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("加 10 任務開始");
num += 10;
} catch (Exception e) {
e.printStackTrace();
}
return num;
}).thenApply(integer -> {
return num * num;
});
Object integer = future.get();
System.out.println("主執行緒結束, 子執行緒的結果為:" + integer);
}
12 .3.5 消費處理結果
thenAccept 消費處理結果, 接收任務的處理結果,並消費處理,無返回結果。
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主執行緒開始");
CompletableFuture.supplyAsync(() -> {
try {
System.out.println("加 10 任務開始");
num += 10;
} catch (Exception e) {
e.printStackTrace();
}
return num;
}).thenApply(integer -> {
return num * num;
}).thenAccept(new Consumer() {
@Override
public void accept(Object integer) {
System.out.println("子執行緒全部處理完成,最後呼叫了accept,結果為:" + integer);
}
});
}
12 3.6 異常處理
exceptionally異常處理,出現異常時觸發
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主執行緒開始");
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
System.out.println("加 10 任務開始");
num += 10;
return num;
}).exceptionally(ex -> {
System.out.println(ex.getMessage());
return -1;
});
System.out.println(future.get());
}
handle類似於thenAccept/thenRun方法,是最後一步的處理呼叫,但是同時可以處理異常
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主執行緒開始");
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("加 10 任務開始");
num += 10;
return num;
}).handle((i, ex) -> {
System.out.println("進入handle方法");
if (ex != null) {
System.out.println("發生了異常,內容為:" + ex.getMessage());
return -1;
} else {
System.out.println("正常完成,內容為: " + i);
return i;
}
});
System.out.println(future.get());
}
12 .3.7 結果合併
thenCompose合併兩個有依賴關係的CompletableFutures的執行結果
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主執行緒開始");
//第一步加 10
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("加 10 任務開始");
num += 10;
return num;
});
//合併
CompletableFuture future1 = future.thenCompose(i ->
//再來一個CompletableFuture
CompletableFuture.supplyAsync(() -> {
return i + 1;
}));
System.out.println(future.get());
System.out.println(future1.get());
}
thenCombine合併兩個沒有依賴關係的CompletableFutures任務
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主執行緒開始");
CompletableFuture job1 = CompletableFuture.supplyAsync(() -> {
System.out.println("加 10 任務開始");
num += 10;
return num;
});
CompletableFuture job2 = CompletableFuture.supplyAsync(() -> {
System.out.println("乘以 10 任務開始");
num = num * 10;
return num;
});
//合併兩個結果
CompletableFuture future = job1.thenCombine(job2, new BiFunction<Integer, Integer, List>() {
@Override
public List apply(Integer a, Integer b) {
List list = new ArrayList<>();
list.add(a);
list.add(b);
return list;
}
});
System.out.println("合併結果為:" + future.get());
}
合併多個任務的結果allOf與anyOfallOf: 一系列獨立的future任務,等其所有的任務執行完後做一些事情
private static Integer num = 10;
/**
* 先對一個數加10,然後取平方
*/
public static void main(String[] args) throws Exception {
System.out.println("主執行緒開始");
List<CompletableFuture<Integer>> list = new ArrayList<>();
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
System.out.println("加 10 任務開始");
num += 10;
return num;
});
list.add(job1);
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
System.out.println("乘以 10 任務開始");
num = num * 10;
return num;
});
list.add(job2);
CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
System.out.println("減以 10 任務開始");
num = num * 10;
return num;
});
list.add(job3);
CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
System.out.println("除以 10 任務開始");
num = num * 10;
return num;
});
list.add(job4);
//多工合併
List<Integer> collect = list.stream().map(CompletableFuture::join).collect(Collectors.toList());
System.out.println(collect);
}
anyOf : 只要在多個future裡面有一個返回,整個任務就可以結束,而不需要等到每一個
future結束
private static Integer num = 10;
/**
* 先對一個數加10,然後取平方@param args
*/
public static void main(String[] args) throws Exception {
System.out.println("主執行緒開始");
CompletableFuture[] futures = new CompletableFuture[4];
CompletableFuture job1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
System.out.println("加 10 任務開始");
num += 10;
return num;
} catch (Exception e) {
return 0;
}
});
futures[0] = job1;
CompletableFuture job2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
System.out.println("乘以 10 任務開始");
num = num * 10;
return num;
} catch (Exception e) {
return 1;
}
});
futures[1] = job2;
CompletableFuture job3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
System.out.println("減以 10 任務開始");
num = num * 10;
return num;
} catch (Exception e) {
return 2;
}
});
futures[2] = job3;
CompletableFuture job4 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(4000);
System.out.println("除以 10 任務開始");
num = num * 10;
return num;
} catch (Exception e) {
return 3;
}
});
futures[3] = job4;
CompletableFuture future = CompletableFuture.anyOf(futures);
System.out.println(future.get());
}