1. 程式人生 > >深入理解阻塞佇列(三)——LinkedBlockingQueue原始碼分析

深入理解阻塞佇列(三)——LinkedBlockingQueue原始碼分析

LinkedBlockingQueue是一個基於連結串列實現的可選容量的阻塞佇列。隊頭的元素是插入時間最長的,隊尾的元素是最新插入的。新的元素將會被插入到佇列的尾部。
LinkedBlockingQueue的容量限制是可選的,如果在初始化時沒有指定容量,那麼預設使用int的最大值作為佇列容量。

概述

類繼承關係

LinkedBlockingQueue的繼承關係如下圖:
LinkedBlockingQueue類繼承關係圖
可以參考深入理解阻塞佇列(二)——ArrayBlockingQueue原始碼分析中ArrayBlockingQueue的類繼承關係圖,這兩個類的繼承關係是一樣的。

底層資料結構

LinkedBlockingQueue內部是使用連結串列實現一個佇列的,但是卻有別於一般的佇列,在於該佇列至少有一個節點,頭節點不含有元素。結構圖如下:
LinkedBlockingQueue底層資料結構


可以發現head.item=null,last.next=null。

原理

LinkedBlockingQueue中維持兩把鎖,一把鎖用於入隊,一把鎖用於出隊,這也就意味著,同一時刻,只能有一個執行緒執行入隊,其餘執行入隊的執行緒將會被阻塞;同時,可以有另一個執行緒執行出隊,其餘執行出隊的執行緒將會被阻塞。換句話說,雖然入隊和出隊兩個操作同時均只能有一個執行緒操作,但是可以一個入隊執行緒和一個出隊執行緒共同執行,也就意味著可能同時有兩個執行緒在操作佇列,那麼為了維持執行緒安全,LinkedBlockingQueue使用一個AtomicInterger型別的變量表示當前佇列中含有的元素個數,所以可以確保兩個執行緒之間操作底層佇列是執行緒安全的,這個在後面原始碼分析的時候會說明。

原始碼分析

重要欄位

LinkedBlockingQueue可以指定容量,內部維持一個佇列,所以有一個頭節點head和一個尾節點last,內部維持兩把鎖,一個用於入隊,一個用於出隊,還有鎖關聯的Condition物件。主要物件的定義如下:

    //容量,如果沒有指定,該值為Integer.MAX_VALUE;
    private final int capacity;

    //當前佇列中的元素
    private final AtomicInteger count = new AtomicInteger();

    //佇列頭節點,始終滿足head.item==null
transient Node<E> head; //佇列的尾節點,始終滿足last.next==null private transient Node<E> last; //用於出隊的鎖 private final ReentrantLock takeLock = new ReentrantLock(); //當佇列為空時,儲存執行出隊的執行緒 private final Condition notEmpty = takeLock.newCondition(); //用於入隊的鎖 private final ReentrantLock putLock = new ReentrantLock(); //當佇列滿時,儲存執行入隊的執行緒 private final Condition notFull = putLock.newCondition();

構造方法

LinkedBlockingQueue的構造方法有三個,分別如下:

 public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }


    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);//last和head在佇列為空時都存在,所以佇列中至少有一個節點
    }


    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

從上面的構造方法中可以得出3點結論:
1. 當呼叫無參的構造方法時,容量是int的最大值
2. 佇列中至少包含一個節點,哪怕佇列對外表現為空
3. LinkedBlockingQueue不支援null元素

put(E e)方法

put(E e)方法用於將一個元素插入到佇列的尾部,其實現如下:

public void put(E e) throws InterruptedException {
        //不允許元素為null
        if (e == null) throw new NullPointerException();
        int c = -1;
        //以當前元素新建一個節點
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //獲得入隊的鎖
        putLock.lockInterruptibly();
        try {

            //如果佇列已滿,那麼將該執行緒加入到Condition的等待佇列中
            while (count.get() == capacity) {
                notFull.await();
            }
            //將節點入隊
            enqueue(node);
            //得到插入之前佇列的元素個數
            c = count.getAndIncrement();
            //如果還可以插入元素,那麼釋放等待的入隊執行緒
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //解鎖
            putLock.unlock();
        }
        //通知出隊執行緒佇列非空
        if (c == 0)
            signalNotEmpty();
    }

