Java裏的阻塞隊列
JDK7提供了7個阻塞隊列,如下:
ArrayBlockingQueue : 一個數組結構組成的有界阻塞隊列。
LinkedBlockingQueue : 一個由鏈表結構組成的有界阻塞隊列 。
PriorityBlockingQueue : 一個支持優先級排序的無界阻塞隊列 。
DelayQueue : 一個使用優先級隊列實現的無界阻塞隊列 。
SynchronousQueue : 一個不存儲元素的阻塞隊列 。
LinkedTransferQueue : 一個由鏈表結構組成的無界阻塞隊列 。
LinkedBlockingDeque : 一個由鏈表結構組成的雙向阻塞隊列 。
下面分別介紹幾個隊列 :
1.ArrayBlockingQueue
ArrayBlockingQueue是一個由數組結構組成的有界阻塞隊列,此隊列按照FIFO的原則對元素進行排序 。
默認情況下不保證線程公平的訪問隊列,所謂公平的訪問隊列是指阻塞的線程,可以按照阻塞的先後順序訪問隊列,即先阻塞線程先訪問隊列。非公平性是對先等待的線程是非公平的,當隊列可用時,阻塞的線程都可以爭奪隊列的資格,有可能先阻塞的隊列最後才訪問隊列。為了保證公平性通常都會降低吞吐量。下面的代碼可以創建一個公平的阻塞隊列:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(100,true);
訪問者的公平性是通過可重入鎖實現的,構造函數如下:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull= lock.newCondition(); }
ArrayBlockingQueue使用一個Object數組保存數據,一個int類型的count表示當前隊列添加的元素個數,有界保證依靠的兩個Condition對象,下面看一下put()方法,代碼如下:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //判斷隊列是否已滿 while (count == items.length) //如果已經滿了,等待 notFull.await(); //插入 insert(e); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex);//加1 ++count; notEmpty.signal(); }
ArrayBlockingQueue在執行put操作時,首先獲取鎖,然後判斷插入隊列是否已滿,如果隊列已滿則等待,否則順利插入,並且執行一次notEmpty.signal()喚醒有可能隊列為空的情況下執行take()操作在等待的線程 ,take()方法代碼如下:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //如果隊列為空,則等待元素入隊 while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }
take操作也是先判斷隊列是否為空,為空則等待,不為空則返回items[takeIndex] ,並且執行notFull.signal()喚醒可能在等待put操作的線程。
2.LinkedBlockingQueue
Java裏的阻塞隊列