ArrayBlockingQueue原始碼解析(1)
此文已由作者趙計剛授權網易雲社群釋出。
歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。
注意:在閱讀本文之前或在閱讀的過程中,需要用到ReentrantLock,內容見《第五章 ReentrantLock原始碼解析1--獲得非公平鎖與公平鎖lock()》《第六章 ReentrantLock原始碼解析2--釋放鎖unlock()》《第七章 ReentrantLock總結》
1、對於ArrayBlockingQueue需要掌握以下幾點
建立
入隊(新增元素)
出隊(刪除元素)
2、建立
public ArrayBlockingQueue(int capacity, boolean fair)
public ArrayBlockingQueue(int capacity)
使用方法:
Queue<String> abq = new ArrayBlockingQueue<String>(2);
Queue<String> abq = new ArrayBlockingQueue<String>(2,true);
通過使用方法,可以看出ArrayBlockingQueue支援ReentrantLock的公平鎖模式與非公平鎖模式,對於這兩種模式,檢視本文開頭的文章即可。
原始碼如下:
private final E[] items;//底層資料結構 private int takeIndex;//用來為下一個take/poll/remove的索引(出隊) private int putIndex;//用來為下一個put/offer/add的索引(入隊) private int count;//佇列中元素的個數 /* * Concurrency control uses the classic two-condition algorithm found in any * textbook. */ /** Main lock guarding all access */ private final ReentrantLock lock;//鎖 /** Condition for waiting takes */ private final Condition notEmpty;//等待出隊的條件 /** Condition for waiting puts */ private final Condition notFull;//等待入隊的條件
/** * 創造一個佇列,指定佇列容量,指定模式 * @param fair * true:先來的執行緒先操作 * false:順序隨機 */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = (E[]) new Object[capacity];//初始化類變數陣列items lock = new ReentrantLock(fair);//初始化類變數鎖lock notEmpty = lock.newCondition();//初始化類變數notEmpty Condition notFull = lock.newCondition();//初始化類變數notFull Condition } /** * 創造一個佇列,指定佇列容量,預設模式為非公平模式 * @param capacity <1會拋異常 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); }
注意:
ArrayBlockingQueue的組成:一個物件陣列+1把鎖ReentrantLock+2個條件Condition
在檢視原始碼的過程中,也要模仿帶條件鎖的使用,這個雙條件鎖模式是很經典的模式
3、入隊
3.1、public boolean offer(E e)
原理:
在隊尾插入一個元素, 如果佇列沒滿,立即返回true; 如果佇列滿了,立即返回false
使用方法:
abq.offer("hello1");
原始碼:
/** * 在隊尾插入一個元素, * 如果佇列沒滿,立即返回true; * 如果佇列滿了,立即返回false * 注意:該方法通常優於add(),因為add()失敗直接拋異常 */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length)//陣列滿了 return false; else {//陣列沒滿 insert(e);//插入一個元素 return true; } } finally { lock.unlock(); } }
private void insert(E x) { items[putIndex] = x;//插入元素 putIndex = inc(putIndex);//putIndex+1 ++count;//元素數量+1 /** * 喚醒一個執行緒 * 如果有任意一個執行緒正在等待這個條件,那麼選中其中的一個區喚醒。 * 在從等待狀態被喚醒之前,被選中的執行緒必須重新獲得鎖 */ notEmpty.signal(); }
/** * i+1,陣列下標+1 */ final int inc(int i) { return (++i == items.length) ? 0 : i; }
程式碼非常簡單,流程看註釋即可,只有一點注意點:
在插入元素結束後,喚醒等待notEmpty條件(即獲取元素)的執行緒,可以發現這類似於生產者-消費者模式
3.2、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
原理:
在隊尾插入一個元素,,如果陣列已滿,則進入等待,直到出現以下三種情況:
被喚醒
等待時間超時
當前執行緒被中斷
使用方法:
try { abq.offer("hello2",1000,TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); }
原始碼:
/** * 在隊尾插入一個元素, * 如果陣列已滿,則進入等待,直到出現以下三種情況: * 1、被喚醒 * 2、等待時間超時 * 3、當前執行緒被中斷 */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout);//將超時時間轉換為納秒 final ReentrantLock lock = this.lock; /* * lockInterruptibly(): * 1、 在當前執行緒沒有被中斷的情況下獲取鎖。 * 2、如果獲取成功,方法結束。 * 3、如果鎖無法獲取,當前執行緒被阻塞,直到下面情況發生: * 1)當前執行緒(被喚醒後)成功獲取鎖 * 2)當前執行緒被其他執行緒中斷 * * lock() * 獲取鎖,如果鎖無法獲取,當前執行緒被阻塞,直到鎖可以獲取並獲取成功為止。 */ lock.lockInterruptibly();//加可中斷的鎖 try { for (;;) { if (count != items.length) {//佇列未滿 insert(e); return true; } if (nanos <= 0)//已超時 return false; try { /* * 進行等待: * 在這個過程中可能發生三件事: * 1、被喚醒-->繼續當前這個for(;;)迴圈 * 2、超時-->繼續當前這個for(;;)迴圈 * 3、被中斷-->之後直接執行catch部分的程式碼 */ nanos = notFull.awaitNanos(nanos);//進行等待(在此過程中,時間會流失,在此過程中,執行緒也可能被喚醒) } catch (InterruptedException ie) {//在等待的過程中執行緒被中斷 notFull.signal(); // 喚醒其他未被中斷的執行緒 throw ie; } } } finally { lock.unlock(); } }
注意:
awaitNanos(nanos)是AQS中的一個方法,這裡就不詳細說了,有興趣的自己去檢視AQS的原始碼。
lockInterruptibly()與lock()的區別見註釋
免費領取驗證碼、內容安全、簡訊傳送、直播點播體驗包及雲伺服器等套餐
更多網易技術、產品、運營經驗分享請點選。
相關文章:
【推薦】 Google guava cache原始碼解析1--構建快取器(1)
【推薦】 利用jstack定位典型效能問題例項