1. 程式人生 > 其它 >Java集合類框架原始碼分析 之 BlockingQueue介面原始碼解析 【11】

Java集合類框架原始碼分析 之 BlockingQueue介面原始碼解析 【11】

技術標籤:Java集合類原始碼分析BlockingQueue原始碼BlockingQueue解析

先看類簡介:

/**
* 這是一個佇列,當從queue中檢索一個元素,或者向queue中插入一個元素的時候,可以進行等待,等待該queue變為非空,及等待queue中有足夠的空間可用。
* 等待queue變成非空當檢索一個元素,並且等待queue中的空間變為可用當儲存一個元素的時候。
* A {@link java.util.Queue} that additionally supports operations
* that wait for the queue to become non-empty when retrieving an
* element, and wait for space to become available in the queue when
* storing an element.
*
* BlockingQueue的所有方法有四種形勢,它們以不同的方式處理不能立即滿足的操作,但可以在將來的某個時候滿足。
* 第一個丟擲異常,第二個返回特定的值null或者false,第三個阻塞當前執行緒直到操作成功,第四個阻塞給定的最大時間,然後放棄。
* <p>{@code BlockingQueue} methods come in four forms, with different ways
* of handling operations that cannot be satisfied immediately, but may be
* satisfied at some point in the future:
* one throws an exception, the second returns a special value (either
* {@code null} or {@code false}, depending on the operation), the third
* blocks the current thread indefinitely until the operation can succeed,
* and the fourth blocks for only a given maximum time limit before giving
* up. These methods are summarized in the following table:
*
* <table BORDER CELLPADDING=3 CELLSPACING=1>
* <caption>Summary of BlockingQueue methods</caption>
* <tr>
*  <td></td>
*  <td ALIGN=CENTER><em>Throws exception</em></td>
*  <td ALIGN=CENTER><em>Special value</em></td>
*  <td ALIGN=CENTER><em>Blocks</em></td>
*  <td ALIGN=CENTER><em>Times out</em></td>
* </tr>
* <tr>
*  <td><b>Insert</b></td>
*  <td>{@link #add add(e)}</td>
*  <td>{@link #offer offer(e)}</td>
*  <td>{@link #put put(e)}</td>
*  <td>{@link #offer(Object, long, TimeUnit) offer(e, time, unit)}</td>
* </tr>
* <tr>
*  <td><b>Remove</b></td>
*  <td>{@link #remove remove()}</td>
*  <td>{@link #poll poll()}</td>
*  <td>{@link #take take()}</td>
*  <td>{@link #poll(long, TimeUnit) poll(time, unit)}</td>
* </tr>
* <tr>
*  <td><b>Examine</b></td>
*  <td>{@link #element element()}</td>
*  <td>{@link #peek peek()}</td>
*  <td><em>not applicable</em></td>
*  <td><em>not applicable</em></td>
* </tr>
* </table>
*
* BlockingQueue不接受null元素,當嘗試新增null的時候,add()/put()/offer()會丟擲 NullPointerException異常。null只能作為方法操作失敗時的返回值。
* <p>A {@code BlockingQueue} does not accept {@code null} elements.
* Implementations throw {@code NullPointerException} on attempts
* to {@code add}, {@code put} or {@code offer} a {@code null}. A
* {@code null} is used as a sentinel value to indicate failure of
* {@code poll} operations.
*
* BlockingQueue 或許能力有限,在給定的時間,remainingCapacity說明還有這麼多的元素可以非阻塞的新增。
* 沒有手動指明queue的容量時,該容量為 Integer.MAX_VALUE;
* <p>A {@code BlockingQueue} may be capacity bounded. At any given
* time it may have a {@code remainingCapacity} beyond which no
* additional elements can be {@code put} without blocking.
* A {@code BlockingQueue} without any intrinsic capacity constraints always
* reports a remaining capacity of {@code Integer.MAX_VALUE}.
*
* BlockingQueue 實現主要被使用者生產者-消費者佇列,但同時還支援 Collection介面。因此,也可以使用remove()來移除佇列中的任意元素。
* 然而,這些方法不會被很有效的執行,僅僅偶爾使用,比如取消一個排隊的訊息。
* <p>{@code BlockingQueue} implementations are designed to be used
* primarily for producer-consumer queues, but additionally support
* the {@link java.util.Collection} interface. So, for example, it is
* possible to remove an arbitrary element from a queue using
* {@code remove(x)}. However, such operations are in general
* <em>not</em> performed very efficiently, and are intended for only
* occasional use, such as when a queued message is cancelled.
*
* BlockingQueue是執行緒安全的,所有的排隊方法自動使用內建鎖或其他形式的併發控制。
* 然而,在一個實現中,除非特殊指定,集合的批量操作addAll()/containsAll()/retainAll()/removeAll()不必自動執行。
* 因此,對於addAll(c)操作來說,可能會在只新增一部分元素後就失敗丟擲異常的情況,即c中的元素沒有完全新增。
* <p>{@code BlockingQueue} implementations are thread-safe. All
* queuing methods achieve their effects atomically using internal
* locks or other forms of concurrency control. However, the
* <em>bulk</em> Collection operations {@code addAll},
* {@code containsAll}, {@code retainAll} and {@code removeAll} are
* <em>not</em> necessarily performed atomically unless specified
* otherwise in an implementation. So it is possible, for example, for
* {@code addAll(c)} to fail (throwing an exception) after adding
* only some of the elements in {@code c}.
*
* BlockingQueue不支援close或者shutdown操作來表明不再新增更多的元素。
* 這些特性的需求和使用往往依賴於實現。例如,一個常見的策略是生產者插入特殊的<em>結束流</em>或<em>汙染</em>物件,當消費者使用這些物件時,會相應地進行處理。
* <p>A {@code BlockingQueue} does <em>not</em> intrinsically support
* any kind of &quot;close&quot; or &quot;shutdown&quot; operation to
* indicate that no more items will be added. The needs and usage of
* such features tend to be implementation-dependent. For example, a
* common tactic is for producers to insert special
* <em>end-of-stream</em> or <em>poison</em> objects, that are
* interpreted accordingly when taken by consumers.
*
*
* 用法例子:基於典型的生產者-消費者場景,注意 BlockingQueue可以被安全地用於多生產者和多消費者。
* Usage example, based on a typical producer-consumer scenario.
* Note that a {@code BlockingQueue} can safely be used with multiple
* producers and multiple consumers.
* <pre> {@code
* class Producer implements Runnable {
*  private final BlockingQueue queue;
*  Producer(BlockingQueue q) { queue = q; }
*  public void run() {
*   try {
*    while (true) { queue.put(produce()); }
*   } catch (InterruptedException ex) { ... handle ...}
*  }
*  Object produce() { ... }
* }
*
* class Consumer implements Runnable {
*  private final BlockingQueue queue;
*  Consumer(BlockingQueue q) { queue = q; }
*  public void run() {
*   try {
*    while (true) { consume(queue.take()); }
*   } catch (InterruptedException ex) { ... handle ...}
*  }
*  void consume(Object x) { ... }
* }
*
* class Setup {
*  void main() {
*   BlockingQueue q = new SomeQueueImplementation();
*   Producer p = new Producer(q);
*   Consumer c1 = new Consumer(q);
*   Consumer c2 = new Consumer(q);
*   new Thread(p).start();
*   new Thread(c1).start();
*   new Thread(c2).start();
*  }
* }}</pre>
*
* 記憶體一致性影響:就像其他的併發集合,在一個執行緒中放置一個元素到 BlockingQueue中,同時有另外一個執行緒,將這個元素從佇列中移除。
* <p>Memory consistency effects: As with other concurrent
* collections, actions in a thread prior to placing an object into a
* {@code BlockingQueue}
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions subsequent to the access or removal of that element from
* the {@code BlockingQueue} in another thread.
*
* <p>This interface is a member of the
* <a href="{@docRoot}/../technotes/guides/collections/index.html">
* Java Collections Framework</a>.
*
* @since 1.5
* @author Doug Lea
* @param <E> the type of elements held in this collection
*/

從類簡介中我們可以看出來有這麼7點需要注意:

1、向一個 BlockingQueue中插入或者從中檢索一個數據的時候,會進行等待,而且等待可以設定等待的超時時間。

2、BlockingQueue的所有方法有四種形勢,它們以不同的方式處理不能立即滿足的操作。

第一個丟擲異常,第二個返回特定的值null或者false,第三個阻塞當前執行緒直到操作成功,第四個阻塞給定的最大時間,然後放棄。

如圖:

3、BlockingQueue不接受null元素,當嘗試新增null的時候,add()/put()/offer()會丟擲 NullPointerException異常。null只能作為方法操作失敗時的返回值。

4、BlockingQueue 儲存容量並非無上限,在給定的時間,remainingCapacity指明還有這麼多的元素可以非阻塞的新增。如果沒有手動指明queue的容量時,該容量為 Integer.MAX_VALUE;

5、BlockingQueue 實現主要被使用者生產者-消費者佇列,但同時還支援 Collection介面。因此,也可以使用remove()來移除佇列中的任意元素。然而,這些方法不會被很有效的執行,僅僅偶爾使用,比如取消一個排隊的訊息。

6、BlockingQueue是執行緒安全的,所有的排隊方法自動使用內建鎖或其他形式的併發控制。然而,在一個實現中,除非特殊指定,集合的批量操作addAll()/containsAll()/retainAll()/removeAll()不必自動執行。因此,對於addAll(c)操作來說,可能會在只新增一部分元素後就失敗丟擲異常的情況,即c中的元素沒有完全新增。

7、BlockingQueue不支援close或者shutdown操作來表明不再新增更多的元素。這些特性的需求和使用往往依賴於自己實現。例如,一個常見的策略是生產者插入特殊的流結束符或者非法汙染物件,當消費者使用這些物件時,會相應地進行處理。

8、用法例子:基於典型的生產者-消費者場景,注意 BlockingQueue可以被安全地用於多生產者和多消費者。

9、記憶體一致性影響:就像其他的併發集合,在一個執行緒中放置一個元素到 BlockingQueue中,同時有另外一個執行緒,將這個元素從佇列中移除。

類簡介介紹完畢。

下邊介紹該介面的方法:

public interface BlockingQueue<E> extends Queue<E> {
    /**
     * 向佇列中插入元素,成功返回true,失敗丟擲 IllegalStateException 異常。
     * Inserts the specified element into this queue if it is possible to do
     * so immediately without violating capacity restrictions, returning
     * {@code true} upon success and throwing an
     * {@code IllegalStateException} if no space is currently available.
     * When using a capacity-restricted queue, it is generally preferable to
     * use {@link #offer(Object) offer}.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws IllegalStateException if the element cannot be added at this
     *         time due to capacity restrictions
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean add(E e);

    /**
     * 成功返回true,失敗返回false。當用於有容量限制的queue,這個方法通常比add()更好,add()方法失敗時只會丟擲異常。
     * Inserts the specified element into this queue if it is possible to do
     * so immediately without violating capacity restrictions, returning
     * {@code true} upon success and {@code false} if no space is currently
     * available.  When using a capacity-restricted queue, this method is
     * generally preferable to {@link #add}, which can fail to insert an
     * element only by throwing an exception.
     *
     * @param e the element to add
     * @return {@code true} if the element was added to this queue, else
     *         {@code false}
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean offer(E e);

    /**
     * 向queue中插入指定的元素,當沒有足夠的空間的時候,會進行等待。
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;

    /**
     * 向queue中插入指定的元素,當沒有足夠的空間的時候,會進行等待。等待的時間,按照 timeout指定,單位是 unit,超過等待的時間後,返回false
     * TimeUnit 是一個列舉類,包含這些元素:NANOSECONDS ,MICROSECONDS ,MILLISECONDS ,SECONDS ,MINUTES ,HOURS ,DAYS
	 *
     * Inserts the specified element into this queue, waiting up to the
     * specified wait time if necessary for space to become available.
     *
     * @param e the element to add
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 檢索並刪除佇列的頭元素,如果需要,會一直等到元素可用
     * 等待過程中,如果被幹擾,就會丟擲 InterruptedException 異常。
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    E take() throws InterruptedException;

    /**
     * 檢索並刪除第一個元素,會等待指定的時間,直到元素變得可用。
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 返回剩餘空間大小,如果沒有限制,返回 Integer.MAX_VALUE
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic
     * limit.
     *
     * 注意不能依賴這個值來判定一定可以成功插入,因為還要考慮到有其他的執行緒增加或者刪除元素。
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     *
     * @return the remaining capacity
     */
    int remainingCapacity();

    /**
     * 刪除佇列中的某個元素,如果元素存在多個,則刪除一個。
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     * @throws ClassCastException if the class of the specified element
     *         is incompatible with this queue
     *         (<a href="../Collection.html#optional-restrictions">optional</a>)
     * @throws NullPointerException if the specified element is null
     *         (<a href="../Collection.html#optional-restrictions">optional</a>)
     */
    boolean remove(Object o);

    /**
     * 如果佇列至少包含指定的一個元素,則返回true
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     * @throws ClassCastException if the class of the specified element
     *         is incompatible with this queue
     *         (<a href="../Collection.html#optional-restrictions">optional</a>)
     * @throws NullPointerException if the specified element is null
     *         (<a href="../Collection.html#optional-restrictions">optional</a>)
     */
    public boolean contains(Object o);

    /**
     * 從queue中移除所有的可用元素,並新增到給定的集合中。這個方法比重複呼叫poll()會更加有效。
     * 當嘗試新增一個元素到指定的集合中的時候,可能導致該元素同時在兩個容器中,或者在這個裡邊不在另一個裡邊,或者兩個容器中都不存在的錯誤。
	 * 如果嘗試向自身執行drain操作,會丟擲 IllegalArgumentException 異常。而且,當這個操作執行過程中,如果特定的集合發生了改變,行為將會不可知。
     * Removes all available elements from this queue and adds them
     * to the given collection.  This operation may be more
     * efficient than repeatedly polling this queue.  A failure
     * encountered while attempting to add elements to
     * collection {@code c} may result in elements being in neither,
     * either or both collections when the associated exception is
     * thrown.  Attempts to drain a queue to itself result in
     * {@code IllegalArgumentException}. Further, the behavior of
     * this operation is undefined if the specified collection is
     * modified while the operation is in progress.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException if addition of elements
     *         is not supported by the specified collection
     * @throws ClassCastException if the class of an element of this queue
     *         prevents it from being added to the specified collection
     * @throws NullPointerException if the specified collection is null
     * @throws IllegalArgumentException if the specified collection is this
     *         queue, or some property of an element of this queue prevents
     *         it from being added to the specified collection
     */
    int drainTo(Collection<? super E> c);

    /**
     * 操作和drainTo(Collection<? super E> c)相同,只是對遷移的數量進行限制。
     * 如何限制呢?從頭到尾,只遷移最大數量的元素到指定的集合中?存疑。
     * Removes at most the given number of available elements from
     * this queue and adds them to the given collection.  A failure
     * encountered while attempting to add elements to
     * collection {@code c} may result in elements being in neither,
     * either or both collections when the associated exception is
     * thrown.  Attempts to drain a queue to itself result in
     * {@code IllegalArgumentException}. Further, the behavior of
     * this operation is undefined if the specified collection is
     * modified while the operation is in progress.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException if addition of elements
     *         is not supported by the specified collection
     * @throws ClassCastException if the class of an element of this queue
     *         prevents it from being added to the specified collection
     * @throws NullPointerException if the specified collection is null
     * @throws IllegalArgumentException if the specified collection is this
     *         queue, or some property of an element of this queue prevents
     *         it from being added to the specified collection
     */
    int drainTo(Collection<? super E> c, int maxElements);
}