從上面的程式碼分析中可以得出6點結論:
1. LinkedBlockingQueue不允許元素為null,這一點在構造方法中也說過了。
2. 同一時刻,只能有一個執行緒執行入隊操作,因為putLock在將元素插入到佇列尾部時加鎖了
3. 如果佇列滿了,那麼將會呼叫notFull的await()方法將該執行緒加入到Condition等待佇列中。await()方法就會釋放執行緒佔有的鎖,這將導致之前由於被鎖阻塞的入隊執行緒將會獲取到鎖,執行到while迴圈處,不過可能因為由於佇列仍舊是滿的,也被加入到條件佇列中。
4. 一旦一個出隊執行緒取走了一個元素,並通知了入隊等待佇列中可以釋放執行緒了,那麼第一個加入到Condition佇列中的將會被釋放,那麼該執行緒將會重新獲得put鎖,繼而執行enqueue()方法,將節點插入到佇列的尾部
5. 然後得到插入一個節點之前的元素個數,如果佇列中還有空間可以插入,那麼就通知notFull條件的等待佇列中的執行緒。
6. 通知出隊執行緒佇列為空了,因為插入一個元素之前的個數為0,而插入一個之後佇列中的元素就從無變成了有,就可以通知因佇列為空而阻塞的出隊執行緒了。

