Java源碼解析——集合框架(二)——ArrayBlockingQueue
ArrayBlockingQueue源碼解析
ArrayBlockingQueue是一個阻塞式的隊列,繼承自AbstractBlockingQueue,間接的實現了Queue接口和Collection接口。底層以數組的形式保存數據(實際上可看作一個循環數組)。常用的操作包括 add ,offer,put,remove,poll,take,peek。
一、類聲明
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable
1)AbstractQueue提供了Queue接口的默認實現。
2)BlockingQueue接口定義了阻塞隊列必須實現的方法。
3)通過實現 java.io.Serializable 接口以啟用其序列化功能。未實現此接口的類將無法使其任何狀態序列化或反序列化。序列化接口沒有方法或字段,僅用於標識可序列化的語義。
二、成員變量
private final E[] items;//底層數據結構 private int takeIndex;//用來為下一個take/poll/remove的索引(出隊) private int putIndex;//用來為下一個put/offer/add的索引(入隊)private int count;//隊列中元素的個數 private final ReentrantLock lock;//鎖 private final Condition notEmpty;//等待出隊的條件 private final Condition notFull;//等待入隊的條件
三、構造方法
ArrayBlockingQueue提供了兩個構造方法:
/** * 創造一個隊列,指定隊列容量,指定模式 * @param fair * true:先來的線程先操作 * false:順序隨機 */ public ArrayBlockingQueue(int capacity, booleanfair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = (E[]) new Object[capacity];//初始化類變量數組items lock = new ReentrantLock(fair);//初始化類變量鎖lock notEmpty = lock.newCondition();//初始化類變量notEmpty Condition notFull = lock.newCondition();//初始化類變量notFull Condition } /** * 創造一個隊列,指定隊列容量,默認模式為非公平模式 * @param capacity <1會拋異常 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); }
ArrayBlockingQueue的組成:一個對象數組+1把鎖ReentrantLock+2個條件Condition
三、成員方法
- 入隊方法
ArrayBlockingQueue的添加數據方法有add,put,offer這3個方法,總結如下:
add方法內部調用offer方法,如果隊列滿了,拋出IllegalStateException異常,否則返回true
offer方法如果隊列滿了,返回false,否則返回true
add方法和offer方法不會阻塞線程,put方法如果隊列滿了會阻塞線程,直到有線程消費了隊列裏的數據才有可能被喚醒。
這3個方法內部都會使用可重入鎖保證原子性。
1)add方法:
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
2)offer方法:
在隊尾插入一個元素, 如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false。因為使用的是ReentrantLock重入鎖,所以需要顯式地加鎖和釋放鎖。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length)//數組滿了 return false; else {//數組沒滿 insert(e);//插入一個元素 return true; } } finally { lock.unlock(); } }
在插入元素結束後,喚醒等待notEmpty條件(即獲取元素)的線程。
/** * 在隊尾插入一個元素,並設置了超時等待的時間 * 如果數組已滿,則進入等待,直到出現以下三種情況: * 1、被喚醒 * 2、等待時間超時 * 3、當前線程被中斷 */ public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout);//將超時時間轉換為納秒 final ReentrantLock lock = this.lock; /* * lockInterruptibly(): * 1、 在當前線程沒有被中斷的情況下獲取鎖。 * 2、如果獲取成功,方法結束。 * 3、如果鎖無法獲取,當前線程被阻塞,直到下面情況發生: * 1)當前線程(被喚醒後)成功獲取鎖 * 2)當前線程被其他線程中斷 * * lock() * 獲取鎖,如果鎖無法獲取,當前線程被阻塞,直到鎖可以獲取並獲取成功為止。 */ lock.lockInterruptibly();//加可中斷的鎖 try { for (;;) { if (count != items.length) {//隊列未滿 insert(e); return true; } if (nanos <= 0)//已超時 return false; try { /* * 進行等待: * 在這個過程中可能發生三件事: * 1、被喚醒-->繼續當前這個for(;;)循環 * 2、超時-->繼續當前這個for(;;)循環 * 3、被中斷-->之後直接執行catch部分的代碼 */ nanos = notFull.awaitNanos(nanos);//進行等待(在此過程中,時間會流失,在此過程中,線程也可能被喚醒) } catch (InterruptedException ie) {//在等待的過程中線程被中斷 notFull.signal(); // 喚醒其他未被中斷的線程 throw ie; } } } finally { lock.unlock(); } }
無論是第一個offer方法還是第二個offer方法都調用了insert方法,insert方法的步驟是首先添加元素,然後利用inc函數進行索引的添加,最後會喚醒因為隊列中沒有數據而等待被阻塞的獲取數據的方法。
private void insert(E x) { items[putIndex] = x; // 元素添加到數組裏 putIndex = inc(putIndex); // 放數據索引+1,當索引滿了變成0 ++count; // 元素個數+1 notEmpty.signal(); // 使用條件對象notEmpty通知,比如使用take方法的時候隊列裏沒有數據,被阻塞。這個時候隊列insert了一條數據,需要調用signal進行通知 }
其中inc函數來改變索引的增加:
final int inc(int i) { return (++i == items.length) ? 0 : I; }
3)put方法
/** * 在隊尾插入一個元素 * 如果隊列滿了,一直阻塞,直到數組不滿了或者線程被中斷 */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length)//隊列滿了,一直阻塞在這裏 /* * 一直等待條件notFull,即被其他線程喚醒 * (喚醒其實就是,有線程將一個元素出隊了,然後調用notFull.signal()喚醒其他等待這個條件的線程,同時隊列也不慢了) */ notFull.await(); } catch (InterruptedException ie) {//如果被中斷 notFull.signal(); // 喚醒其他等待該條件(notFull,即入隊)的線程 throw ie; } insert(e); } finally { lock.unlock(); } }
- 出隊方法
ArrayBlockingQueue有不同的幾個數據刪除方法,poll、take、remove方法。
ArrayBlockingQueue的刪除數據方法有poll,take,remove這3個方法,總結如下:
poll方法對於隊列為空的情況,返回null,否則返回隊列頭部元素。
remove方法取的元素是基於對象的下標值,刪除成功返回true,否則返回false。
poll方法和remove方法不會阻塞線程。
take方法對於隊列為空的情況,會阻塞並掛起當前線程,直到有數據加入到隊列中。
這3個方法內部都會調用notFull.signal方法通知正在等待隊列滿情況下的阻塞線程。
1)poll方法
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); // 加鎖,保證調用poll方法的時候只有1個線程 try { return (count == 0) ? null : extract(); // 如果隊列裏沒元素了,返回null,否則調用extract方法 } finally { lock.unlock(); // 釋放鎖,讓其他線程可以調用poll方法 } }
poll方法內部調用extract方法:
private E extract() { final E[] items = this.items; E x = items[takeIndex];//獲取出隊元素 items[takeIndex] = null;//將出隊元素位置置空 /* * 第一次出隊的元素takeIndex==0,第二次出隊的元素takeIndex==1 * (註意:這裏出隊之後,並沒有將後面的數組元素向前移) */ takeIndex = inc(takeIndex); --count;//數組元素個數-1 notFull.signal();//數組已經不滿了,喚醒其他等待notFull條件的線程 return x;//返回出隊的元素 }
同樣地notfull標誌表示數組已經不滿,可以執行被阻塞的入隊操作。
2)take方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 加鎖,保證調用take方法的時候只有1個線程 try { while (count == 0) // 如果隊列空,阻塞當前線程,並加入到條件對象notEmpty的等待隊列裏 notEmpty.await(); // 線程阻塞並被掛起,同時釋放鎖 return extract(); // 調用extract方法 } finally { lock.unlock(); // 釋放鎖,讓其他線程可以調用take方法 } }
3)remove方法
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); // 加鎖,保證調用remove方法的時候只有1個線程 try { for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍歷元素 if (o.equals(items[i])) { // 兩個對象相等的話 removeAt(i); // 調用removeAt方法 return true; // 刪除成功,返回true } } return false; // 刪除成功,返回false } finally { lock.unlock(); // 釋放鎖,讓其他線程可以調用remove方法 } }
以及
void removeAt(int i) { final Object[] items = this.items; if (i == takeIndex) { // 如果要刪除數據的索引是取索引位置,直接刪除取索引位置上的數據,然後取索引+1即可 items[takeIndex] = null; takeIndex = inc(takeIndex); } else { // 如果要刪除數據的索引不是取索引位置,移動元素元素,更新取索引和放索引的值 for (;;) { int nexti = inc(i); if (nexti != putIndex) { items[i] = items[nexti]; i = nexti; } else { items[i] = null; putIndex = i; break; } } } --count; // 元素個數-1 notFull.signal(); // 使用條件對象notFull通知,比如使用put方法放數據的時候隊列已滿,被阻塞。這個時候消費了一條數據,隊列沒滿了,就需要調用signal進行通知 }
Java源碼解析——集合框架(二)——ArrayBlockingQueue