方法介紹簡介:

package java.util.concurrent;

import java.util.Collection;
import java.util.Queue;

public interface BlockingQueue<E> extends Queue<E> {
    /**
     * 向佇列中插入元素,成功返回true,失敗丟擲 IllegalStateException 異常。
     */
    boolean add(E e);

    /**
     * 成功返回true,失敗返回false。當用於有容量限制的queue,這個方法通常比add()更好,add()方法失敗時只會丟擲異常。
     */
    boolean offer(E e);

    /**
     * 向queue中插入指定的元素,當沒有足夠的空間的時候,會進行等待。
     */
    void put(E e) throws InterruptedException;

    /**
     * 向queue中插入指定的元素,當沒有足夠的空間的時候,會進行等待。等待的時間,按照 timeout指定,單位是 unit,超過等待的時間後,返回false
     * TimeUnit 是一個列舉類,包含這些元素:NANOSECONDS ,MICROSECONDS ,MILLISECONDS ,SECONDS ,MINUTES ,HOURS ,DAYS
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 檢索並刪除佇列的頭元素,如果需要,會一直等到元素可用
     * 等待過程中,如果被幹擾,就會丟擲 InterruptedException 異常。
     */
    E take() throws InterruptedException;

    /**
     * 檢索並刪除第一個元素,會等待指定的時間,直到元素變得可用。
     */
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 返回剩餘空間大小,如果沒有限制,返回 Integer.MAX_VALUE
     */
    int remainingCapacity();

    /**
     * 刪除佇列中的某個元素,如果元素存在多個,則刪除一個。
     */
    boolean remove(Object o);

    /**
     * 如果佇列至少包含指定的一個元素,則返回true
     */
    public boolean contains(Object o);

    /**
     * 從queue中移除所有的可用元素,並新增到給定的集合中。這個方法比重複呼叫poll()會更加有效。
     * 當嘗試新增一個元素到指定的集合中的時候,可能導致該元素同時在兩個容器中,或者在這個裡邊不在另一個裡邊,或者兩個容器中都不存在的錯誤。
	 * 如果嘗試向自身執行drain操作,會丟擲 IllegalArgumentException 異常。而且,當這個操作執行過程中,如果特定的集合發生了改變,行為將會不可知。
     */
    int drainTo(Collection<? super E> c);

    /**
     * 操作和drainTo(Collection<? super E> c)相同,只是對遷移的數量進行限制。
     * 如何限制呢?從頭到尾,只遷移最大數量的元素到指定的集合中?存疑。
     */
    int drainTo(Collection<? super E> c, int maxElements);
}

其他的方法還都是比較明確,只有一個int drainTo(Collection<? super E> c, int maxElements);說的是可以向指定的集合中遷移最大數量的元素,但是遷移的規則並不確定,我們可以進行程式碼驗證:

以ArrayBlockingQueue為例:

public class SoureBlockingQueue {
	
	public static void main(String[] args) {
		BlockingQueue<String> bl = new ArrayBlockingQueue<String>(20) ;
		for(int i = 0 ;i<20 ;i++){
			bl.add(String.valueOf(i));
		}
		System.out.println(bl);
		ArrayList<String> al = new ArrayList<String>();
		bl.drainTo(al, 10) ;
		System.out.println(bl);
		System.out.println(al);
	}
}

// 結果:
// [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
// [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
// [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

可以看到,遷移的最大數量的元素,是從頭到尾開始進行遷移的,剩餘的元素仍然保留在原來的BlockingQueue佇列中。

紙上得來終覺淺,絕知此事要躬行,實踐出真理,驗證得真知,不能怕麻煩。