1. 程式人生 > >掌握 java阻塞隊(ArrayBlockingQueue與LinkedBlockingQueue)

掌握 java阻塞隊(ArrayBlockingQueue與LinkedBlockingQueue)

在java開發中有些特殊場景下適用於阻塞佇列如:

多執行緒環境中,通過佇列可以很容易實現資料共享,比如經典的“生產者”和“消費者”模型中,通過佇列可以很便利地實現兩者之間的資料共享。假設我們有若干生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的資料共享問題。但如果生產者和消費者在某個時間段內,萬一發生資料處理速度不匹配的情況呢?理想情況下,如果生產者產出資料的速度大於消費者消費的速度,並且當生產出來的資料累積到一定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者執行緒),以便等待消費者執行緒把累積的資料處理完畢,反之亦然。然而,在concurrent包釋出以前,在多執行緒環境下,我們每個程式設計師都必須去自己控制這些細節,尤其還要兼顧效率和執行緒安全,而這會給我們的程式帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多執行緒領域:所謂阻塞,在某些情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起的執行緒又會自動被喚醒) 

家族成員一共有下面幾位:

  • ArrayBlockingQueue :一個由陣列結構組成的有界阻塞佇列。
  • LinkedBlockingQueue :一個由連結串列結構組成的有界阻塞佇列。
  • PriorityBlockingQueue :一個支援優先順序排序的無界阻塞佇列。
  • DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列。
  • SynchronousQueue:一個不儲存元素的阻塞佇列。
  • LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。
  • LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。

而在實際專案中經常使用ArrayBlockingQueue,和LinkedBlockingQueue

下面對這兩位進行剖析和介紹

方法表:

方法\處理方式 丟擲異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用

原始碼註解:

public interface BlockingQueue<E> extends Queue<E> {

    //將指定的元素插入到此佇列的尾部(如果立即可行且不會超過該佇列的容量)
    //在成功時返回 true,如果此佇列已滿,則拋IllegalStateException。 
    boolean add(E e); 

    //將指定的元素插入到此佇列的尾部(如果立即可行且不會超過該佇列的容量) 
    // 將指定的元素插入此佇列的尾部,如果該佇列已滿, 
    //則在到達指定的等待時間之前等待可用的空間,該方法可中斷 
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; 

    //將指定的元素插入此佇列的尾部,如果該佇列已滿,則一直等到(阻塞)。 
    void put(E e) throws InterruptedException; 

    //獲取並移除此佇列的頭部,如果沒有元素則等待(阻塞), 
    //直到有元素將喚醒等待執行緒執行該操作 
    E take() throws InterruptedException; 

    //獲取並移除此佇列的頭部,在指定的等待時間前一直等到獲取元素, //超過時間方法將結束
    E poll(long timeout, TimeUnit unit) throws InterruptedException; 

    //從此佇列中移除指定元素的單個例項(如果存在)。 
    boolean remove(Object o); 
}

    //除了上述方法還有繼承自Queue介面的方法 
    //獲取但不移除此佇列的頭元素,沒有則跑異常NoSuchElementException 
    E element(); 

    //獲取但不移除此佇列的頭;如果此佇列為空,則返回 null。 
    E peek(); 

    //獲取並移除此佇列的頭,如果此佇列為空,則返回 null。 
    E poll();

分類一下:

這裡我們把上述操作進行分類

插入方法:

add(E e) : 新增成功返回true,失敗拋IllegalStateException異常 offer(E e) : 成功返回 true,如果此佇列已滿,則返回 false。 put(E e) :將元素插入此佇列的尾部,如果該佇列已滿,則一直阻塞 刪除方法:

remove(Object o) :移除指定元素,成功返回true,失敗返回false poll() : 獲取並移除此佇列的頭元素,若佇列為空,則返回 null take():獲取並移除此佇列頭元素,若沒有元素則一直阻塞。 檢查方法

element() :獲取但不移除此佇列的頭元素,沒有元素則拋異常 peek() :獲取但不移除此佇列的頭;若佇列為空,則返回 null。

  • 常用put()和take()方法

ArrayBlockingQueue :

一個用陣列實現的有界阻塞佇列,其內部按先進先出的原則對元素進行排序,其中put方法和take方法為新增和刪除的阻塞方法,下面我們通過ArrayBlockingQueue佇列實現一個生產者消費者的案例,通過該案例簡單瞭解其使用方式:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Created by wuzejian on 2017/8/13
 */
public class ArrayBlockingQueueDemo {
    private final static ArrayBlockingQueue<Apple> queue= new ArrayBlockingQueue<>(1);
    public static void main(String[] args){
        new Thread(new Producer(queue)).start();
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }
}

 class Apple {
    public Apple(){
    }
 }

/**
 * 生產者執行緒
 */
class Producer implements Runnable{
    private final ArrayBlockingQueue<Apple> mAbq;
    Producer(ArrayBlockingQueue<Apple> arrayBlockingQueue){
        this.mAbq = arrayBlockingQueue;
    }

    @Override
    public void run() {
        while (true) {
            Produce();
        }
    }

