java阻塞隊列之LinkedBlockingQueue
LinkedBlockingQueue是BlockingQueue中的其中一個,其實現方式為單向鏈表,下面看其具體實現。(均為JDK8)
一、構造函數
在LinkedBlockingQueue中有三個構造函數,如下圖,
1、LinkedBlockingQueue()
這是一個無參的構造函數,其定義如下,
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
在這個構造函數中調用了有參的構造函數,傳入的整型值為Integer所能表示的最大值(0x7fffffff)。下面看這個帶參數的構造方法。
2、LinkedBlockingQueue(int capacity)
這是一個整型參數的構造方法其,定義如下,
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
從其實現上來看,其整型參數代表此隊列的容量,即元素的最大個數;然後初始化了last和head兩個變量,我們猜last應該是此隊列的尾元素、head為此隊列的頭元素。這裏有一個Node類,下面看Node類的實現,
static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item= x; } }
Node是LinkedBlockingQueue的靜態內部類,也是組成此隊列的節點元素,其內部有兩個屬性,一個Item代表節點元素,next指向下一個Node節點,這樣就可以得出LinkedBlockingQueue的結構是使用Node連接起來,頭節點為head,尾節點為last。
3、LinkedBlockingQueue(Collection<? extends E> c)
此構造方法是使用一個集合類進行構造,其定義如下
public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
4、LinkedBlockingQueue的屬性
從上面的構造函數中可以大體了解其屬性有容量(capacity),隊列中的元素數量(count)、頭節點(head)、尾節點(last)等,如下
/** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); /** * Head of linked list. * Invariant: head.item == null */ transient Node<E> head; /** * Tail of linked list. * Invariant: last.next == null */ private transient Node<E> last; /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
使用原子類AtomicInteger的count表示隊列中元素的個數,可以很好的處理並發,AtomicInteger底層大都是使用樂觀鎖進行操作,多線程下是安全的。
關註下takeLock、putLock這兩個屬性,這裏定義了兩把鎖,一把take鎖,另一把put鎖。通過兩把可重入鎖實現並發,與ArrayBlockingQueue的一把鎖相比。
二、隊列的操作
需要使用阻塞隊列,那麽就需要向隊列中添加或取出元素,在LinkedBlockingQueue中已經實現了相關操作,對於添加/取出均是成對出現,提供的方法中有拋出異常、返回false、線程阻塞等幾種情形。
1、put/take
put/take是一對互斥操作,put向隊列中添加元素,take從隊列中取出元素。
1.1、put
put方法定義如下,
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); //獲得添加鎖, final ReentrantLock putLock = this.putLock; //獲得當前隊列中的元素數量 final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ //如果當前隊列中元素的數量等於隊列的容量,則阻塞當前線程, while (count.get() == capacity) { notFull.await(); } enqueue(node); //當前線程中元素數量增1,返回操作前的數量 c = count.getAndIncrement(); //c+1其實是當前隊列中元素的數量,如果比容量小,則喚醒notFull的操作,即可以進行繼續添加,執行put等添加操作。 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); }
//說明在執行enqueue前的數量為0,執行完enqueue後數量為1,則需要喚醒取進程。 if (c == 0) signalNotEmpty(); }
此方法的執行步驟大體如下,
- 判斷要put的元素e是否為null,如果為null直接拋出空指針異常;
- e不為null,則使用e創建一個Node節點,獲得put鎖;
- 判斷當前隊列中的元素數量和隊列的容量,如果相等,則阻塞當前線程;
- 如果不相等,把生成的node節點插入隊列,enqueue方法定義如下,
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }
- 使用原子操作類把當前隊列中的元素數量增1;如果添加後的隊列中的元素數量比容量小,則表示可以繼續執行put類的操作,喚醒notFull.singal();
- 如果c=0,即在enqueue前為空,數量為0(此時會阻塞take進程),enqueue後為1,則需要喚醒take進程,如下
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
1.2、take
take方法和put剛好相反,其定義如下,
public E take() throws InterruptedException { E x; int c = -1; //獲得當前隊列的元素數量 final AtomicInteger count = this.count; //獲得take鎖 final ReentrantLock takeLock = this.takeLock; //執行take操作 takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await();//阻塞當前線程 } x = dequeue(); //當前隊列的數量減1,返回操作前的數量 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); }
//當前隊列中元素數量為capacity-1,即未滿,可以調用put方法,需要喚醒阻塞在put鎖上的線程 if (c == capacity) signalNotFull(); return x; }
此方法的執行步驟大體如下,
- 獲得take鎖,表示執行take操作;
- 獲得當前隊列的元素數量,如果數量為0,則阻塞當前線程,直到被中斷或者被喚醒;
- 如果當前隊列的元素數量不額外i0,則執行出隊操作;
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head;//head賦值給h Node<E> first = h.next;//相當於第二個節點賦值給first h.next = h; // help GC head = first;//頭節點指向第二個節點 E x = first.item; first.item = null; return x; }
從上面的代碼可以看出把頭節點進行出隊,即head指向下一個節點
- 當前隊列的元素數量減一,並返回操作前的數量;
- 如果之前大於1(c最小為2),指向dequeue後數量最小為1,證明隊列中仍有元素,需要喚醒獲得take鎖的其他阻塞線程,take.singal();
- 如果c等於當前隊列的容量(執行完dequeue後,當前隊列中元素的數量等於capacity-1,則未滿),則需要喚醒獲得put鎖的其他put線程;
private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try {
//喚醒阻塞在put鎖的其他線程 notFull.signal(); } finally { putLock.unlock(); } }
2、offer/poll
2.1、offer方法
offer方法的定義如下,
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
此方法的執行步驟大體如下,
- 判斷當前隊列中的元素數量和隊列容量,如果相等,直接返回false;
- 如果當前隊列中元素數量小於隊列容量,執行入隊操作;
- 入隊操作之後,判斷隊列中元素數量如果仍小於隊列容量,喚醒其他的阻塞線程;
- 如果c==0(即入隊成功,隊列中元素的數量為1),則需要喚醒阻塞在put鎖的線程;
2.2、poll
poll方法定義如下,
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
此方法的執行步驟大體如下,
- 如果當前隊列元素數量為0,直接返回null;
- 如果當前隊列元素數量大於0,執行出隊操作;
- 如果c>1,即c最小為2,則出隊成功後,仍有1個元素,可以喚醒阻塞在take鎖的線程;
- 如果c=capacity,則出隊成功後,隊列中的元素為capacity-1,這時隊列為滿,可以喚醒阻塞在put鎖上的其他線程,即可以添加元素;
3、offer(E e, long timeout, TimeUnit unit)/poll(long timeout, TimeUnit unit)
3.1、offer(E e, long timeout, TimeUnit unit)
在規定的時間內阻塞線程,其定義如下
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
此方法的執行步驟大體如下,
- 如果當前隊列中元素的數量等於隊列的容量,則在規定時間內會一直阻塞,如果超過了規定時間則直接返回false;
- 如果在規定時間內當前隊列中元素的數量不等於隊列容量,則跳出了while循環,則執行入隊操作;
- 判斷入隊前隊列中元素的數量,如果小於隊列容量,則喚醒其他put鎖的阻塞線程;如果等於0,則入隊後元素數量大於0,則喚醒take鎖阻塞的線程;
3.2、poll(long timeout, TimeUnit unit)
其方法定義如下,
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
此方法的執行步驟大體如下,
- 如果當前隊列中的元素數量為0,則在規定的時間內阻塞線程;如果超過了規定時間直接返回null;
- 執行出隊操作,出隊成功後,判斷出隊前隊列中元素的數量,如果大於1(最小為2),則喚醒其他阻塞在take鎖上的線程;
- 如果出隊前隊列中元素數量等於隊列容量,則出隊後隊列中元素數量為capacity-1,則喚醒阻塞在put鎖上的線程;
三、方法比較
3.1、添加方法比較
序號 | 方法名 | 隊列滿時處理方式 | 方法返回值 |
1 | offer(E e) | 返回false | boolean |
2 | put(E e) | 線程阻塞,直到中斷或被喚醒 | void |
3 | offer(E e, long timeout, TimeUnit unit) | 在規定時間內重試,超過規定時間返回false | boolean |
3.2、取出方法比較
序號 | 方法名 | 隊列空時處理方式 | 方法返回值 |
1 | poll() | 返回null | E |
2 | take() | 線程阻塞,直到中斷或被喚醒 | E |
3 | poll(long timeout, TimeUnit unit) | 在規定時間內重試,超過規定時間返回null | E |
四、總結
以上是關於LinkedBlockingQueue隊列的相關實現及方法介紹,此隊列使用單向鏈表為載體,配合put/take鎖實現生產線程和消費線程共享數據。LinkedBlockingQueue作為共享的數據池,實現並發環境下的添加及取出方法。
有不正之處歡迎指正,感謝!
java阻塞隊列之LinkedBlockingQueue