1. 程式人生 > >Java併發學習(二十三)-LinkedBlockingQueue和LinkedBlockingDeque分析

Java併發學習(二十三)-LinkedBlockingQueue和LinkedBlockingDeque分析

有兩個比較相似的併發阻塞佇列,LinkedBlockingQueue和LinkedBlockingDeque,兩個都是佇列,只不過前者只能一端出一端入,後者則可以兩端同時出入,並且都是結構改變執行緒安全的佇列。其實兩個佇列從實現思想上比較容易理解,有以下特點:

  • 連結串列結構(動態陣列)
  • 通過ReentrantLock實現鎖
  • 利用Condition實現佇列的阻塞等待,喚醒

以下將分開講述LinkedBlockingQueue和LinkedBlockingDeque的基本特點及操作。

LinkedBlockingQueue

這是一個只能一端出一端如的單向佇列結構,是有FIFO特性的,並且是通過兩個ReentrantLock和兩個Condition來實現的。先看它的結構基本欄位:

/**
 * 基於連結串列。
 * FIFO
 * 單向
 *最大容量是Integer.MAX_VALUE.
 */
public class LinkedBlockingQueueAnalysis<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /*
     * 兩個方向。
     * putLock
     * takeLock
     * 有些操作會需要同時獲取兩把鎖。
     * 例如remove操作,也需要獲取兩把鎖
     */
//主要的node節點 static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } //容量,一開始就固定了的。 private final int capacity; //用AtomicInteger 來記錄數量。 private final AtomicInteger count = new AtomicInteger(); //head節點 head.item == null transient Node<E> head; //last節點,last.next == null
private transient Node<E> last; //take鎖 private final ReentrantLock takeLock = new ReentrantLock(); //等待take的節點序列。 private final Condition notEmpty = takeLock.newCondition(); //put的lock。 private final ReentrantLock putLock = new ReentrantLock(); //等待puts的佇列。 private final Condition notFull = putLock.newCondition(); ... }

和LinkedBlockingDeque的區別之一就是,LinkedBlockingQueue採用了兩把鎖來對佇列進行操作,也就是隊尾新增的時候,
隊頭仍然可以刪除等操作。接下來看典型的操作。

put操作

首先看put操作:

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();   //e不能為null
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;     //獲取put鎖
        final AtomicInteger count = this.count;          //獲取count
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {        //如果滿了,那麼就需要使用notFull阻塞
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)                    //如果此時又有空間了,那麼notFull喚醒
                notFull.signal();
        } finally {
            putLock.unlock();             //釋放鎖
        }
        if (c == 0)            //當c為0時候,也要根take鎖說一下,併發下
            signalNotEmpty();        //呼叫notEmpty        
    }

主要的思想還是比較容易理解的,現在看看enqueue 方法:

    private void enqueue(Node<E> node) {        //入對操作。
        last = last.next = node;      //隊尾進
    }

再看看signalNotEmpty方法:

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();        //加鎖
        try {
            notEmpty.signal();    //用於signal,notEmpty
        } finally {
            takeLock.unlock();
        }
    }

take操作

take操作,就是從佇列裡面彈出一個元素,下面看它的詳細程式碼:

    public E take() throws InterruptedException {
        E x;
        int c = -1;            //設定一個記錄變數
        final AtomicInteger count = this.count;     //獲得count
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();        //加鎖
        try {
            while (count.get() == 0) {       //如果沒有元素,那麼就阻塞性等待
                notEmpty.await();
            }
            x = dequeue();            //一定可以拿到。
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();        //報告還有元素,喚醒佇列
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();           //解鎖
        return x;
    }

接下來看dequeue方法:

   private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h;        // help GC 指向自己,幫助gc回收
        head = first;
        E x = first.item;       //從隊頭出。
        first.item = null;      //將head.item設為null。
        return x;
    }

