Queue 佇列
之前學習過了阻塞佇列(BiockingQueue)這裡就不詳細介紹了。
這裡是佇列的結構,他們都實現了Queue這個介面。
阻塞佇列:阻塞佇列與普通佇列的區別在於,當佇列是空的時,從佇列中獲取元素的操作將會被阻塞,或者當佇列是滿時,往佇列裡新增元素的操作會被阻塞。試圖從空的阻塞佇列中獲取元素的執行緒將會被阻塞,直到其他的執行緒往空的佇列插入新的元素。同樣,試圖往已滿的阻塞佇列中新增新元素的執行緒同樣也會被阻塞,直到其他的執行緒使佇列重新變得空閒起來,如從佇列中移除一個或者多個元素,或者完全清空佇列。
1.ArrayDeque, (陣列雙端佇列)
2.PriorityQueue, (優先順序佇列)
4.DelayQueue, (延期阻塞佇列)(阻塞佇列實現了BlockingQueue介面)
5.ArrayBlockingQueue, (基於陣列的併發阻塞佇列)
6.LinkedBlockingQueue, (基於連結串列的FIFO阻塞佇列)
7.LinkedBlockingDeque, (基於連結串列的FIFO雙端阻塞佇列)
8.PriorityBlockingQueue, (帶優先順序的無界阻塞佇列)
9.SynchronousQueue (併發同步阻塞佇列)
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
今天主要講 ConcurrentLinkedQueue併發佇列
在併發程式設計中我們有時候需要使用執行緒安全的佇列。如果我們要實現一個執行緒安全的佇列有兩種實現方式一種是使用阻塞演算法,另一種是使用非阻塞演算法。使用阻塞演算法的佇列可以用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現,而非阻塞的實現方式則可以使用迴圈CAS的方式來實現,下面我們一起來研究下Doug Lea是如何使用非阻塞的方式來實現執行緒安全佇列ConcurrentLinkedQueue的。
ConcurrentLinkedQueue是一個基於連結節點的無界執行緒安全佇列,它採用先進先出的規則對節點進行排序,當我們新增一個元素的時候,它會新增到佇列的尾部,當我們獲取一個元素時,它會返回佇列頭部的元素。它採用了“wait-free”演算法來實現,該演算法在Michael & Scott演算法上進行了一些修改。
ConcurrentLinkedQueue的類圖如下:
ConcurrentLinkedQueue由head節點和tail節點組成,每個節點(Node)由節點元素(item)和指向下一個節點的引用(next)組成,節點與節點之間就是通過這個next關聯起來,從而組成一張連結串列結構的佇列。
常用的方法 API
offer和poll
offer(E e) :將指定元素插入此佇列的尾部。
poll():獲取並移除此佇列的頭,如果此佇列為空,則返回 null。
public static void main(String[] args) { ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); queue.offer("哈哈哈"); System.out.println("offer後,佇列是否空?" + queue.isEmpty()); System.out.println("從佇列中poll:" + queue.poll()); System.out.println("pool後,佇列是否空?" + queue.isEmpty()); }View Code
ConcurrentLinkedQueue中的add() 和 offer() 完全一樣,都是往佇列尾部新增元素
peek():獲取但不移除此佇列的頭;如果此佇列為空,則返回 null
public static void main(String[] args) { ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); queue.offer("哈哈哈"); System.out.println("offer後,佇列是否空?" + queue.isEmpty()); System.out.println("從佇列中peek:" + queue.peek()); System.out.println("從佇列中peek:" + queue.peek()); System.out.println("從佇列中peek:" + queue.peek()); System.out.println("pool後,佇列是否空?" + queue.isEmpty()); }View Code
remove(Object o):從佇列中移除指定元素的單個例項(如果存在)
public static void main(String[] args) { ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); queue.offer("哈哈哈"); System.out.println("offer後,佇列是否空?" + queue.isEmpty()); System.out.println("從佇列中remove已存在元素 :" + queue.remove("哈哈哈")); System.out.println("從佇列中remove不存在元素:" + queue.remove("123")); System.out.println("remove後,佇列是否空?" + queue.isEmpty()); }View Code
size or isEmpty
size(): 返回此佇列中的元素數量
isEmpty(); 判斷是否有資料(即可認為佇列是否為空)
注意:
如果此佇列包含的元素數大於 Integer.MAX_VALUE,則返回 Integer.MAX_VALUE。
需要小心的是,與大多數 collection 不同,此方法不是 一個固定時間操作。由於這些佇列的非同步特性,確定當前的元素數需要進行一次花費 O(n) 時間的遍歷。
所以在需要判斷佇列是否為空時,儘量不要用 queue.size()>0,而是用 !queue.isEmpty()
比較size()和isEmpty() 效率的示例:
場景:10000個人去飯店吃飯,10張桌子供飯,分別比較size() 和 isEmpty() 的耗時
public class Test01ConcurrentLinkedQueue { public static void main(String[] args) throws InterruptedException { int peopleNum = 10000;//吃飯人數 int tableNum = 10;//飯桌數量 ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); CountDownLatch count = new CountDownLatch(tableNum);//計數器 //將吃飯人數放入佇列(吃飯的人進行排隊) for(int i=1;i<=peopleNum;i++){ queue.offer("消費者_" + i); } //執行10個執行緒從佇列取出元素(10個桌子開始供飯) System.out.println("-----------------------------------開飯了-----------------------------------"); long start = System.currentTimeMillis(); ExecutorService executorService = Executors.newFixedThreadPool(tableNum); for(int i=0;i<tableNum;i++) { executorService.submit(new Dinner("00" + (i+1), queue, count)); } //計數器等待,知道佇列為空(所有人吃完) count.await(); long time = System.currentTimeMillis() - start; System.out.println("-----------------------------------所有人已經吃完-----------------------------------"); System.out.println("共耗時:" + time); //停止執行緒池 executorService.shutdown(); } private static class Dinner implements Runnable{ private String name; private ConcurrentLinkedQueue<String> queue; private CountDownLatch count; public Dinner(String name, ConcurrentLinkedQueue<String> queue, CountDownLatch count) { this.name = name; this.queue = queue; this.count = count; } @Override public void run() { //while (queue.size() > 0){ while (!queue.isEmpty()){ //從佇列取出一個元素 排隊的人少一個 System.out.println("【" +queue.poll() + "】----已吃完..., 飯桌編號:" + name); } count.countDown();//計數器-1 } } }View Code
執行結果:
使用size耗時:757ms
使用isEmpty耗時:210s
當資料量越大,這種耗時差距越明顯。所以這種判斷用isEmpty 更加合理
contains(Object o) :如果此佇列包含指定元素,則返回 true
public static void main(String[] args) throws InterruptedException { ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); queue.offer("123"); System.out.println(queue.contains("123")); System.out.println(queue.contains("234")); }View Code
toArray() :返回以恰當順序包含此佇列所有元素的陣列
toArray(T[] a) :返回以恰當順序包含此佇列所有元素的陣列;返回陣列的執行時型別是指定陣列的執行時型別
public static void main(String[] args) throws InterruptedException { ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>(); queue.offer("123"); queue.offer("234"); Object[] objects = queue.toArray(); System.out.println(objects[0] + ", " + objects[1]); //將資料儲存到指定陣列 String[] strs = new String[2]; queue.toArray(strs); System.out.println(strs[0] + ", " + strs[1]); }View Code
iterator() :返回在此佇列元素上以恰當順序進行迭代的迭代器
public static void main(String[] args) throws InterruptedException { ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>(); queue.offer("123"); queue.offer("234"); Iterator<String> iterator = queue.iterator(); while (iterator.hasNext()){ System.out.println(iterator.next()); } }View Code
ConcurrentLinkedQueue文件說明: