1. 程式人生 > >java 阻塞隊列 LinkedBlockingQueue ArrayBlockingQueue 分析

java 阻塞隊列 LinkedBlockingQueue ArrayBlockingQueue 分析

java 阻塞隊列 linkedblockingqueue等


BlockingQueue是阻塞隊列接口類,該接口繼承了Queue接口

BlockingQueue實現類常見的有以下幾種。

  1. ArrayBlockingQueue:ArrayBlockingQueue 是一個有界的阻塞隊列,其內部實現是將對象放到一個數組裏。有界也就意味著,它不能夠存儲無限多數量的元素。它有一個同一時間能夠存儲元素數量的上限。你可以在對其初始化的時候設定這個上限,但之後就無法對這個上限進行修改了(譯者註:因為它是基於數組實現的,也就具有數組的特性:一旦初始化,大小就無法修改)。


  2. DelayQueue:DelayQueue 對元素進行持有直到一個特定的延遲到期。註入其中的元素必須實現 java.util.concurrent.Delayed 接口。


  3. LinkedBlockingQueue:LinkedBlockingQueue 內部以一個鏈式結構(鏈接節點)對其元素進行存儲。如果需要的話,這一鏈式結構可以選擇一個上限。如果沒有定義上限,將使用 Integer.MAX_VALUE 作為上限


  4. PriorityBlockingQueue:PriorityBlockingQueue 是一個無界的並發隊列。它使用了和類 java.util.PriorityQueue 一樣的排序規則。你無法向這個隊列中插入 null 值。所有插入到 PriorityBlockingQueue 的元素必須實現 java.lang.Comparable 接口。因此該隊列中元素的排序就取決於你自己的 Comparable 實現


  5. SynchronousQueue:SynchronousQueue 是一個特殊的隊列,它的內部同時只能夠容納單個元素。如果該隊列已有一元素的話,試圖向隊列中插入一個新元素的線程將會阻塞,直到另一個線程將該元素從隊列中抽走。同樣,如果該隊列為空,試圖向隊列中抽取一個元素的線程將會阻塞,直到另一個線程向隊列中插入了一條新的元素。


BlockingQueue接口提供了

3個添加元素方法

  1. add:添加元素到隊列裏,添加成功返回true,由於容量滿了添加失敗會拋出IllegalStateException異常

  2. offer:添加元素到隊列裏,添加成功返回true,添加失敗返回false

  3. put:添加元素到隊列裏,如果容量滿了會阻塞直到容量不滿

3個刪除方法。

  1. poll:刪除隊列頭部元素,如果隊列為空,返回null。否則返回元素。

  2. remove:基於對象找到對應的元素,並刪除。刪除成功返回true,否則返回false

  3. take:刪除隊列頭部元素,如果隊列為空,一直阻塞到隊列有元素並刪除



例子:生產者和消費者非常適合阻塞隊列,其實我也弄過redis作為生產者和消費者模式,redis的list非常適合做隊列,生產者放入隊列和消費者從隊列裏取出,同時也提供阻塞的取出等。先回歸到java的阻塞隊列裏,用LinkedBlockingQueue來做這個例子。

package com.basic.test;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Created by sdc on 2017/6/9.
 */
public class BlockingQueueTest {

    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(10);
        Producter producer = new Producter(blockingQueue);
        Consumer consumer = new Consumer(blockingQueue);

        //創建5個生產者,5個消費者
        for (int i = 0; i < 10; i++) {
            if (i < 5) {
                new Thread(producer, "producer" + i).start();
            } else {
                new Thread(consumer, "consumer" + (i - 5)).start();
            }
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        producer.shutDown();
        consumer.shutDown();
    }


    static class Producter implements Runnable {

        private final BlockingQueue<Integer> blockingQueue;

        private volatile boolean flag;

        private Random random;

        public Producter(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
            flag = false;
            random = new Random();

        }

        @Override
        public void run() {
            while (!flag) {
                int info = random.nextInt(100);
                try {
                    blockingQueue.put(info);
                    System.out.println(Thread.currentThread().getName() + "produce" + info);
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public void shutDown() {
            flag = true;
        }
    }

    static class Consumer implements Runnable {

        private final BlockingQueue<Integer> blockingQueue;

        private volatile boolean flag;

        public Consumer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        @Override
        public void run() {
            while (!flag) {
                int info;
                try {
                    info = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName() + " consumer " + info);
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

        public void shutDown() {
            flag = true;
        }
    }

}

LinkedBlockingQueue

LinkedBlockingQueue隊列是一個使用鏈表完成的阻塞隊列,鏈表是單向的。

內部用了兩個鎖,takeLock,putLock,添加數據和刪除數據都是並行執行的,當然添加數據和刪除數據的時候只能有1個線程各自執行。


// 容量大小
private final int capacity;

// 元素個數,因為有2個鎖,存在競態條件,使用AtomicInteger
private final AtomicInteger count = new AtomicInteger(0);

// 頭結點
private transient Node<E> head;

// 尾節點
private transient Node<E> last;

// 拿鎖
private final ReentrantLock takeLock = new ReentrantLock();

// 拿鎖的條件對象
private final Condition notEmpty = takeLock.newCondition();

// 放鎖
private final ReentrantLock putLock = new ReentrantLock();

// 放鎖的條件對象
private final Condition notFull = putLock.newCondition();


LinkedBlockingQueue有不同的幾個數據添加方法,add、offer、put方法。

offer與put都是添加元素到queue的尾部, 只不過 put 方法在隊列滿時會進行阻塞, 直到成功; 
而 offer 操作在容量滿時直接返回 false.

public boolean offer(E e) {
    if (e == null) throw new NullPointerException(); // 不允許空元素
    final AtomicInteger count = this.count;
    if (count.get() == capacity) // 如果容量滿了,返回false
        return false;
    int c = -1;
    Node<E> node = new Node(e); // 容量沒滿,以新元素構造節點
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // 放鎖加鎖,保證調用offer方法的時候只有1個線程
    try {
        if (count.get() < capacity) { // 再次判斷容量是否已滿,因為可能拿鎖在進行消費數據,沒滿的話繼續執行
            enqueue(node); // 節點添加到鏈表尾部
            c = count.getAndIncrement(); // 元素個數+1
            if (c + 1 < capacity) // 如果容量還沒滿
                notFull.signal(); // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示可以再次往隊列裏面加數據了,隊列還沒滿
        }
    } finally {
        putLock.unlock(); // 釋放放鎖,讓其他線程可以調用offer方法
    }
    if (c == 0) // 由於存在放鎖和拿鎖,這裏可能拿鎖一直在消費數據,count會變化。這裏的if條件表示如果隊列中還有1條數據
        signalNotEmpty(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列裏還有1條數據,可以進行消費
    return c >= 0; // 添加成功返回true,否則返回false
}
put元素是將元素添加到隊列尾部,queue滿時進行await,添加成功後容量還未滿,則進行signal.
代碼的註釋中基本把操作思想都說了, 有幾個註意的地方
當queue滿時, 會調用 notFull.await() 進行等待, 而相應的喚醒的地方有兩處, 一個是 "有線程進行
put/offer 成功後且 (c + 1) < capacity 時", 另一處是 "在線程進行 take/poll 成功 且 
(c == capacity) (PS: 這裏的 c 指的是 在進行 take/poll 之前的容量)"代碼中的 "signalNotEmpty" 這時在原來queue的數量 c (getAndIncrement的返回值是原來的值) 
==0 時對此時在調用 take/poll 方法的線程進行喚醒。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException(); // 不允許空元素
    int c = -1;
    Node<E> node = new Node(e); // 以新元素構造節點
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); // 放鎖加鎖,保證調用put方法的時候只有1個線程
    try {
        while (count.get() == capacity) { // 如果容量滿了
            notFull.await(); // 阻塞並掛起當前線程
        }
        enqueue(node); // 節點添加到鏈表尾部
        c = count.getAndIncrement(); // 元素個數+1
        if (c + 1 < capacity) // 如果容量還沒滿
            notFull.signal(); // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示可以再次往隊列裏面加數據了,隊列還沒滿
    } finally {
        putLock.unlock(); // 釋放放鎖,讓其他線程可以調用put方法
    }
    if (c == 0) // 由於存在放鎖和拿鎖,這裏可能拿鎖一直在消費數據,count會變化。這裏的if條件表示如果隊列中還有1條數據
        signalNotEmpty(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列裏還有1條數據,可以進行消費
}
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0) // 如果元素個數為0
        return null; // 返回null
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock(); // 拿鎖加鎖,保證調用poll方法的時候只有1個線程
    try {
        if (count.get() > 0) { // 判斷隊列裏是否還有數據
            x = dequeue(); // 刪除頭結點
            c = count.getAndDecrement(); // 元素個數-1
            if (c > 1) // 如果隊列裏還有元素
                notEmpty.signal(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊列裏還有數據,可以再次消費
        }
    } finally {
        takeLock.unlock(); // 釋放拿鎖,讓其他線程可以調用poll方法
    }
    if (c == capacity) // 由於存在放鎖和拿鎖,這裏可能放鎖一直在添加數據,count會變化。這裏的if條件表示如果隊列中還可以再插入數據
        signalNotFull(); // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊列裏還能再次添加數據
                return x;
}
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly(); // 拿鎖加鎖,保證調用take方法的時候只有1個線程
    try {
        while (count.get() == 0) { // 如果隊列裏已經沒有元素了
            notEmpty.await(); // 阻塞並掛起當前線程
        }
        x = dequeue(); // 刪除頭結點
        c = count.getAndDecrement(); // 元素個數-1
        if (c > 1) // 如果隊列裏還有元素
            notEmpty.signal(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊列裏還有數據,可以再次消費
    } finally {
        takeLock.unlock(); // 釋放拿鎖,讓其他線程可以調用take方法
    }
    if (c == capacity) // 由於存在放鎖和拿鎖,這裏可能放鎖一直在添加數據,count會變化。這裏的if條件表示如果隊列中還可以再插入數據
        signalNotFull(); // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊列裏還能再次添加數據
    return x;
}

poll 與 take 都是獲取頭節點的元素, 唯一的區別是 take在queue為空時進行await, poll
則直接返回
public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock(); // remove操作要移動的位置不固定,2個鎖都需要加鎖
    try {
        for (Node<E> trail = head, p = trail.next; // 從鏈表頭結點開始遍歷
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) { // 判斷是否找到對象
                unlink(p, trail); // 修改節點的鏈接信息,同時調用notFull的signal方法
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock(); // 2個鎖解鎖
    }
}


ArrayBlockingQueue


ArrayBlockingQueue的原理就是使用一個可重入鎖和這個鎖生成的兩個條件對象進行並發控制(classic two-condition algorithm)。


ArrayBlockingQueue是一個帶有長度的阻塞隊列,初始化的時候必須要指定隊列長度,且指定長度之後不允許進行修改。


它帶有的屬性如下:

// 存儲隊列元素的數組,是個循環數組
final Object[] items;

// 拿數據的索引,用於take,poll,peek,remove方法
int takeIndex;

// 放數據的索引,用於put,offer,add方法
int putIndex;

// 元素個數
int count;

// 可重入鎖
final ReentrantLock lock;
// notEmpty條件對象,由lock創建
private final Condition notEmpty;
// notFull條件對象,由lock創建
private final Condition notFull;
數據的添加


ArrayBlockingQueue有不同的幾個數據添加方法,add、offer、put方法。


add方法:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

add方法內部調用offer方法如下:

public boolean offer(E e) {
    checkNotNull(e); // 不允許元素為空
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加鎖,保證調用offer方法的時候只有1個線程
    try {
        if (count == items.length) // 如果隊列已滿
            return false; // 直接返回false,添加失敗
        else {
            insert(e); // 數組沒滿的話調用insert方法
            return true; // 返回true,添加成功
        }
    } finally {
        lock.unlock(); // 釋放鎖,讓其他線程可以調用offer方法
    }
}


insert方法如下:

private void insert(E x) {
    items[putIndex] = x; // 元素添加到數組裏
    putIndex = inc(putIndex); // 放數據索引+1,當索引滿了變成0
    ++count; // 元素個數+1
    notEmpty.signal(); // 使用條件對象notEmpty通知,比如使用take方法的時候隊列裏沒有數據,被阻塞。這個時候隊列insert了一條數據,需要調用signal進行通知
}

put方法:

public void put(E e) throws InterruptedException {
    checkNotNull(e); // 不允許元素為空
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 加鎖,保證調用put方法的時候只有1個線程
    try {
        while (count == items.length) // 如果隊列滿了,阻塞當前線程,並加入到條件對象notFull的等待隊列裏
            notFull.await(); // 線程阻塞並被掛起,同時釋放鎖
        insert(e); // 調用insert方法
    } finally {
        lock.unlock(); // 釋放鎖,讓其他線程可以調用put方法
    }
}


ArrayBlockingQueue的添加數據方法有add,put,offer這3個方法,總結如下:


add方法內部調用offer方法,如果隊列滿了,拋出IllegalStateException異常,否則返回true


offer方法如果隊列滿了,返回false,否則返回true


add方法和offer方法不會阻塞線程,put方法如果隊列滿了會阻塞線程,直到有線程消費了隊列裏的數據才有可能被喚醒。


這3個方法內部都會使用可重入鎖保證原子性。


數據的刪除


ArrayBlockingQueue有不同的幾個數據刪除方法,poll、take、remove方法。


poll方法:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加鎖,保證調用poll方法的時候只有1個線程
    try {
        return (count == 0) ? null : extract(); // 如果隊列裏沒元素了,返回null,否則調用extract方法
    } finally {
        lock.unlock(); // 釋放鎖,讓其他線程可以調用poll方法
    }
}

poll方法內部調用extract方法:

private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素
    items[takeIndex] = null; // 對應取索引上的數據清空
    takeIndex = inc(takeIndex); // 取數據索引+1,當索引滿了變成0
    --count; // 元素個數-1
    notFull.signal(); // 使用條件對象notFull通知,比如使用put方法放數據的時候隊列已滿,被阻塞。這個時候消費了一條數據,隊列沒滿了,就需要調用signal進行通知
    return x; // 返回元素
}

take方法:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 加鎖,保證調用take方法的時候只有1個線程
    try {
        while (count == 0) // 如果隊列空,阻塞當前線程,並加入到條件對象notEmpty的等待隊列裏
            notEmpty.await(); // 線程阻塞並被掛起,同時釋放鎖
        return extract(); // 調用extract方法
    } finally {
        lock.unlock(); // 釋放鎖,讓其他線程可以調用take方法
    }
}

remove方法:

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加鎖,保證調用remove方法的時候只有1個線程
    try {
        for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍歷元素
            if (o.equals(items[i])) { // 兩個對象相等的話
                removeAt(i); // 調用removeAt方法
                return true; // 刪除成功,返回true
            }
        }
        return false; // 刪除成功,返回false
    } finally {
        lock.unlock(); // 釋放鎖,讓其他線程可以調用remove方法
    }
}

