細粒度的執行緒控制?使用Lock Condition~
阿新 • • 發佈:2019-02-01
這裡舉一個栗子,我們對一個資源進行加鎖,可是又要進行細粒度的控制,該如何實現呢?
比如我們開了了個餐館。餐館有一個廚房,服務員可以通知廚房進行做菜,當前冰箱裡有菜時,廚房就會開始做菜,冰箱裡沒菜則會等待。
/**
* Created by Anur IjuoKaruKas on 6/28/2018
*/
@SuppressWarnings("Duplicates")
public class Restaurant {
private final Lock kitchen = new ReentrantLock();
private ConcurrentLinkedDeque<String> meetFridge = new ConcurrentLinkedDeque<>();// 肉冰箱
public Runnable cockMeet() {
return new Runnable() {
@Override
public void run() {
synchronized (kitchen) {
System.out.println("通知廚房做肉");
if (meetFridge.isEmpty()) {
try {
System.out.println("冰箱沒有肉了,等待冰箱有肉");
kitchen.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String meetNeedToCock = meetFridge.getFirst();
System.out.println("正在炒" + meetNeedToCock);
}
}
};
}
public Runnable buySomething() {
return new Runnable() {
@Override
public void run() {
synchronized (kitchen) {
System.out.println("進貨了");
meetFridge.addLast("牛肉");
kitchen.notify();
}
}
};
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Restaurant restaurant = new Restaurant();
executorService.execute(restaurant.cockMeet());
executorService.execute(restaurant.cockMeet());
Thread.sleep(1000);
executorService.execute(restaurant.buySomething());
Thread.sleep(1000);
executorService.execute(restaurant.buySomething());
Thread.sleep(1000);
executorService.execute(restaurant.buySomething());
executorService.execute(restaurant.cockMeet());
}
}
執行一下main方法,可以得到以下輸出:
通知廚房做肉
冰箱沒有肉了,等待冰箱有肉
通知廚房做肉
冰箱沒有肉了,等待冰箱有肉
進貨了
正在炒牛肉
進貨了
正在炒牛肉
進貨了
通知廚房做肉
正在炒牛肉
到這裡是沒有什麼問題的。
進來了一個新需求,一個剛好可以用上Condition的新需求
現在我們既需要做肉,也需要做菜。
也就是說:
1、服務員通知了廚房,需要做一個肉和一個菜。這個時候廚房正好沒庫存,廚房進行了等待。
2、這時候某人去菜市場買了菜回來,廚房開始做菜。
3、過了一段時間
4、某人去菜市場買了肉回來,廚房開始做肉。
這樣的一個需求,當然用其他方式實現也是可以的,但如果使用 Condition來實現,它將變得異常簡單。
/**
* Created by Anur IjuoKaruKas on 6/28/2018
*/
@SuppressWarnings("Duplicates")
public class Restaurant {
private final Lock kitchen = new ReentrantLock();
private final Condition waitMeet = kitchen.newCondition();
private final Condition waitVege = kitchen.newCondition();
private ConcurrentLinkedDeque<String> meetFridge = new ConcurrentLinkedDeque<>();// 肉冰箱
private ConcurrentLinkedDeque<String> vegeFridge = new ConcurrentLinkedDeque<>();// 菜冰箱
public Runnable cockMeet() {
return new Runnable() {
@Override
public void run() {
kitchen.lock();
try {
System.out.println("通知廚房做肉");
if (meetFridge.isEmpty()) {
try {
System.out.println("冰箱沒有肉了,等待冰箱有肉");
waitMeet.await(); // 直接呼叫condition的wait方法
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String meetNeedToCock = meetFridge.getFirst();
System.out.println("正在炒" + meetNeedToCock);
} catch (Exception e) {
e.printStackTrace();
} finally {
kitchen.unlock();
}
}
};
}
public Runnable cockVege() {
return new Runnable() {
@Override
public void run() {
kitchen.lock();
try {
System.out.println("通知廚房做菜");
if (vegeFridge.isEmpty()) {
try {
System.out.println("冰箱沒有菜了,等待冰箱有菜");
waitVege.await(); // 直接呼叫condition的wait方法
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String meetNeedToCock = vegeFridge.getFirst();
System.out.println("正在炒" + meetNeedToCock);
} catch (Exception e) {
e.printStackTrace();
} finally {
kitchen.unlock();
}
}
};
}
public Runnable buySomething() {
return new Runnable() {
@Override
public void run() {
kitchen.lock();
try {
Random random = new Random();
if (random.nextBoolean()) {
System.out.println("肉進貨了");
meetFridge.addLast("牛肉");
waitMeet.signal();
} else {
System.out.println("菜進貨了");
vegeFridge.addLast("苦瓜");
waitVege.signal();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kitchen.unlock();
}
}
};
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Restaurant restaurant = new Restaurant();
executorService.execute(restaurant.cockMeet());
executorService.execute(restaurant.cockVege());
executorService.execute(restaurant.buySomething());
}
}
最後輸出:
通知廚房做肉
冰箱沒有肉了,等待冰箱有肉
通知廚房做菜
冰箱沒有菜了,等待冰箱有菜
肉進貨了
正在炒牛肉
可見我們可以針對情況對不同的行為進行通知,這就是condition的力量。
提高篇
這裡就不瞎扯場景了,直接上程式碼。
這是仿kafka BufferPool的一種思路,(當然沒kafka實現的那麼複雜),它的思路是使用一個佇列來管理等待的執行緒。
每次執行緒進來sout(),都進行等待
滿足一定的條件時,mission()會通知隊頭的一個執行緒進行操作。
/**
* Created by Anur IjuoKaruKas on 6/25/2018
*/
public class Task {
private Deque<Condition> waiters = new ArrayDeque<>();
private Lock lock = new ReentrantLock();
private Integer count = 0;
private void sout(String str) {
this.lock.lock();
try {
System.out.println("sout " + str + " get the lock");
Condition condition = this.lock.newCondition();
waiters.addLast(condition);
condition.await();
Condition conditionFromWaiters = waiters.removeFirst();
if (conditionFromWaiters != condition) {
System.out.println("???????");
}
System.out.println("Test Task: " + str);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void mission() {
this.lock.lock();
try {
System.out.println("mission get the lock");
while (count < 10) {
count++;
}
Condition condition = waiters.peekFirst();
if (condition != null) {
condition.signal();
}
count = 0;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
final Task task = new Task();
for (int i = 0; i < 1000000; i++) {
final int finalI = i;
executorService.execute(new Runnable() {
@Override
public void run() {
task.sout(finalI + "");
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
task.mission();
}
});
}
}
}