高併發第十三彈:J.U.C 佇列 SynchronousQueue.ArrayBlockingQueue.LinkedBlockingQueue.LinkedTransferQueue
因為下一節會說執行緒池,要用執行緒池 那麼執行緒池有個很重要的引數 就是Queue的選擇
常用的佇列其實就兩種:
先進先出(FIFO):先插入的佇列的元素也最先出佇列,類似於排隊的功能。從某種程度上來說這種佇列也體現了一種公平性。 後進先出(LIFO):後插入佇列的元素最先出佇列,這種佇列優先處理最近發生的事件。
常用queue的分類:
ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列。
LinkedBlockingQueue:一個由連結串列結構組成的有界阻塞佇列。
PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列。
LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。
LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。
SynchronousQueue:一個不儲存元素的阻塞佇列。
DealyQueue:一個使用優先順序佇列實現的無界阻塞佇列。
這幾個queue都是
extends AbstractQueue<E> implements BlockingQueue<E> {
AbstractQueue<E>:優先佇列
AbstractQueue是 Java Collections Framework 的成員,是一個基於優先順序堆的極大
AbstractQueue的add,remove,element方法分別基於offer,poll,peek的實現,但是當佇列為null時,丟擲異常,而不是返回false或null。offer,poll,peek,並沒有實現待子類擴充套件。清空,迴圈poll,直到為空。addAll為迴圈遍歷集合元素,add到佇列;
總結:記住這是一個優先佇列即可
BlockingQueue:阻塞佇列
主要應用場景:生產者消費者模型,是執行緒安全的
多執行緒環境中,通過佇列可以很容易實現資料共享,比如經典的“生產者”和“消費者”模型中,通過佇列可以很便利地實現兩者之間的資料共享。假設我們有若干生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的資料共享問題。但如果生產者和消費者在某個時間段內,萬一發生資料處理速度不匹配的情況呢?理想情況下,如果生產者產出資料的速度大於消費者消費的速度,並且當生產出來的資料累積到一定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者執行緒),以便等待消費者執行緒把累積的資料處理完畢,反之亦然。然而,在concurrent包釋出以前,在多執行緒環境下,我們每個程式設計師都必須去自己控制這些細節,尤其還要兼顧效率和執行緒安全,而這會給我們的程式帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多執行緒領域:所謂阻塞,在某些情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起的執行緒又會自動被喚醒)
下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景: 如上圖所示:當佇列中沒有資料的情況下,消費者端的所有執行緒都會被自動阻塞(掛起),直到有資料放入佇列。 如上圖所示:當佇列中填滿資料的情況下,生產者端的所有執行緒都會被自動阻塞(掛起),直到佇列中有空的位置,執行緒被自動喚醒。 這也是我們在多執行緒環境下,為什麼需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什麼時候需要阻塞執行緒,什麼時候需要喚醒執行緒,因為這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大讓我們一起來見識下它的常用方法:BlockingQueue的核心方法:放入資料: offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納, 則返回true,否則返回false.(本方法不阻塞當前執行方法的執行緒) offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往佇列中 加入BlockingQueue,則返回失敗。 put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷 直到BlockingQueue裡面有空間再繼續.獲取資料: poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間, 取不到時返回null; poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件,如果在指定時間內, 佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。 take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到 BlockingQueue有新的資料被加入; drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數), 通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。
BlockingQueue提供了四套方法,分別來進行插入、移除、檢查。每套方法在不能立刻執行時都有不同的反應。
- Throws Exceptions :如果不能立即執行就丟擲異常。
- Special Value:如果不能立即執行就返回一個特殊的值。
- Blocks:如果不能立即執行就阻塞
- Times Out:如果不能立即執行就阻塞一段時間,如果過了設定時間還沒有被執行,則返回一個值
所以我們先來介紹以下具體子類
ArrayBlockingQueue :一個由陣列支援的有界佇列初始化時指定容量大小,一旦指定大小就不能再變.
基本結構
顧名思義 這是一個底層由陣列來儲存資料的
/** The queued items */ final Object[] items;
同時使用ReentrantLock 來確保併發安全的
/** Main lock guarding all access */ final ReentrantLock lock;
構造方法
public ArrayBlockingQueue(int capacity) { this(capacity, false); }
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
構造方法實際是通Lock 來確定公平性的
ArrayBlockingQueue詳解具體方法
2. LinkedBlockingQueue 一個由連結串列結構組成的有界阻塞佇列。
實現了一個內部類
/** * Linked list node class */ static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } }
構造方法
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
大小配置可選,如果初始化時指定了大小,那麼它就是有邊界的。不指定就無邊界(最大整型值)。內部實現是連結串列,採用FIFO形式儲存資料。
詳細方法深入理解LinkedBlockingQueue
3.LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。
LinkedBlockingDeque是雙向連結串列實現的雙向併發阻塞佇列。該阻塞佇列同時支援FIFO和FILO兩種操作方式,即可以從佇列的頭和尾同時操作(插入/刪除);並且,該阻塞佇列是支援執行緒安全。
此外,LinkedBlockingDeque還是可選容量的(防止過度膨脹),即可以指定佇列的容量。如果不指定,預設容量大小等於Integer.MAX_VALUE。
其實就多了一個可頭可尾的操作
4. PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列。
支援優先順序排序,那麼肯定需要排序的 所以 須是實現Comparable介面,佇列通過這個介面的compare方法確定物件的priority。當前和其他物件比較,如果compare方法返回負數,那麼在佇列裡面的優先順序就比較高.優先順序中傳入的實體物件
比較規則:當前物件和其他物件做比較,當前優先順序大就返回-1,優先順序小就返回1
構造方法
add方法
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
比較有趣也就是擴容方法了
/** * Tries to grow array to accommodate at least one more element * (but normally expand by about 50%), giving up (allowing retry) * on contention (which we expect to be rare). Call only while * holding lock. * * @param array the heap array * @param oldCap the length of the array */ private void tryGrow(Object[] array, int oldCap) { lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } if (newArray == null) // back off if another thread is allocating Thread.yield(); lock.lock(); if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
其先放開了鎖,然後通過CAS設定allocationSpinLock來判斷哪個執行緒獲得了擴容許可權,如果沒搶到許可權就會讓出CPU使用權。最後還是要鎖住開始真正的擴容。擴容許可權爭取到了就是計算大小,分配陣列。暫不肯定為什麼這麼麻煩要分配陣列的時候釋放鎖,暫猜測這樣做效率會更高。
測試類
public class ObjectBean implements Comparable<ObjectBean> { private String name; private Integer age; public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } @Override public String toString() { return "ObjectBean [name=" + name + ", age=" + age + "]"; } @Override public int compareTo(ObjectBean o) { return this.age.compareTo(o.getAge()); }