signalNotEmpty()方法只會在put/take之類的入隊方法中才會被呼叫,並且是當佇列元素從無到有的時候。下面是signalNotEmpty()方法的實現:

 private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        //獲取takeLock
        takeLock.lock();
        try {
            //釋放notEmpty條件佇列中的第一個等待執行緒
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

E take()方法

take()方法用於得到隊頭的元素,在佇列為空時會阻塞,知道佇列中有元素可取。其實現如下:

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //獲取takeLock鎖
        takeLock.lockInterruptibly();
        try {
            //如果佇列為空,那麼加入到notEmpty條件的等待佇列中
            while (count.get() == 0) {
                notEmpty.await();
            }
            //得到隊頭元素
            x = dequeue();
            //得到取走一個元素之前佇列的元素個數
            c = count.getAndDecrement();
            //如果佇列中還有資料可取,釋放notEmpty條件等待佇列中的第一個執行緒
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        //如果佇列中的元素從滿到非滿,通知put執行緒
        if (c == capacity)
            signalNotFull();
        return x;
    }

上面的程式碼註釋已將說明了take()方法的整體路程,大體上與put()相對。
當佇列為空時,就加入到notEmpty(的條件等待佇列中,當佇列不為空時就取走一個元素,當取完發現還有元素可取時,再通知一下自己的夥伴(等待在條件佇列中的執行緒);最後,如果佇列從滿到非滿,通知一下put執行緒。
下面看一下dequeue()的刪除節點操作,其特別之處在於頭節點是一個哨兵節點。

private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

入隊、出隊總結

LinkedBlockingQueue中除了上面的put()方法之外,還有另外幾個入隊的方法,比如offer(E)、offer(E,long,TimeUnit);也有另外幾個出隊的方法,比如poll()、poll(long,TimeUnit),但實現和put()以及take()都大同小異,這兒就不再一一分析了。下面就兩個方法總結一下:
LinkedBlockingQueue是允許兩個執行緒同時在兩端進行入隊或出隊的操作的,但一端同時只能有一個執行緒進行操作,這是通過兩把鎖來區分的;為了維持底部資料的統一,引入了AtomicInteger的一個count變數,表示佇列中元素的個數。count只能在兩個地方變化,一個是入隊的方法(可以+1),另一個是出隊的方法(可以-1),而AtomicInteger是原子安全的,所以也就確保了底層佇列的資料同步。
另外,入隊、出隊執行緒之間還存在合作的關係,這個以入隊為例:當一群執行緒執行入隊操作時,一個執行緒A幸運地佔有了putLock鎖,然後也成功的插入了一個元素,但是插完這個元素就達到了佇列的容量了,當這個執行緒A釋放了鎖之後,前面一群執行緒中一個執行緒B又獲得了putLock鎖,但是由於佇列已經滿了,那麼執行緒B釋放了putLock鎖後被加入到了notEmpty條件的等待佇列中;由於釋放了鎖,執行緒C也搶到了鎖,但是很不幸,它也被加入到了等待佇列中,並且被加在了執行緒B的尾部;這時一個出隊執行緒出現了,它成功地取走了一個元素,使得佇列從滿變為了非滿狀態,並且呼叫signalNotFull()方法通知了notFull的等待佇列,這時執行緒B又重新獲得了鎖,插入了一個元素,插完一個元素,它發現還有容量可以插元素,它也沒有忘記了和它一起被困在條件佇列中的執行緒C,就呼叫了notFull.await()通知了執行緒C,這樣執行緒C也執行了插入元素的操作。出隊的過程與這個基本相同,就不再介紹了。由此可以看到,入隊的執行緒不止和出隊的執行緒協作,還和自己的難兄難弟,在條件佇列中等待的入隊執行緒協作;出隊的執行緒同樣不止和入隊的執行緒協作,還和另外的出隊執行緒協作。

remove()方法

remove()方法用於刪除佇列中一個元素,如果佇列中不含有該元素,那麼返回false;有的話則刪除並返回true。入隊和出隊都是隻獲取一個鎖,而remove()方法需要同時獲得兩把鎖,其實現如下:

 public boolean remove(Object o) {
        //因為佇列不包含null元素,返回false
        if (o == null) return false;
        //獲取兩把鎖
        fullyLock();
        try {
            //從頭的下一個節點開始遍歷
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                 //如果匹配,那麼將節點從佇列中移除,trail表示前驅節點
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            //釋放兩把鎖
            fullyUnlock();
        }
    }

可以看到remove()方法中首先獲取兩把鎖,然後再執行遍歷刪除操作,最後釋放兩把鎖。下面先看一下是如何獲取和釋放兩把鎖的,其實現如下:

 /**
     * Locks to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

那麼問題來了,為什麼remove()方法同時需要兩把鎖?
remove()操作會從佇列的頭遍歷到尾,用到了佇列的兩端,所以需要對兩端加鎖,而對兩端加鎖就需要獲取兩把鎖;入隊和出隊均只在佇列的一端操作,所以只需獲取一把鎖。

size()方法

size()方法用於返回佇列中元素的個數,其實現如下:

public int size() {
        return count.get();
    }

由於count是一個AtomicInteger的變數,所以該方法是一個原子性的操作,是執行緒安全的。

總結

在上面分析LinkedBlockingQueue的原始碼之後,可以與ArrayBlockingQueue做一個比較。
相同點有如下2點:
1. 不允許元素為null
2. 執行緒安全的佇列

不同點有如下幾點:
1. ArrayBlockingQueue底層基於定長的陣列,所以容量限制了;LinkedBlockingQueue底層基於連結串列實現佇列,所以容量可選,如果不設定,那麼容量是int的最大值
2. ArrayBlockingQueue內部維持一把鎖和兩個條件,同一時刻只能有一個執行緒佇列的一端操作;LinkedBlockingQueue內部維持兩把鎖和兩個條件,同一時刻可以有兩個執行緒在佇列的兩端操作,但同一時刻只能有一個執行緒在一端操作。
3. LinkedBlockingQueue的remove()類似方法時,由於需要對整個佇列連結串列實現遍歷,所以需要獲取兩把鎖,對兩端加鎖。

下一篇部落格將分析LinkedBlockingDeque原始碼,敬請關注。