removeAt方法:

void removeAt(int i) {
    final Object[] items = this.items;
    if (i == takeIndex) { // 如果要刪除數據的索引是取索引位置,直接刪除取索引位置上的數據,然後取索引+1即可
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
    } else { // 如果要刪除數據的索引不是取索引位置,移動元素元素,更新取索引和放索引的值
        for (;;) {
            int nexti = inc(i);
            if (nexti != putIndex) {
                items[i] = items[nexti];
                i = nexti;
            } else {
                items[i] = null;
                putIndex = i;
                break;
            }
        }
    }
   --count; // 元素個數-1
   notFull.signal(); // 使用條件對象notFull通知,比如使用put方法放數據的時候隊列已滿
   ,被阻塞。這個時候消費了一條數據,隊列沒滿了,就需要調用signal進行通知 
}

ArrayBlockingQueue的刪除數據方法有poll,take,remove這3個方法,總結如下:


poll方法對於隊列為空的情況,返回null,否則返回隊列頭部元素。


remove方法取的元素是基於對象的下標值,刪除成功返回true,否則返回false。


poll方法和remove方法不會阻塞線程。


take方法對於隊列為空的情況,會阻塞並掛起當前線程,直到有數據加入到隊列中。


這3個方法內部都會調用notFull.signal方法通知正在等待隊列滿情況下的阻塞線程。



阻塞隊列常用的著兩個隊列都即使這樣的,所以就這麽個事情。


參考博文:

http://fangjian0423.github.io/2016/05/10/java-arrayblockingqueue-linkedblockingqueue-analysis/


以後會陸續補上其他的方法。


本文出自 “10093778” 博客,請務必保留此出處http://10103778.blog.51cto.com/10093778/1934068

java 阻塞隊列 LinkedBlockingQueue ArrayBlockingQueue 分析