1. 程式人生 > >java阻塞隊列之LinkedBlockingQueue

java阻塞隊列之LinkedBlockingQueue

nbsp order pointer prot works 成功 try 成對出現 lin

LinkedBlockingQueue是BlockingQueue中的其中一個,其實現方式為單向鏈表,下面看其具體實現。(均為JDK8)

一、構造函數

在LinkedBlockingQueue中有三個構造函數,如下圖,

技術分享圖片

1、LinkedBlockingQueue()

這是一個無參的構造函數,其定義如下,

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

在這個構造函數中調用了有參的構造函數,傳入的整型值為Integer所能表示的最大值(0x7fffffff)。下面看這個帶參數的構造方法。

2、LinkedBlockingQueue(int capacity)

這是一個整型參數的構造方法其,定義如下,

public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

從其實現上來看,其整型參數代表此隊列的容量,即元素的最大個數;然後初始化了last和head兩個變量,我們猜last應該是此隊列的尾元素、head為此隊列的頭元素。這裏有一個Node類,下面看Node類的實現,

static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item 
= x; } }

Node是LinkedBlockingQueue的靜態內部類,也是組成此隊列的節點元素,其內部有兩個屬性,一個Item代表節點元素,next指向下一個Node節點,這樣就可以得出LinkedBlockingQueue的結構是使用Node連接起來,頭節點為head,尾節點為last。

技術分享圖片

3、LinkedBlockingQueue(Collection<? extends E> c)

此構造方法是使用一個集合類進行構造,其定義如下

public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

4、LinkedBlockingQueue的屬性

從上面的構造函數中可以大體了解其屬性有容量(capacity),隊列中的元素數量(count)、頭節點(head)、尾節點(last)等,如下

/** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

使用原子類AtomicInteger的count表示隊列中元素的個數,可以很好的處理並發,AtomicInteger底層大都是使用樂觀鎖進行操作,多線程下是安全的。

關註下takeLock、putLock這兩個屬性,這裏定義了兩把鎖,一把take鎖,另一把put鎖。通過兩把可重入鎖實現並發,與ArrayBlockingQueue的一把鎖相比。

二、隊列的操作

需要使用阻塞隊列,那麽就需要向隊列中添加或取出元素,在LinkedBlockingQueue中已經實現了相關操作,對於添加/取出均是成對出現,提供的方法中有拋出異常、返回false、線程阻塞等幾種情形。

1、put/take

put/take是一對互斥操作,put向隊列中添加元素,take從隊列中取出元素。

1.1、put

put方法定義如下,

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        //獲得添加鎖,
        final ReentrantLock putLock = this.putLock;
        //獲得當前隊列中的元素數量
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            //如果當前隊列中元素的數量等於隊列的容量,則阻塞當前線程,
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            //當前線程中元素數量增1,返回操作前的數量
            c = count.getAndIncrement();
            //c+1其實是當前隊列中元素的數量,如果比容量小,則喚醒notFull的操作,即可以進行繼續添加,執行put等添加操作。
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
//說明在執行enqueue前的數量為0,執行完enqueue後數量為1,則需要喚醒取進程。
if (c == 0) signalNotEmpty(); }

此方法的執行步驟大體如下,

  • 判斷要put的元素e是否為null,如果為null直接拋出空指針異常;
  • e不為null,則使用e創建一個Node節點,獲得put鎖;
  • 判斷當前隊列中的元素數量和隊列的容量,如果相等,則阻塞當前線程;
  • 如果不相等,把生成的node節點插入隊列,enqueue方法定義如下,
private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }
  • 使用原子操作類把當前隊列中的元素數量增1;如果添加後的隊列中的元素數量比容量小,則表示可以繼續執行put類的操作,喚醒notFull.singal();
  • 如果c=0,即在enqueue前為空,數量為0(此時會阻塞take進程),enqueue後為1,則需要喚醒take進程,如下
private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

1.2、take

take方法和put剛好相反,其定義如下,

public E take() throws InterruptedException {
        E x;
        int c = -1;
        //獲得當前隊列的元素數量
        final AtomicInteger count = this.count;
        //獲得take鎖
        final ReentrantLock takeLock = this.takeLock;
        //執行take操作
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();//阻塞當前線程
            }
            x = dequeue();
            //當前隊列的數量減1,返回操作前的數量
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
//當前隊列中元素數量為capacity-1,即未滿,可以調用put方法,需要喚醒阻塞在put鎖上的線程
if (c == capacity) signalNotFull(); return x; }

此方法的執行步驟大體如下,

  • 獲得take鎖,表示執行take操作;
  • 獲得當前隊列的元素數量,如果數量為0,則阻塞當前線程,直到被中斷或者被喚醒;
  • 如果當前隊列的元素數量不額外i0,則執行出隊操作;
private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;//head賦值給h
        Node<E> first = h.next;//相當於第二個節點賦值給first
        h.next = h; // help GC
        head = first;//頭節點指向第二個節點
        E x = first.item;
        first.item = null;
        return x;
    }

從上面的代碼可以看出把頭節點進行出隊,即head指向下一個節點

  • 當前隊列的元素數量減一,並返回操作前的數量;
  • 如果之前大於1(c最小為2),指向dequeue後數量最小為1,證明隊列中仍有元素,需要喚醒獲得take鎖的其他阻塞線程,take.singal();
  • 如果c等於當前隊列的容量(執行完dequeue後,當前隊列中元素的數量等於capacity-1,則未滿),則需要喚醒獲得put鎖的其他put線程;
private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
//喚醒阻塞在put鎖的其他線程 notFull.signal(); }
finally { putLock.unlock(); } }

2、offer/poll

2.1、offer方法

offer方法的定義如下,

public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

此方法的執行步驟大體如下,

  • 判斷當前隊列中的元素數量和隊列容量,如果相等,直接返回false;
  • 如果當前隊列中元素數量小於隊列容量,執行入隊操作;
  • 入隊操作之後,判斷隊列中元素數量如果仍小於隊列容量,喚醒其他的阻塞線程;
  • 如果c==0(即入隊成功,隊列中元素的數量為1),則需要喚醒阻塞在put鎖的線程;

2.2、poll

poll方法定義如下,

public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

此方法的執行步驟大體如下,

  • 如果當前隊列元素數量為0,直接返回null;
  • 如果當前隊列元素數量大於0,執行出隊操作;
  • 如果c>1,即c最小為2,則出隊成功後,仍有1個元素,可以喚醒阻塞在take鎖的線程;
  • 如果c=capacity,則出隊成功後,隊列中的元素為capacity-1,這時隊列為滿,可以喚醒阻塞在put鎖上的其他線程,即可以添加元素;

3、offer(E e, long timeout, TimeUnit unit)/poll(long timeout, TimeUnit unit)

3.1、offer(E e, long timeout, TimeUnit unit)

在規定的時間內阻塞線程,其定義如下

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

此方法的執行步驟大體如下,

  • 如果當前隊列中元素的數量等於隊列的容量,則在規定時間內會一直阻塞,如果超過了規定時間則直接返回false;
  • 如果在規定時間內當前隊列中元素的數量不等於隊列容量,則跳出了while循環,則執行入隊操作;
  • 判斷入隊前隊列中元素的數量,如果小於隊列容量,則喚醒其他put鎖的阻塞線程;如果等於0,則入隊後元素數量大於0,則喚醒take鎖阻塞的線程;

3.2、poll(long timeout, TimeUnit unit)

其方法定義如下,

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

此方法的執行步驟大體如下,

  • 如果當前隊列中的元素數量為0,則在規定的時間內阻塞線程;如果超過了規定時間直接返回null;
  • 執行出隊操作,出隊成功後,判斷出隊前隊列中元素的數量,如果大於1(最小為2),則喚醒其他阻塞在take鎖上的線程;
  • 如果出隊前隊列中元素數量等於隊列容量,則出隊後隊列中元素數量為capacity-1,則喚醒阻塞在put鎖上的線程;

三、方法比較

3.1、添加方法比較

序號 方法名 隊列滿時處理方式 方法返回值
1 offer(E e) 返回false boolean
2 put(E e) 線程阻塞,直到中斷或被喚醒 void
3 offer(E e, long timeout, TimeUnit unit) 在規定時間內重試,超過規定時間返回false boolean

3.2、取出方法比較

序號 方法名 隊列空時處理方式 方法返回值
1 poll() 返回null E
2 take() 線程阻塞,直到中斷或被喚醒 E
3 poll(long timeout, TimeUnit unit) 在規定時間內重試,超過規定時間返回null E

四、總結

以上是關於LinkedBlockingQueue隊列的相關實現及方法介紹,此隊列使用單向鏈表為載體,配合put/take鎖實現生產線程和消費線程共享數據。LinkedBlockingQueue作為共享的數據池,實現並發環境下的添加及取出方法。

有不正之處歡迎指正,感謝!

java阻塞隊列之LinkedBlockingQueue