【java併發】條件阻塞Condition的應用
Condition將Object監視器方法(wait、notify 和 notifyAll)分解成截然不同的物件,以便通過將這些物件與任意Lock實現組合使用,為每個物件提供多個等待 set(wait-set)。其中,Lock 替代了synchronized方法和語句的使用,Condition替代了Object監視器方法的使用。
1. Condition的基本使用 |
由於Condition可以用來替代wait、notify等方法,所以可以對比著之前寫過的執行緒間通訊的程式碼來看,再來看一下原來那個問題:
有兩個執行緒,子執行緒先執行10次,然後主執行緒執行5次,然後再切換到子執行緒執行10,再主執行緒執行5次……如此往返執行50次。
之前用wait和notify來實現的,現在用Condition來改寫一下,程式碼如下:
public class ConditionCommunication {
public static void main(String[] args) {
Business bussiness = new Business();
new Thread(new Runnable() {// 開啟一個子執行緒
@Override
public void run() {
for (int i = 1; i <= 50; i++) {
bussiness.sub(i);
}
}
}).start();
// main方法主執行緒
for (int i = 1; i <= 50; i++) {
bussiness.main(i);
}
}
}
class Business {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition(); //Condition是在具體的lock之上的
private boolean bShouldSub = true;
public void sub(int i) {
lock.lock();
try {
while (!bShouldSub) {
try {
condition.await(); //用condition來呼叫await方法
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for (int j = 1; j <= 10; j++) {
System.out.println("sub thread sequence of " + j
+ ", loop of " + i);
}
bShouldSub = false;
condition.signal(); //用condition來發出喚醒訊號,喚醒某一個
} finally {
lock.unlock();
}
}
public void main(int i) {
lock.lock();
try {
while (bShouldSub) {
try {
condition.await(); //用condition來呼叫await方法
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for (int j = 1; j <= 10; j++) {
System.out.println("main thread sequence of " + j
+ ", loop of " + i);
}
bShouldSub = true;
condition.signal(); //用condition來發出喚醒訊號麼,喚醒某一個
} finally {
lock.unlock();
}
}
}
從程式碼來看,Condition的使用時和Lock一起的,沒有Lock就沒法使用Condition,因為Condition是通過Lock來new出來的,這種用法很簡單,只要掌握了synchronized和wait、notify的使用,完全可以掌握Lock和Condition的使用。
2. Condition的拔高 |
2.1 緩衝區的阻塞佇列
上面使用Lock和Condition來代替synchronized和Object監視器方法實現了兩個執行緒之間的通訊,現在再來寫個稍微高階點應用:模擬緩衝區的阻塞佇列。
什麼叫緩衝區呢?舉個例子,現在有很多人要發訊息,我是中轉站,我要幫別人把訊息發出去,那麼現在我 就需要做兩件事,一件事是接收使用者發過來的訊息,並按順序放到緩衝區,另一件事是從緩衝區中按順序取出使用者發過來的訊息,併發送出去。
現在把這個實際的問題抽象一下:緩衝區即一個數組,我們可以向陣列中寫入資料,也可以從陣列中把資料取走,我要做的兩件事就是開啟兩個執行緒,一個存資料,一個取資料。但是問題來了,如果緩衝區滿了,說明接收的訊息太多了,即傳送過來的訊息太快了,我另一個執行緒還來不及發完,導致現在緩衝區沒地方放了,那麼此時就得阻塞存資料這個執行緒,讓其等待;相反,如果我轉發的太快,現在緩衝區所有內容都被我發完了,還沒有使用者發新的訊息來,那麼此時就得阻塞取資料這個執行緒。
好了,分析完了這個緩衝區的阻塞佇列,下面就用Condition技術來實現一下:
class Buffer {
final Lock lock = new ReentrantLock(); //定義一個鎖
final Condition notFull = lock.newCondition(); //定義阻塞佇列滿了的Condition
final Condition notEmpty = lock.newCondition();//定義阻塞佇列空了的Condition
final Object[] items = new Object[10]; //為了下面模擬,設定阻塞佇列的大小為10,不要設太大
int putptr, takeptr, count; //陣列下標,用來標定位置的
//往佇列中存資料
public void put(Object x) throws InterruptedException {
lock.lock(); //上鎖
try {
while (count == items.length) {
System.out.println(Thread.currentThread().getName() + " 被阻塞了,暫時無法存資料!");
notFull.await(); //如果佇列滿了,那麼阻塞存資料這個執行緒,等待被喚醒
}
//如果沒滿,按順序往陣列中存
items[putptr] = x;
if (++putptr == items.length) //這是到達陣列末端的判斷,如果到了,再回到始端
putptr = 0;
++count; //訊息數量
System.out.println(Thread.currentThread().getName() + " 存好了值: " + x);
notEmpty.signal(); //好了,現在佇列中有資料了,喚醒佇列空的那個執行緒,可以取資料啦
} finally {
lock.unlock(); //放鎖
}
}
//從佇列中取資料
public Object take() throws InterruptedException {
lock.lock(); //上鎖
try {
while (count == 0) {
System.out.println(Thread.currentThread().getName() + " 被阻塞了,暫時無法取資料!");
notEmpty.await(); //如果佇列是空,那麼阻塞取資料這個執行緒,等待被喚醒
}
//如果沒空,按順序從陣列中取
Object x = items[takeptr];
if (++takeptr == items.length) //判斷是否到達末端,如果到了,再回到始端
takeptr = 0;
--count; //訊息數量
System.out.println(Thread.currentThread().getName() + " 取出了值: " + x);
notFull.signal(); //好了,現在佇列中有位置了,喚醒佇列滿的那個執行緒,可以存資料啦
return x;
} finally {
lock.unlock(); //放鎖
}
}
}
這個程式很經典,我從官方JDK文件中拿出來的,然後加了註釋。程式中定義了兩個Condition,分別針對兩個執行緒,等待和喚醒分別用不同的Condition來執行,思路很清晰,程式也很健壯。可以考慮一個問題,為啥要用兩個Codition呢?之所以這麼設計肯定是有原因的,如果用一個Condition,現在假設佇列滿了,但是有2個執行緒A和B同時存資料,那麼都進入了睡眠,好,現在另一個執行緒取走一個了,然後喚醒了其中一個執行緒A,那麼A可以存了,存完後,A又喚醒一個執行緒,如果B被喚醒了,那就出問題了,因為此時佇列是滿的,B不能存的,B存的話就會覆蓋原來還沒被取走的值,就因為使用了一個Condition,存和取都用這個Condition來睡眠和喚醒,就亂了套。到這裡,就能體會到這個Condition的用武之地了,現在來測試一下上面的阻塞佇列的效果:
public class BoundedBuffer {
public static void main(String[] args) {
Buffer buffer = new Buffer();
for(int i = 0; i < 5; i ++) { //開啟5個執行緒往緩衝區存資料
new Thread(new Runnable() {
@Override
public void run() {
try {
buffer.put(new Random().nextInt(1000)); //隨機存資料
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
for(int i = 0; i < 10; i ++) { //開啟10個執行緒從緩衝區中取資料
new Thread(new Runnable() {
@Override
public void run() {
try {
buffer.take(); //從緩衝區取資料
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
我故意只開啟5個執行緒存資料,10個執行緒取資料,就是想讓它出現取資料被阻塞的情況發生,看執行的結果:
Thread-5 被阻塞了,暫時無法取資料!
Thread-10 被阻塞了,暫時無法取資料!
Thread-1 存好了值: 755
Thread-0 存好了值: 206
Thread-2 存好了值: 741
Thread-3 存好了值: 381
Thread-14 取出了值: 755
Thread-4 存好了值: 783
Thread-6 取出了值: 206
Thread-7 取出了值: 741
Thread-8 取出了值: 381
Thread-9 取出了值: 783
Thread-5 被阻塞了,暫時無法取資料!
Thread-11 被阻塞了,暫時無法取資料!
Thread-12 被阻塞了,暫時無法取資料!
Thread-10 被阻塞了,暫時無法取資料!
Thread-13 被阻塞了,暫時無法取資料!
從結果中可以看出,執行緒5和10搶先執行,發現佇列中沒有,於是就被阻塞了,睡在那了,直到佇列中有新的值存入才可以取,但是它們兩運氣不好,存的資料又被其他執行緒給搶先取走了,哈哈……可以多執行幾次。如果想要看到存資料被阻塞,可以將取資料的執行緒設定少一點,這裡我就不設了。
2.2 兩個以上執行緒之間的喚醒
還是原來那個題目,現在讓三個執行緒來執行,看一下題目:
有三個執行緒,子執行緒1先執行10次,然後子執行緒2執行10次,然後主執行緒執行5次,然後再切換到子執行緒1執行10次,子執行緒2執行10次,主執行緒執行5次……如此往返執行50次。
如過不用Condition,還真不好弄,但是用Condition來做的話,就非常方便了,原理很簡單,定義三個Condition,子執行緒1執行完喚醒子執行緒2,子執行緒2執行完喚醒主執行緒,主執行緒執行完喚醒子執行緒1。喚醒機制和上面那個緩衝區道理差不多,下面看看程式碼吧,很容易理解。
public class ThreeConditionCommunication {
public static void main(String[] args) {
Business bussiness = new Business();
new Thread(new Runnable() {// 開啟一個子執行緒
@Override
public void run() {
for (int i = 1; i <= 50; i++) {
bussiness.sub1(i);
}
}
}).start();
new Thread(new Runnable() {// 開啟另一個子執行緒
@Override
public void run() {
for (int i = 1; i <= 50; i++) {
bussiness.sub2(i);
}
}
}).start();
// main方法主執行緒
for (int i = 1; i <= 50; i++) {
bussiness.main(i);
}
}
static class Business {
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition(); //Condition是在具體的lock之上的
Condition condition2 = lock.newCondition();
Condition conditionMain = lock.newCondition();
private int bShouldSub = 0;
public void sub1(int i) {
lock.lock();
try {
while (bShouldSub != 0) {
try {
condition1.await(); //用condition來呼叫await方法
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for (int j = 1; j <= 10; j++) {
System.out.println("sub1 thread sequence of " + j
+ ", loop of " + i);
}
bShouldSub = 1;
condition2.signal(); //讓執行緒2執行
} finally {
lock.unlock();
}
}
public void sub2(int i) {
lock.lock();
try {
while (bShouldSub != 1) {
try {
condition2.await(); //用condition來呼叫await方法
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for (int j = 1; j <= 10; j++) {
System.out.println("sub2 thread sequence of " + j
+ ", loop of " + i);
}
bShouldSub = 2;
conditionMain.signal(); //讓主執行緒執行
} finally {
lock.unlock();
}
}
public void main(int i) {
lock.lock();
try {
while (bShouldSub != 2) {
try {
conditionMain.await(); //用condition來呼叫await方法
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for (int j = 1; j <= 5; j++) {
System.out.println("main thread sequence of " + j
+ ", loop of " + i);
}
bShouldSub = 0;
condition1.signal(); //讓執行緒1執行
} finally {
lock.unlock();
}
}
}
}
相關推薦
【java併發】條件阻塞Condition的應用
Condition將Object監視器方法(wait、notify 和 notifyAll)分解成截然不同的物件,以便通過將這些物件與任意Lock實現組合使用,為每個物件提供多個等待 set(wait-set)。其中,Lock 替代了synchroniz
【Java併發】Lock 和 Condition
Lock 和 Condition Lock 介面 ReentrantLock ReentrantReadWriteLock Condition 參考 Lock 介面 在Lock接口出現之前,Java程式是靠synchro
Java多執行緒之執行緒併發庫條件阻塞Condition的應用
鎖(Lock/synchronized)只能實現互斥不能實現通訊,Condition的功能類似於在傳統的執行緒技術中的,Object.wait()和Object.notify()的功能,在等待Condition時,允許發生"虛假喚醒",這通常作為對基礎平臺語義的讓步,對於大多
【Java併發】Executor框架
Executor框架 Executor框架結構 ThreadPoolExecutor FixedThreadPool SingleThreadExecutor newCachedThreadPool Schedul
【Java併發】Java中的原子操作
Java中的原子操作 原子更新基本型別 原子更新陣列 原子更新引用型別 原子更新欄位類 參考 原子更新基本型別 一個生動的例子 public class AtomicIntegerExample { privat
【Java併發】Java中的執行緒池
Java中的執行緒池 執行流程 執行緒池的建立 提交任務 關閉執行緒池 參考 執行流程 處理流程如下: execute()方法執行示意圖如下: 執行緒池的建立 corePoolSize:執行緒池
【Java併發】CountDownLatch、CyclicBarrier、Semaphore、Exchanger
CountDownLatch、CyclicBarrier、Semaphore、Exchanger CountDownLatch CyclicBarrier Semaphore Exchanger 參考 CountDownLa
【Java併發】Daemon、Interrupt、Wait/Notify、Join、ThreadLocal、Pipe
Daemon、Interrupt、Wait/Notify、Join、ThreadLocal、Pipe 關於執行緒 Daemon Interrupt Wait/Notify Join ThreadLocal Pipe 參考
【Java併發】volatile 和 final
volatile 和 final 重排序 資料依賴性 as-if-serial語義 重排序對多執行緒的影響 happens - before 規則 volatile記憶體語義 final記憶體語義 參考
【Java併發】synchronized之偏向鎖和輕量級鎖
synchronized之偏向鎖和輕量級鎖 上下文切換 synchronized 鎖的升級與對比 偏向鎖 輕量級鎖 參考 上下文切換 即使是單核處理器也支援多執行緒執行程式碼執行程式碼,CPU通
【Java併發】二、JVM記憶體模型
JVM記憶體模型 什麼是Java記憶體模型 Java記憶體模型即Java Memory Model,簡稱JMM。JMM定義了Java 虛擬機器(JVM)在計算機記憶體(RAM)中的工作方式。JVM是整個計算機虛擬模型,所以JMM是隸屬於JVM的。 想要理解Ja
【java併發】執行緒併發庫的使用
1. 執行緒池的概念 在java5之後,就有了執行緒池的功能了,在介紹執行緒池之前,先來簡單看一下執行緒池的概念。假設我開了家諮詢公司,那麼每天會有很多人過來諮詢問題,如果我一個個接待的話,必
【Java併發】JUC—ReentrantReadWriteLock有坑,小心讀鎖!
好長一段時間前,某些場景需要JUC的讀寫鎖,但在某個時刻內讀寫執行緒都報超時預警(長時間無響應),看起來像是鎖競爭過程中出現死鎖(我猜)。經過排查專案並沒有能造成死鎖的可疑之處,因為業務程式碼並不複雜(僅僅是一個計算過程),經幾番折騰,把注意力轉移到JDK原始碼,正文詳細說下ReentrantReadWrit
【Java併發】生產者-消費者模式簡單實現(模擬訊息佇列)
簡單的模擬了一個訊息佇列 Producer:生產者 Consumer:消費者 Message:訊息體 import java.util.concurrent.ArrayBlockingQueue; import java.util.c
【java併發】多個執行緒間共享資料
先看一個多執行緒間共享資料的問題: 設計四個執行緒,其中兩個執行緒每次對data增加1,另外兩個執行緒每次對data減少1。 從問題來看,很明顯涉及到了執行緒間通資料的共享,四個執行
【java併發】造成HashMap非執行緒安全的原因
0. 寫在前面 在前面我的一篇總結執行緒範圍內共享資料文章中提到,為了資料能線上程範圍內使用,我用了HashMap來儲存不同執行緒中的資料,key為當前執行緒,value為當前執行緒中的資料。
【Java併發】- ReentrantLock,重入鎖
ReentrantLock概述 ReentrantLock(重入鎖),顧名思義是可以重新進入的鎖,也就是說當一個執行緒獲取了這個鎖之後還能在釋放鎖之前再次獲取這個鎖,同時還支援公平所和非公平鎖的選擇。 ReentrantLock的重入 重入例子
【Java併發程式設計】之二十:併發新特性—Lock鎖和條件變數(含程式碼)
簡單使用Lock鎖 Java 5中引入了新的鎖機制——java.util.concurrent.locks中的顯式的互斥鎖:Lock介面,它提供了比synchronized更加廣泛的鎖定操作。Lock介面有3個實現它的類:ReentrantLock、Reetrant
【死磕Java併發】-----J.U.C之AQS:阻塞和喚醒執行緒
此篇部落格所有原始碼均來自JDK 1.8 線上程獲取同步狀態時如果獲取失敗,則加入CLH同步佇列,通過通過自旋的方式不斷獲取同步狀態,但是在自旋的過程中則需要判斷當前執行緒是否需要阻塞,其主要方法在acquireQueued(): if (sho
【死磕Java併發】-----J.U.C之阻塞佇列:ArrayBlockingQueue
ArrayBlockingQueue,一個由陣列實現的有界阻塞佇列。該佇列採用FIFO的原則對元素進行排序新增的。 ArrayBlockingQueue為有界且固定,其大小在構造時由建構函式來決定,確認之後就不能再改變了。ArrayBlockingQueu