1. 程式人生 > 其它 >Java併發49:併發集合系列-基於獨佔鎖+連結串列實現的單向阻塞無界佇列LinkedBlockingQueue

Java併發49:併發集合系列-基於獨佔鎖+連結串列實現的單向阻塞無界佇列LinkedBlockingQueue

原文地址:http://www.importnew.com/25583.html

一、前言

前面介紹了使用CAS實現的非阻塞佇列ConcurrentLinkedQueue,下面就來介紹下使用獨佔鎖實現的阻塞佇列LinkedBlockingQueue的實現。

二、 LinkedBlockingQueue類圖結構

如圖LinkedBlockingQueue中:

  • 也有兩個Node分別用來存放首尾節點,
  • 並且裡面有個初始值為0的原子變數count用來記錄佇列元素個數,
  • 另外裡面有兩個ReentrantLock的獨佔鎖,分別用來控制元素入隊和出隊加鎖,
  • 其中takeLock用來控制同時只有一個執行緒可以從佇列獲取元素,其他執行緒必須等待,
  • putLock控制同時只能有一個執行緒可以獲取鎖去新增元素,其他執行緒必須等待。
  • 另外notEmpty和notFull用來實現入隊和出隊的同步。

另外由於出入隊是兩個非公平獨佔鎖,所以可以同時有一個執行緒入隊和一個執行緒出隊,其實這個是個生產者-消費者模型。

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
* Current number of elements /
private final AtomicInteger count = new AtomicInteger(0);
public static final int   MAX_VALUE = 0x7fffffff;

public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //初始化首尾節點 last = head = new Node<E>(null); }

如圖預設佇列容量為0x7fffffff,使用者也可以自己指定容量。

三、必備基礎

3.1 ReentrantLock

可以參考:Java併發19:Lock系列-Lock介面基本方法學習例項

3.2 條件變數(Condition)

可以參考:Java併發20:Lock系列-Condition介面基本方法學習例項

四 、帶超時時間的offer操作-生產者

在隊尾新增元素,

  • 如果佇列滿了,那麼等待timeout時候,如果時間超時則返回false,
  • 如果在超時前佇列有空餘空間,則插入後返回true。
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    //空元素拋空指標異常
    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;

    //獲取可被中斷鎖,只有一個執行緒克獲取
    putLock.lockInterruptibly();
    try {

        //如果佇列滿則進入迴圈
        while (count.get() == capacity) {
            //nanos<=0直接返回
            if (nanos <= 0)
                return false;
            //否者呼叫await進行等待,超時則返回<=0(1)
            nanos = notFull.awaitNanos(nanos);
        }
        //await在超時時間內返回則新增元素(2)
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();

        //佇列不滿則啟用其他等待入隊執行緒(3)
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        //釋放鎖
        putLock.unlock();
    }

    //c==0說明佇列裡面有一個元素,這時候喚醒出隊執行緒(4)
    if (c == 0)
        signalNotEmpty();
    return true;
}

private void enqueue(Node<E> node) {   
    last = last.next = node;
}

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

五、 帶超時時間的poll操作-消費者

獲取並移除隊首元素,在指定的時間內去輪詢佇列看有沒有首元素有則返回,否者超時後返回null

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;

    //出隊執行緒獲取獨佔鎖
    takeLock.lockInterruptibly();
    try {

        //迴圈直到佇列不為空
        while (count.get() == 0) {

            //超時直接返回null
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }

        //出隊,計數器減一
        x = dequeue();
        c = count.getAndDecrement();

        //如果出隊前佇列不為空則傳送訊號,啟用其他阻塞的出隊執行緒
        if (c > 1)
            notEmpty.signal();
    } finally {
        //釋放鎖
        takeLock.unlock();
    }

    //當前佇列容量為最大值-1則啟用入隊執行緒。
    if (c == capacity)
        signalNotFull();
    return x;
}

六、put操作-生產者

與帶超時時間的poll類似不同在於put時候如果當前佇列滿了它會一直等待其他執行緒呼叫notFull.signal才會被喚醒。

七、 take操作-消費者

與帶超時時間的poll類似不同在於take時候如果當前佇列空了它會一直等待其他執行緒呼叫notEmpty.signal()才會被喚醒。

八、 size操作

當前佇列元素個數,如程式碼直接使用原子變數count獲取。

public int size() {
    return count.get();
}

九、peek操作

獲取但是不移除當前佇列的頭元素,沒有則返回null:

public E peek() {
    //佇列空,則返回null
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

十、 remove操作

刪除佇列裡面的一個元素,有則刪除返回true,沒有則返回false。

在刪除操作時候由於要遍歷佇列所以加了雙重鎖,也就是在刪除過程中不允許入隊也不允許出隊操作:

public boolean remove(Object o) {
    if (o == null) return false;

    //雙重加鎖
    fullyLock();
    try {

        //遍歷佇列找則刪除返回true
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        //找不到返回false
        return false;
    } finally {
        //解鎖
        fullyUnlock();
    }
}

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

void unlink(Node<E> p, Node<E> trail) {

    p.item = null;
    trail.next = p.next;
    if (last == p)
        last = trail;
    //如果當前佇列滿,刪除後,也不忘記最快的喚醒等待的執行緒
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

十一、開源框架中使用

tomcat中任務佇列TaskQueue

11.1 類圖結構

可知TaskQueue繼承了LinkedBlockingQueue並且泛化型別固定了為Runnalbe.重寫了offer、poll、take方法。

十二、總結

LinkedBlockingQueue與ConcurrentLinkedQueue相比前者是阻塞佇列,使用可重入獨佔的非公平鎖來實現。

通過使用put鎖和take鎖使得入隊和出隊解耦可以同時進行處理,但是同時只有一個執行緒可以入隊或者出隊,其他執行緒必須等待。

另外引入了條件變數來進行入隊和出隊的同步,每個條件變數維護一個條件佇列用來存放阻塞的執行緒。

LinkedBlockingQueue的size操作通過使用原子變數count獲取能夠比較精確的獲取當前佇列的元素個數。

另外remove方法使用雙鎖保證刪除時候佇列元素保持不變,另外其實這個是個生產者-消費者模型。


而ConcurrentLinkedQueue則使用CAS非阻塞演算法來實現,使用CAS原子操作保證連結串列構建的安全性。

當多個執行緒併發時候CAS失敗的執行緒不會被阻塞,而是使用cpu資源去輪詢CAS直到成功。

size方法先比LinkedBlockingQueue的獲取的個數是不精確的,因為獲取size的時候是通過遍歷佇列進行的,而遍歷過程中可能進行增加刪除操作,remove方法操作時候也沒有對整個佇列加鎖。

remove時候可能進行增加刪除操作,這就可能刪除了一個剛剛新增的元素,而不是刪除的想要位置的。