對於LinkedBlockingQueue來說,有兩個ReentrantLock分別控制隊頭和隊尾,這樣就可以使得新增操作分開來做,一般的操作是獲取一把鎖就可以,但有些操作例如remove操作,則需要同時獲取兩把鎖:

    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();     //獲取鎖
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {     //依次迴圈遍歷
                if (o.equals(p.item)) {       //找到了
                    unlink(p, trail);       //解除連結
                    return true;
                }
            }
            return false;        //沒找到,或者解除失敗
        } finally {
            fullyUnlock();
        }
    }

當然,除了上述的remove方法外,在Iterator的next方法,remove方法以及LBQSpliterator分割迭代器中也是需要加全鎖進行操作的。

LinkedBlockingDeque

名字很相近,LinkedBlockingDeque就是一個雙端佇列,任何一端都可以進行元素的出入,接下來看它的主要欄位:

/**
 * 雙端佇列。
 * 最大值是Integer.MAX_VALUE
 * 所謂弱一致性有利於刪除,有點理解了,
 * 或許是比如clear方法,不知直接把引用置為null,而是一個個解除連線。
 * 利用lock鎖去控制併發訪問,利用condition去控制阻塞
 * weakly consistent的iterators。
 * 我們需要保持所有的node都要是gc可達的。
 */
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    //雙向聯結的節點。
    static final class Node<E> {
        E item;    //泛型的item變數

        // 前一個節點
        Node<E> prev;

        //next後一個節點
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    //頭節點
    transient Node<E> first;

    //尾節點。
    transient Node<E> last;

    //count,表示數值。
    private transient int count;

    //容量
    private final int capacity;

    //實現控制訪問的鎖
    final ReentrantLock lock = new ReentrantLock();

    //take的Condition
    private final Condition notEmpty = lock.newCondition();

    //put的Condition
    private final Condition notFull = lock.newCondition();
    ...
}

從上面的結果來看,其實LinkedBlockingDeque的結構上來說,有點像ArrayBlockingQueue的構造,也是一個ReentrantLock和兩個Condition,下面分別對其中重要方法進行分析。

  • public void addFirst(E e)
  • public void addLast(E e)
  • public boolean offerFirst(E e)
  • public boolean offerLast(E e)

對於LinkedBlockingDeque,和ArrayBlockingQueue結構還是很類似的,也是一個ReentrantLock和兩個Condition使用,但是僅僅是在這二者使用上,其實內部運轉還是很大不同的。

offerFirst操作

offerFirst就是在隊頭新增一個元素:

    public boolean offerFirst(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;    //加鎖
        lock.lock();
        try {
            return linkFirst(node);
        } finally {
            lock.unlock();
        }
    }

接下來看linkFirst方法:

    private boolean linkFirst(Node<E> node) {
        if (count >= capacity)         //容量滿了
            return false;
        Node<E> f = first;           //在隊頭新增
        node.next = f;
        first = node;
        if (last == null)        //第一個節點
            last = node;
        else
            f.prev = node;
        ++count;              //count自增
        notEmpty.signal();           //說明不為null。喚醒等待佇列
        return true;
    }

其他的方法類似,都是加鎖後對連結串列的操作,這裡就不贅述了。

clear操作

其實我一開始看clear操作時候,總以為它是直接把first和last分別置為null就行了,非常簡單,但實際上,它的實現方法卻是遍歷以便,分別把所有node指標都指向null從而方便gc。

    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();               //加鎖後清空所有。
        try {
            for (Node<E> f = first; f != null; ) {   //遍歷一遍
                f.item = null;       //置空操作
                Node<E> n = f.next;
                f.prev = null;
                f.next = null;
                f = n;      //f後移動一個
            }
            first = last = null;
            count = 0;
            notFull.signalAll();           //通知等待put執行緒
        } finally {
            lock.unlock();
        }
    }

這樣的思路很值得學習借鑑。

總結

總的來說,這兩個阻塞佇列實現上還是比較容易理解的,具體細節方面還是很值得閱讀的。