1. 程式人生 > >細粒度的執行緒控制?使用Lock Condition~

細粒度的執行緒控制?使用Lock Condition~

這裡舉一個栗子,我們對一個資源進行加鎖,可是又要進行細粒度的控制,該如何實現呢?

比如我們開了了個餐館。餐館有一個廚房,服務員可以通知廚房進行做菜,當前冰箱裡有菜時,廚房就會開始做菜,冰箱裡沒菜則會等待。

/**
 * Created by Anur IjuoKaruKas on 6/28/2018
 */
@SuppressWarnings("Duplicates")
public class Restaurant {

    private final Lock kitchen = new ReentrantLock();

    private ConcurrentLinkedDeque<String> meetFridge = new
ConcurrentLinkedDeque<>();// 肉冰箱 public Runnable cockMeet() { return new Runnable() { @Override public void run() { synchronized (kitchen) { System.out.println("通知廚房做肉"); if (meetFridge.isEmpty()) { try
{ System.out.println("冰箱沒有肉了,等待冰箱有肉"); kitchen.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } String meetNeedToCock = meetFridge.getFirst(); System.out.println("正在炒"
+ meetNeedToCock); } } }; } public Runnable buySomething() { return new Runnable() { @Override public void run() { synchronized (kitchen) { System.out.println("進貨了"); meetFridge.addLast("牛肉"); kitchen.notify(); } } }; } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); Restaurant restaurant = new Restaurant(); executorService.execute(restaurant.cockMeet()); executorService.execute(restaurant.cockMeet()); Thread.sleep(1000); executorService.execute(restaurant.buySomething()); Thread.sleep(1000); executorService.execute(restaurant.buySomething()); Thread.sleep(1000); executorService.execute(restaurant.buySomething()); executorService.execute(restaurant.cockMeet()); } }

執行一下main方法,可以得到以下輸出:

通知廚房做肉
冰箱沒有肉了,等待冰箱有肉
通知廚房做肉
冰箱沒有肉了,等待冰箱有肉
進貨了
正在炒牛肉
進貨了
正在炒牛肉
進貨了
通知廚房做肉
正在炒牛肉

到這裡是沒有什麼問題的。

進來了一個新需求,一個剛好可以用上Condition的新需求

現在我們既需要做肉,也需要做菜。
也就是說:
1、服務員通知了廚房,需要做一個肉和一個菜。這個時候廚房正好沒庫存,廚房進行了等待。
2、這時候某人去菜市場買了菜回來,廚房開始做菜。
3、過了一段時間
4、某人去菜市場買了肉回來,廚房開始做肉。

這樣的一個需求,當然用其他方式實現也是可以的,但如果使用 Condition來實現,它將變得異常簡單。
/**
 * Created by Anur IjuoKaruKas on 6/28/2018
 */
@SuppressWarnings("Duplicates")
public class Restaurant {

    private final Lock kitchen = new ReentrantLock();

    private final Condition waitMeet = kitchen.newCondition();

    private final Condition waitVege = kitchen.newCondition();

    private ConcurrentLinkedDeque<String> meetFridge = new ConcurrentLinkedDeque<>();// 肉冰箱

    private ConcurrentLinkedDeque<String> vegeFridge = new ConcurrentLinkedDeque<>();// 菜冰箱

    public Runnable cockMeet() {
        return new Runnable() {

            @Override
            public void run() {
                kitchen.lock();
                try {
                    System.out.println("通知廚房做肉");
                    if (meetFridge.isEmpty()) {
                        try {
                            System.out.println("冰箱沒有肉了,等待冰箱有肉");
                            waitMeet.await();   // 直接呼叫condition的wait方法
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    String meetNeedToCock = meetFridge.getFirst();
                    System.out.println("正在炒" + meetNeedToCock);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    kitchen.unlock();
                }
            }
        };
    }

    public Runnable cockVege() {
        return new Runnable() {

            @Override
            public void run() {
                kitchen.lock();
                try {
                    System.out.println("通知廚房做菜");
                    if (vegeFridge.isEmpty()) {
                        try {
                            System.out.println("冰箱沒有菜了,等待冰箱有菜");
                            waitVege.await();   // 直接呼叫condition的wait方法
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    String meetNeedToCock = vegeFridge.getFirst();
                    System.out.println("正在炒" + meetNeedToCock);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    kitchen.unlock();
                }
            }
        };
    }

    public Runnable buySomething() {
        return new Runnable() {

            @Override
            public void run() {
                kitchen.lock();
                try {
                    Random random = new Random();

                    if (random.nextBoolean()) {
                        System.out.println("肉進貨了");
                        meetFridge.addLast("牛肉");
                        waitMeet.signal();
                    } else {
                        System.out.println("菜進貨了");
                        vegeFridge.addLast("苦瓜");
                        waitVege.signal();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    kitchen.unlock();
                }
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        Restaurant restaurant = new Restaurant();

        executorService.execute(restaurant.cockMeet());
        executorService.execute(restaurant.cockVege());

        executorService.execute(restaurant.buySomething());
    }
}

最後輸出:

通知廚房做肉
冰箱沒有肉了,等待冰箱有肉
通知廚房做菜
冰箱沒有菜了,等待冰箱有菜
肉進貨了
正在炒牛肉

可見我們可以針對情況對不同的行為進行通知,這就是condition的力量。

提高篇

這裡就不瞎扯場景了,直接上程式碼。

這是仿kafka BufferPool的一種思路,(當然沒kafka實現的那麼複雜),它的思路是使用一個佇列來管理等待的執行緒。
每次執行緒進來sout(),都進行等待
滿足一定的條件時,mission()會通知隊頭的一個執行緒進行操作。

/**
 * Created by Anur IjuoKaruKas on 6/25/2018
 */
public class Task {

    private Deque<Condition> waiters = new ArrayDeque<>();

    private Lock lock = new ReentrantLock();

    private Integer count = 0;

    private void sout(String str) {
        this.lock.lock();
        try {
            System.out.println("sout " + str + " get the lock");
            Condition condition = this.lock.newCondition();

            waiters.addLast(condition);
            condition.await();

            Condition conditionFromWaiters = waiters.removeFirst();

            if (conditionFromWaiters != condition) {
                System.out.println("???????");
            }

            System.out.println("Test Task: " + str);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private void mission() {
        this.lock.lock();
        try {
            System.out.println("mission get the lock");
            while (count < 10) {
                count++;
            }
            Condition condition = waiters.peekFirst();
            if (condition != null) {
                condition.signal();
            }
            count = 0;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        final Task task = new Task();

        for (int i = 0; i < 1000000; i++) {
            final int finalI = i;

            executorService.execute(new Runnable() {

                @Override
                public void run() {
                    task.sout(finalI + "");
                }
            });

            executorService.execute(new Runnable() {

                @Override
                public void run() {
                    task.mission();
                }
            });
        }
    }
}