1. 程式人生 > 程式設計 >JDK原始碼分析-LinkedBlockingQueue

JDK原始碼分析-LinkedBlockingQueue

概述


前文「JDK原始碼分析-ArrayBlockingQueue」分析了 ArrayBlockingQueue 的程式碼實現,LinkedBlockingQueue 也是阻塞佇列的實現。與前者不同的是,後者內部是由連結串列實現的。

LinkedBlockingQueue 的繼承結構如下:

下面分析其主要方法的程式碼實現。

程式碼分析

LinkedBlockingQueue 內部有一個巢狀類 Node,它表示連結串列的節點,如下:

static class Node<E> {    E item; // 節點元素
   Node<E> next; // 後繼節點        Node(E x) { item = x; }

}

PS: 從 Node 定義可以看出該連結串列是一個單連結串列。

主要成員變數

// 連結串列的容量(若不指定則為 Integer.MAX_VALUE)private final int capacity;// 當前元素的數量(即連結串列中元素的數量)private final AtomicInteger count = new AtomicInteger();// 連結串列的頭節點(節點元素為空)transient Node<E> head;// 連結串列的尾結點(節點元素為空)private transient Node<E> last;// take、poll 等出隊操作持有的鎖private

final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */// 出隊鎖的條件佇列private final Condition notEmpty = takeLock.newCondition();// put、offer 等入隊操作的鎖private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */// 入隊鎖的條件佇列private final Condition notFull = putLock.newCondition();

構造器

LinkedBlockingQueue 有三個構造器,分別如下:

// 構造器 1:無參構造器,初始容量為 Integer.MAX_VALUE,即 2^31-1public LinkedBlockingQueue() {    this(Integer.MAX_VALUE);}// 構造器 2:指定容量的構造器public LinkedBlockingQueue(int capacity) {    if (capacity <= 0) throw new IllegalArgumentException();    this.capacity = capacity;    // 初始化連結串列的頭尾節點    last = head = new Node<E>(null);// 構造器 3:用給定集合初始化的構造器public LinkedBlockingQueue(Collection<? extends E> c) {    // 呼叫構造器 2 進行初始化    final ReentrantLock putLock = this.putLock;    putLock.lock(); // Never contended,but necessary for visibility    try {        int n = 0;        for (E e : c) {            if (e == null)                throw new NullPointerException();            if (n == capacity)                throw new IllegalStateException("Queue full");            // 將集合中的元素封裝成 Node 物件,並新增到連結串列末尾            enqueue(new Node<E>(e));            ++n;        }        count.set(n);    } finally {        putLock.unlock();    }}

enqueue 方法如下:

// 將 node 節點新增到連結串列末尾private void enqueue(Node<E> node) {    last = last.next = node;public void put(E e) throws InterruptedException {    if (e == null) throw new NullPointerException();    // Note: convention in all put/take/etc is to preset local var    // holding count negative to indicate failure unless set.    int c = -1;    // 把 E 封裝成 Node 節點    Node<E> node = new Node<E>(e);    final ReentrantLock putLock = this.putLock;    final AtomicInteger count = this.count;    putLock.lockInterruptibly();        /*         * Note that count is used in wait guard even though it is         * not protected by lock. This works because count can         *>         * out by lock),and we (or some other waiting put) are         * signalled if it ever changes from capacity. Similarly         * for all other uses of count in other wait guards.         */        // 若佇列已滿,notFull 等待(類比生產者)        while (count.get() == capacity) {            notFull.await();        // node 入隊        enqueue(node);        c = count.getAndIncrement();        // 若該元素新增後,佇列仍未滿,喚醒一個其他生產者執行緒        if (c + 1 < capacity)            notFull.signal();    // c==0 說明之前佇列為空,出隊執行緒處於等待狀態,    // 新增一個元素後,將出隊執行緒喚醒(消費者)    if (c == 0)        signalNotEmpty();private void signalNotEmpty() {    final ReentrantLock takeLock = this.takeLock;    takeLock.lock();        // 喚醒 notEmpty 條件下的一個執行緒(消費者)        notEmpty.signal();        takeLock.unlock();public boolean offer(E e,long timeout,TimeUnit unit)    throws InterruptedException {    long nanos = unit.toNanos(timeout);            // 等待超時,返回 false            if (nanos <= 0)                return false;            nanos = notFull.awaitNanos(nanos);        // 入隊        enqueue(new Node<E>(e));    return true;}

該方法與 put 操作類似,不同的是 put 方法在佇列滿時會一直等待,而該方法有超時時間,超時後返回 false。


3. offer(E):

public boolean offer(E e) {    if (e == null) throw new NullPointerException();    final AtomicInteger count = this.count;    // 若佇列已滿,直接返回 false    if (count.get() == capacity)        return false;    int c = -1;    Node<E> node = new Node<E>(e);    final ReentrantLock putLock = this.putLock;    putLock.lock();        // 佇列未滿,入隊        if (count.get() < capacity) {            enqueue(node);            c = count.getAndIncrement();            if (c + 1 < capacity)                // 佇列未滿,喚醒 notFull 下的執行緒,繼續入隊                notFull.signal();    } finally {    }    if (c == 0)    return c >= 0;public E take() throws InterruptedException {    E x;    final ReentrantLock takeLock = this.takeLock;    takeLock.lockInterruptibly();        // 佇列為空,notEmpty 條件下的執行緒等待(消費者)        while (count.get() == 0) {            notEmpty.await();        // 從佇列頭部刪除節點        x = dequeue();        c = count.getAndDecrement();        // 若佇列不為空,喚醒一個 notEmpty 條件下的執行緒(消費者)        if (c > 1)            notEmpty.signal();    // 佇列已經不滿了,喚醒 notFull 條件下的執行緒(生產者)    if (c == capacity)        signalNotFull();    return x;private E dequeue() {    Node<E> h = head; // 頭節點    Node<E> first = h.next; // 頭節點的後繼節點    h.next = h; // help GC // 後繼節點指向自己(從連結串列中刪除)    head = first; // 更新頭節點    E x = first.item; // 獲取要刪除節點的資料    first.item = null// 清空資料(新的頭節點)private void signalNotFull() {    putLock.lock();        // 喚醒生產者        notFull.signal();public E poll(long timeout,TimeUnit unit) throws InterruptedException {    E x = null;    final ReentrantLock takeLock = this.takeLock;        // 佇列已空        while (count.get() == 0) {            // 超時返回 null                return null;            nanos = notEmpty.awaitNanos(nanos);        // 若佇列不空,喚醒一個 notEmpty 條件下的執行緒(消費者)        c = count.getAndDecrement();        if (c > 1)    // 佇列不滿,喚醒 notFull 條件下的執行緒(生產者)    if (c == capacity)public E poll() {    // 佇列為空,返回 null    if (count.get() == 0)        return null;    E x = null;    takeLock.lock();        // 佇列不為空,出隊        if (count.get() > 0) {            x = dequeue();            c = count.getAndDecrement();            // 該元素出隊後,佇列仍不為空,喚醒其他消費者            if (c > 1)                notEmpty.signal();    // 佇列已經不滿,喚醒生產者public E peek() {    if (count.get() == 0)        return null;        // 頭節點的後繼節點        Node<E> first = head.next;        if (first == null)            return null;        else            return first.item;