1. 程式人生 > 程式設計 >手寫JDK元件之阻塞佇列BlockedQueue

手寫JDK元件之阻塞佇列BlockedQueue

研究了一段時間框架,有點審美疲勞,今天講點輕鬆的,手寫一個阻塞佇列,實踐一把lock+condition。

“等待通知”機制

首先複習一下經典的 “等待通知”機制。

執行緒首先獲取互斥鎖,當執行緒要求的條件不滿足時,釋放互斥鎖,進入等待狀態;當要求的條件滿足時,通知等待的執行緒,重新獲取互斥鎖 --《極客時間-Java併發程式設計實戰》

在Java中實現 “等待通知” 機制一般有兩種方式,synchronized/Lock+Condition。

通過synchronized實現 “等待-通知” 機制

synchronized同步原語(或稱:管程)配合wait()、notify()、notifyAll()就可以實現“等待通知”機制。

機理是怎樣的呢?

當使用synchronized管程對某一塊臨界區進行加鎖,同一時刻,只能允許一個執行緒進入synchronized保護的臨界區中。

當該遠端進入臨界區之後,其他的執行緒如果來訪問臨界區就需要進入等待佇列中進行等待。

這裡要注意,等待佇列與鎖是一一對應關係,每個互斥鎖都有自己的獨立的等待佇列。

Java物件的wait()方法就能夠讓執行緒進入等待狀態,此時執行緒被阻塞。

當執行緒進入等待佇列時,會釋放當前持有的互斥鎖。當它釋放鎖之後,其他的執行緒就有機會獲得該互斥鎖並進入臨界區。

那如何通知滿足條件的執行緒呢?

通過Java物件的notify()和notifyAll()方法就能夠實現。當條件滿足時呼叫notify(),會通知等待佇列中的執行緒,通知它 條件曾經滿足過

notify()只能保證在通知的那一時間點,條件是滿足的。也就是,有可能被通知執行緒執行的時間點與通知的時間點是不相等的;即:執行緒執行的時候,條件已經不滿足了(可能有其他的執行緒滿足了該條件而插隊)

另外,就算執行緒被通知而喚醒,在進入臨界區前依舊需要獲取互斥鎖,因為這把需要獲取的鎖在呼叫wait()的時候已經被釋放了。

需要注意的是

wait()、notify()、notifyAll()被呼叫的前提是獲取到了響應的互斥鎖,也就是呼叫這三個方法的位置都是在 synchronized{} 內部。如果呼叫的位置在synchronized外部或者不是使用同一把互斥鎖,JVM會丟擲 java.lang.IllegalMonitorStateException

異常。

關於synchronized實現 “等待-通知” 機制我們就講到這裡。

通過Lock+Condition實現 “等待-通知” 機制與synchronized類似,我們本文實現阻塞佇列BlockedQueue的方式就是通過Lock+Condition實現。

Lock+Condition原理講解

Condition 定義了等待/通知兩種型別的方法:await()/signal()/signalAll()。執行緒呼叫這些方法之前需要獲取Condition關聯的鎖。

Condition物件是由Lock物件通過newCondition()方法建立的,也就是說,Condition是依賴Lock物件的。

類比上文中講到的synchronized實現 “等待-通知” 機制,Lock/Condition涉及到的方法與synchronized方式涉及到的方法的語義是一一對應的,具體如下表:

實現阻塞佇列BlockedQueue

瞭解並複習了 管程中的“等待/通知機制”,我們開始實現阻塞佇列BlockedQueue。