    private void Produce(){
        try {
            Apple apple = new Apple();
            mAbq.put(apple);
            System.out.println("生產:"+apple);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

/**
 * 消費者執行緒
 */
class Consumer implements Runnable{

    private ArrayBlockingQueue<Apple> mAbq;
    Consumer(ArrayBlockingQueue<Apple> arrayBlockingQueue){
        this.mAbq = arrayBlockingQueue;
    }

    @Override
    public void run() {
        while (true){
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
                comsume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void comsume() throws InterruptedException {
        Apple apple = mAbq.take();
        System.out.println("消費Apple="+apple);
    }
}

執行結果

有興趣的小夥伴可以試試將1改為2或者更多的效果;

拓展:

有點需要注意的是ArrayBlockingQueue內部的阻塞佇列是通過重入鎖ReenterLock和Condition條件佇列實現的,所以ArrayBlockingQueue中的元素存在公平訪問與非公平訪問的區別,對於公平訪問佇列,被阻塞的執行緒可以按照阻塞的先後順序訪問佇列,即先阻塞的執行緒先訪問佇列。而非公平佇列,當佇列可用時,阻塞的執行緒將進入爭奪訪問資源的競爭中,也就是說誰先搶到誰就執行,沒有固定的先後順序。建立公平與非公平阻塞佇列程式碼如下:  

//預設非公平阻塞佇列
ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
//公平阻塞佇列
ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);

//構造方法原始碼
public ArrayBlockingQueue(int capacity) {
     this(capacity, false);
 }

public ArrayBlockingQueue(int capacity, boolean fair) {
     if (capacity <= 0)
         throw new IllegalArgumentException();
     this.items = new Object[capacity];
     lock = new ReentrantLock(fair);
     notEmpty = lock.newCondition();
     notFull =  lock.newCondition();
 }

可以看出來,ArrayBlockingQueue阻塞佇列的預設模式是非公平阻塞佇列

再補充一些實用方法:

//自動移除此佇列中的所有元素。
void clear() 

//如果此佇列包含指定的元素,則返回 true。          
boolean contains(Object o) 

//移除此佇列中所有可用的元素,並將它們新增到給定collection中。           
int drainTo(Collection<? super E> c) 

//最多從此佇列中移除給定數量的可用元素,並將這些元素新增到給定collection 中。       
int drainTo(Collection<? super E> c, int maxElements) 

//返回在此佇列中的元素上按適當順序進行迭代的迭代器。         
Iterator<E> iterator() 

//返回佇列還能新增元素的數量
int remainingCapacity() 

//返回此佇列中元素的數量。      
int size() 

//返回一個按適當順序包含此佇列中所有元素的陣列。
Object[] toArray() 

//返回一個按適當順序包含此佇列中所有元素的陣列;返回陣列的執行時型別是指定陣列的執行時型別。      
<T> T[] toArray(T[] a)

LinkedBlockingQueue: LinkedBlockingQueue是一個基於連結串列的阻塞佇列,其內部維持一個基於連結串列的資料佇列,實際上我們對LinkedBlockingQueue的API操作都是間接操作該資料佇列,並且每個新增到LinkedBlockingQueue佇列中的資料都將被封裝成Node節點,新增的連結串列佇列中,其中head和last分別指向佇列的頭結點和尾結點。與ArrayBlockingQueue不同的是,LinkedBlockingQueue內部分別使用了takeLock 和 putLock 對併發進行控制,也就是說,新增和刪除操作並不是互斥操作,可以同時進行,這樣也就可以大大提高吞吐量。這裡再次強調如果沒有給LinkedBlockingQueue指定容量大小,其預設值將是Integer.MAX_VALUE,如果存在新增速度大於刪除速度時候,有可能會記憶體溢位,這點在使用前希望慎重考慮。至於LinkedBlockingQueue的實現原理圖與ArrayBlockingQueue是類似的,除了對新增和移除方法使用單獨的鎖控制外,兩者都使用了不同的Condition條件物件作為等待佇列,用於掛起take執行緒和put執行緒。 其實可以看出來在一般情況下資料量不大的情況下使用LinkedBlockingQueue或者ArrayBlokingQueue的大同小異

但是在資料量達到一定量後使用LinkedBlockingQueue效率更高,因為其新增採用的是putLock,移除採用的則是takeLock,這樣能大大提高佇列的吞吐量,也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能

總結:

LinkedBlockingQueue和ArrayBlockingQueue區別

1.佇列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無界的(Integer.MAX_VALUE),對於後者而言,當新增速度大於移除速度時,在無界的情況下,可能會造成記憶體溢位等問題。

2.資料儲存容器不同,ArrayBlockingQueue採用的是陣列作為資料儲存容器,而LinkedBlockingQueue採用的則是以Node節點作為連線物件的連結串列。

3.由於ArrayBlockingQueue採用的是陣列的儲存容器,因此在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而LinkedBlockingQueue則會生成一個額外的Node物件。這可能在長時間內需要高效併發地處理大批量資料的時,對於GC可能存在較大影響。

4.兩者的實現佇列新增或移除的鎖不一樣,ArrayBlockingQueue實現的佇列中的鎖是沒有分離的,即新增操作和移除操作採用的同一個ReenterLock鎖,而LinkedBlockingQueue實現的佇列中的鎖是分離的,其新增採用的是putLock,移除採用的則是takeLock,這樣能大大提高佇列的吞吐量,也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。