在編寫過程中參考了JUC中的ArrayBlockingQueue原始碼實現。

    public class BlockedQueue<T> {複製程式碼
        final Lock lock = new ReentrantLock();
        // 條件變數:佇列不滿
        final Condition notFull = lock.newCondition();
        // 條件變數:佇列不空
        final Condition notEmpty = lock.newCondition();複製程式碼
        // 阻塞單列最大長度
        int capacity = 0;複製程式碼
        // 當前已經存在下標:入隊
        int putIndex = 0;複製程式碼
        // 當前已經存在下標:出隊
        int takeIndex = 0;複製程式碼
        // 元素總數
        int elementsSize = 0;複製程式碼
        // 元素陣列
        Object[] items;複製程式碼
        // 構造方法
        public BlockedQueue(int capacity) {
            this.capacity = capacity;
            items = new Object[capacity];
            System.out.println("capacity=" + capacity + ",items.size=" + items.length);
        }複製程式碼

這段程式碼中我們宣告瞭阻塞佇列,支援泛型。宣告瞭需要的成員變數以及有參構造方法。構造方法中根據外界輸入的佇列最大長度初始化了內部的元素陣列。

提前宣告並初始化了Lock(實現方式為ReentrantLock可重入鎖),並在Lock基礎上初始化了兩個Condition條件變數,分別標記佇列不滿、佇列不空。

    // 入隊
    void enq(T x) {
        lock.lock();
        try {
            // 佇列已滿
            while (items.length == elementsSize) {
                // 等待佇列不滿
                notFull.await();
            }
            // 入隊操作...
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            ++elementsSize;
            // 入隊後,通知可出隊
            notEmpty.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
            System.out.println(x.toString() + "--入隊完成");
        }
    }複製程式碼

這段程式碼為入隊邏輯。

首先獲取可重入鎖,如果加鎖成功則進入臨界區邏輯,否則嘗試解鎖。

當佇列已經滿時,則進入阻塞狀態,等待佇列不滿。

如果佇列不滿則進行入隊,當前下標的元素即為要入隊的元素,元素總長度增1。

    // 出隊
    T deq() {
        lock.lock();
        T x = null;
        try {
            // 佇列已空
            while (items.length == 0) {
                // 等待佇列不空
                notEmpty.await();
            }
            // 出隊操作...
            x = (T) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            elementsSize--;
            // 出隊後,通知可入隊
            notFull.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return x;
    }複製程式碼

這段程式碼為出隊邏輯。

首先獲取可重入鎖,如果加鎖成功則進入臨界區邏輯,否則嘗試解鎖。

當佇列已經空,則進入阻塞狀態,等待佇列不空。

如果佇列不空則進行出隊操作,先暫存當前下標的元素,並將當前下標的元素標記為空(NULL);元素總長度減1,解鎖後返回當前已經出隊的元素。

    public T get(int index) {
        return (T) items[index];
    }複製程式碼

這段程式碼為獲取對應下標的元素,如果元素不存在則返回空。

測試阻塞佇列:單執行緒操作

開發完基本邏輯之後,我們寫一個demo來測試一下BlockedQueue。

    public static void main(String[] args) {
        BlockedQueue<String> blockedQueue = new BlockedQueue<>(20);
        for (int i = 0; i < 20; i++) {
            blockedQueue.enq("snowalker:" + i);
        }複製程式碼
        System.out.println("入隊結束:-------------------------");
        for (int i = 0; i < 20; i++) {
            System.out.println(blockedQueue.get(i));
        }複製程式碼
        for (int i = 0; i < 20; i++) {
            blockedQueue.deq();
        }
        System.out.println("出隊結束:-------------------------");
        for (int i = 0; i < 20; i++) {
            System.out.println(blockedQueue.get(i));
        }複製程式碼
    }複製程式碼

邏輯很好理解,我們構造了一個BlockedQueue,添加了20個元素進行入隊。入隊之後遍歷元素,檢視入隊結果。

接著進行20次出隊,並遍歷出隊後的結果。

執行結果如下:

    capacity=20,items.size=20
    入隊結束:-------------------------
    snowalker:0
    snowalker:1
    snowalker:2
    snowalker:3
    snowalker:4
    snowalker:5
    snowalker:6
    snowalker:7
    snowalker:8
    snowalker:9
    snowalker:10
    snowalker:11
    snowalker:12
    snowalker:13
    snowalker:14
    snowalker:15
    snowalker:16
    snowalker:17
    snowalker:18
    snowalker:19
    出隊結束:-------------------------
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null複製程式碼

可以看到,進行了20次入隊之後元素共有20個;

進行了20次出隊操作之後,元素全部為空,表示出隊成功。

測試阻塞佇列:多執行緒操作

我們接著測試一下多執行緒併發操作下,BlockedQueue的表現。

        BlockedQueue<String> blockedQueue = new BlockedQueue<>(20);
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(2);複製程式碼
        Thread thread0 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    begin.await();
                    System.out.println("執行緒0準備完畢");
                    for (int i = 0; i < 10; i++) {
                        blockedQueue.enq("執行緒0-snowalker-" + i);
                    }
                    System.out.println("執行緒0入隊結束:-------------------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    end.countDown();
                }
            }
        });複製程式碼
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    begin.await();
                    System.out.println("執行緒1準備完畢");
                    for (int i = 10; i < 20; i++) {
                        blockedQueue.enq("執行緒1-snowalker-" + i);
                    }
                    System.out.println("執行緒1入隊結束:-------------------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    end.countDown();
                }
            }
        });複製程式碼
        thread0.start();
        thread1.start();
        begin.countDown();
        end.await();
        System.out.println("主執行緒準備完畢!");
        System.out.println("主執行緒遍歷開始!");
        for (int i = 0; i < 20; i++) {
            System.out.println(blockedQueue.get(i));
        }
        System.out.println("Bingo!");
    }複製程式碼

我們定義了兩個執行緒,每個執行緒新增10個元素,通過閉鎖CountDownLatch進行併發新增,新增完成之後遍歷新增結果。列印如下:

    capacity=20,items.size=20
    執行緒0準備完畢
    執行緒1準備完畢
    執行緒0-snowalker-0--入隊完成
    執行緒1-snowalker-10--入隊完成
    執行緒0-snowalker-1--入隊完成
    執行緒1-snowalker-11--入隊完成
    執行緒0-snowalker-2--入隊完成
    執行緒1-snowalker-12--入隊完成
    執行緒0-snowalker-3--入隊完成
    執行緒1-snowalker-13--入隊完成
    執行緒0-snowalker-4--入隊完成
    執行緒1-snowalker-14--入隊完成
    執行緒0-snowalker-5--入隊完成
    執行緒1-snowalker-15--入隊完成
    執行緒1-snowalker-16--入隊完成
    執行緒1-snowalker-17--入隊完成
    執行緒1-snowalker-18--入隊完成
    執行緒0-snowalker-6--入隊完成
    執行緒1-snowalker-19--入隊完成
    執行緒1入隊結束:-------------------------
    執行緒0-snowalker-7--入隊完成
    執行緒0-snowalker-8--入隊完成
    執行緒0-snowalker-9--入隊完成
    執行緒0入隊結束:-------------------------
    主執行緒準備完畢!
    主執行緒遍歷開始!
    執行緒0-snowalker-0
    執行緒1-snowalker-10
    執行緒0-snowalker-1
    執行緒1-snowalker-11
    執行緒0-snowalker-2
    執行緒1-snowalker-12
    執行緒0-snowalker-3
    執行緒1-snowalker-13
    執行緒0-snowalker-4
    執行緒1-snowalker-14
    執行緒0-snowalker-5
    執行緒1-snowalker-15
    執行緒0-snowalker-6
    執行緒1-snowalker-16
    執行緒1-snowalker-17
    執行緒1-snowalker-18
    執行緒1-snowalker-19
    執行緒0-snowalker-7
    執行緒0-snowalker-8
    執行緒0-snowalker-9
    Bingo!複製程式碼

可以看到結果符合預期,我們接著測試一下併發出隊,接著上面的新增結果進行併發出隊操作。

        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch dequeue = new CountDownLatch(2);
        for (int i = 0; i < 20; i++) {
            blockedQueue.enq("snowalker:" + i);
        }複製程式碼
        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    begin.await();
                    System.out.println("執行緒2準備完畢");
                    for (int i = 0; i <= 10; i++) {
                        blockedQueue.deq();
                    }
                    System.out.println("執行緒2出隊結束:-------------------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    dequeue.countDown();
                }
            }
        });複製程式碼
        Thread thread3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    begin.await();
                    System.out.println("執行緒3準備完畢");
                    for (int i = 0; i <= 10; i++) {
                        blockedQueue.deq();
                    }
                    System.out.println("執行緒3出隊結束:-------------------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    dequeue.countDown();
                }
            }
        });複製程式碼
        thread2.start();
        thread3.start();
        begin.countDown();
        dequeue.await();
        System.out.println("主執行緒準備完畢!");
        System.out.println("主執行緒遍歷開始!");
        for (int i = 0; i < 20; i++) {
            System.out.println(blockedQueue.get(i));
        }
        System.out.println("Bingo!");
    }複製程式碼

我們準備了20個元素入隊,然後併發進行出隊,等待兩個執行緒出隊完成之後,在主執行緒進行佇列元素的遍歷操作,結果如下:

    capacity=20,items.size=20
    snowalker:0--入隊完成
    snowalker:1--入隊完成
    snowalker:2--入隊完成
    snowalker:3--入隊完成
    snowalker:4--入隊完成
    snowalker:5--入隊完成
    snowalker:6--入隊完成
    snowalker:7--入隊完成
    snowalker:8--入隊完成
    snowalker:9--入隊完成
    snowalker:10--入隊完成
    snowalker:11--入隊完成
    snowalker:12--入隊完成
    snowalker:13--入隊完成
    snowalker:14--入隊完成
    snowalker:15--入隊完成
    snowalker:16--入隊完成
    snowalker:17--入隊完成
    snowalker:18--入隊完成
    snowalker:19--入隊完成
    執行緒2準備完畢
    執行緒2出隊結束:-------------------------
    執行緒3準備完畢
    執行緒3出隊結束:-------------------------
    主執行緒準備完畢!
    主執行緒遍歷開始!
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    Bingo!複製程式碼

結果如上圖所示,可以看到併發出隊結果滿足預期。

小結

本文我們利用JUC中的Lock+Condition管程實現了自定義BlockedQueue阻塞佇列的開發,並通過測試用例測試了併發條件下的出隊入隊,結果符合預期。


版權宣告:
原創不易,洗文可恥。除非註明,本博文章均為原創,轉載請以連結形式標明本文地址。