BlockingQueue(阻塞佇列)詳解
注意:該隨筆內容完全引自http://wsmajunfeng.iteye.com/blog/1629354,寫的很好,非常感謝,複製過來算是個積累,怕以後找不到。
一. 前言
在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。
二. 認識BlockingQueue
阻塞佇列,顧名思義,首先它是一個佇列,而一個佇列在資料結構中所起的作用大致如下圖所示:
從上圖我們可以很清楚看到,通過一個共享的佇列,可以使得資料由佇列的一端輸入,從另外一端輸出;
常用的佇列主要有以下兩種:(當然通過不同的實現方式,還可以延伸出很多不同型別的佇列,DelayQueue就是其中的一種)
先進先出(FIFO):先插入的佇列的元素也最先出佇列,類似於排隊的功能。從某種程度上來說這種佇列也體現了一種公平性。
後進先出(LIFO):後插入佇列的元素最先出佇列,這種佇列優先處理最近發生的事件。
多執行緒環境中,通過佇列可以很容易實現資料共享,比如經典的“生產者”和“消費者”模型中,通過佇列可以很便利地實現兩者之間的資料共享。假設我們有若干生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的資料共享問題。但如果生產者和消費者在某個時間段內,萬一發生資料處理速度不匹配的情況呢?理想情況下,如果生產者產出資料的速度大於消費者消費的速度,並且當生產出來的資料累積到一定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者執行緒),以便等待消費者執行緒把累積的資料處理完畢,反之亦然。然而,在concurrent包釋出以前,在多執行緒環境下,我們每個程式設計師都必須去自己控制這些細節,尤其還要兼顧效率和執行緒安全,而這會給我們的程式帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多執行緒領域:所謂阻塞,在某些情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起的執行緒又會自動被喚醒),
如上圖所示:當佇列中沒有資料的情況下,消費者端的所有執行緒都會被自動阻塞(掛起),直到有資料放入佇列。
如上圖所示:當佇列中填滿資料的情況下,生產者端的所有執行緒都會被自動阻塞(掛起),直到佇列中有空的位置,執行緒被自動喚醒。
這也是我們在多執行緒環境下,為什麼需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什麼時候需要阻塞執行緒,什麼時候需要喚醒執行緒,因為這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓我們一起來見識下它的常用方法:
三. BlockingQueue的核心方法:
1.放入資料
(1)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執行方法
的執行緒);
(2)offer(E o, long timeout, TimeUnit unit):可以設定等待的時間,如果在指定的時間內,還不能往佇列中加入BlockingQueue,則返回失敗。
(3)put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.
2. 獲取資料
(1)poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null;
(2)poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間
超時還沒有資料可取,返回失敗。
(3)take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的資料被加入;
(4)drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。
四. 常見BlockingQueue
在瞭解了BlockingQueue的基本功能後,讓我們來看看BlockingQueue家庭大致有哪些成員?
1. ArrayBlockingQueue
基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長陣列,以便快取佇列中的資料物件,這是一個常用的阻塞佇列,除了一個定長陣列外,ArrayBlockingQueue內部還儲存著兩個整形變數,分別標識著佇列的頭部和尾部在陣列中的位置。
ArrayBlockingQueue在生產者放入資料和消費者獲取資料,都是共用同一個鎖物件,由此也意味著兩者無法真正並行執行,這點尤其不同於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以採用分離鎖,從而實現生產者和消費者操作的完全並行執行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的資料寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給程式碼帶來額外的複雜性外,其在效能上完全佔不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而後者則會生成一個額外的Node物件。這在長時間內需要高效併發地處理大批量資料的系統中,其對於GC的影響還是存在一定的區別。而在建立ArrayBlockingQueue時,我們還可以控制物件的內部鎖是否採用公平鎖,預設採用非公平鎖。
2.LinkedBlockingQueue
基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。
作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。
ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞佇列,一般情況下,在處理多執行緒間的生產者消費者問題,使用這兩個類足以。
下面的程式碼演示瞭如何使用BlockingQueue:
(1) 測試類
1 import java.util.concurrent.BlockingQueue; 2 import java.util.concurrent.ExecutorService; 3 import java.util.concurrent.Executors; 4 import java.util.concurrent.LinkedBlockingQueue; 5 6 public class BlockingQueueTest { 7 8 public static void main(String[] args) throws InterruptedException { 9 // 宣告一個容量為10的快取佇列 10 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); 11 12 //new了三個生產者和一個消費者 13 Producer producer1 = new Producer(queue); 14 Producer producer2 = new Producer(queue); 15 Producer producer3 = new Producer(queue); 16 Consumer consumer = new Consumer(queue); 17 18 // 藉助Executors 19 ExecutorService service = Executors.newCachedThreadPool(); 20 // 啟動執行緒 21 service.execute(producer1); 22 service.execute(producer2); 23 service.execute(producer3); 24 service.execute(consumer); 25 26 // 執行10s 27 Thread.sleep(10 * 1000); 28 producer1.stop(); 29 producer2.stop(); 30 producer3.stop(); 31 32 Thread.sleep(2000); 33 // 退出Executor 34 service.shutdown(); 35 } 36 }
(2)生產者類
1 import java.util.Random; 2 import java.util.concurrent.BlockingQueue; 3 import java.util.concurrent.TimeUnit; 4 import java.util.concurrent.atomic.AtomicInteger; 5 6 /** 7 * 生產者執行緒 8 * 9 * @author jackyuj 10 */ 11 public class Producer implements Runnable { 12 13 private volatile boolean isRunning = true;//是否在執行標誌 14 private BlockingQueue queue;//阻塞佇列 15 private static AtomicInteger count = new AtomicInteger();//自動更新的值 16 private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; 17 18 //建構函式 19 public Producer(BlockingQueue queue) { 20 this.queue = queue; 21 } 22 23 public void run() { 24 String data = null; 25 Random r = new Random(); 26 27 System.out.println("啟動生產者執行緒!"); 28 try { 29 while (isRunning) { 30 System.out.println("正在生產資料..."); 31 Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一個隨機數 32 33 data = "data:" + count.incrementAndGet();//以原子方式將count當前值加1 34 System.out.println("將資料:" + data + "放入佇列..."); 35 if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//設定的等待時間為2s,如果超過2s還沒加進去返回true 36 System.out.println("放入資料失敗:" + data); 37 } 38 } 39 } catch (InterruptedException e) { 40 e.printStackTrace(); 41 Thread.currentThread().interrupt(); 42 } finally { 43 System.out.println("退出生產者執行緒!"); 44 } 45 } 46 47 public void stop() { 48 isRunning = false; 49 } 50 }
(3)消費者類
1 import java.util.Random; 2 import java.util.concurrent.BlockingQueue; 3 import java.util.concurrent.TimeUnit; 4 5 /** 6 * 消費者執行緒 7 * 8 * @author jackyuj 9 */ 10 public class Consumer implements Runnable { 11 12 private BlockingQueue<String> queue; 13 private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; 14 15 //建構函式 16 public Consumer(BlockingQueue<String> queue) { 17 this.queue = queue; 18 } 19 20 public void run() { 21 System.out.println("啟動消費者執行緒!"); 22 Random r = new Random(); 23 boolean isRunning = true; 24 try { 25 while (isRunning) { 26 System.out.println("正從佇列獲取資料..."); 27 String data = queue.poll(2, TimeUnit.SECONDS);//有資料時直接從佇列的隊首取走,無資料時阻塞,在2s內有資料,取走,超過2s還沒資料,返回失敗 28 if (null != data) { 29 System.out.println("拿到資料:" + data); 30 System.out.println("正在消費資料:" + data); 31 Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); 32 } else { 33 // 超過2s還沒資料,認為所有生產執行緒都已經退出,自動退出消費執行緒。 34 isRunning = false; 35 } 36 } 37 } catch (InterruptedException e) { 38 e.printStackTrace(); 39 Thread.currentThread().interrupt(); 40 } finally { 41 System.out.println("退出消費者執行緒!"); 42 } 43 } 44 45 46 }
3. DelayQueue
DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue是一個沒有大小限制的佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞,而只有獲取資料的操作(消費者)才會被阻塞。
使用場景:
DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連線佇列。
4. PriorityBlockingQueue
基於優先順序的阻塞佇列(優先順序的判斷通過建構函式傳入的Compator物件來決定),但需要注意的是PriorityBlockingQueue並不會阻塞資料生產者,而只會在沒有可消費的資料時,阻塞資料的消費者。因此使用的時候要特別注意,生產者生產資料的速度絕對不能快於消費者消費資料的速度,否則時間一長,會最終耗盡所有的可用堆記憶體空間。在實現PriorityBlockingQueue時,內部控制執行緒同步的鎖採用的是公平鎖。
5. SynchronousQueue
一種無緩衝的等待佇列,類似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿著產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那麼對不起,大家都在集市等待。相對於有緩衝的BlockingQueue來說,少了一箇中間經銷商的環節(緩衝區),如果有經銷商,生產者直接把產品批發給經銷商,而無需在意經銷商最終會將這些產品賣給那些消費者,由於經銷商可以庫存一部分商品,因此相對於直接交易模式,總體來說採用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應效能可能會降低。
宣告一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。公平模式和非公平模式的區別:
如果採用公平模式:SynchronousQueue會採用公平鎖,並配合一個FIFO佇列來阻塞多餘的生產者和消費者,從而體系整體的公平策略;
但如果是非公平模式(SynchronousQueue預設):SynchronousQueue採用非公平鎖,同時配合一個LIFO佇列來管理多餘的生產者和消費者,而後一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的資料永遠都得不到處理。
五. 小結
BlockingQueue不光實現了一個完整佇列所具有的基本功能,同時在多執行緒環境下,他還自動管理了多線間的自動等待於喚醒功能,從而使得程式設計師可以忽略這些細節,關注更高階的功能。
相關推薦
Java併發(十八):阻塞佇列BlockingQueue BlockingQueue(阻塞佇列)詳解 二叉堆(一)之 圖文解析 和 C語言的實現 多執行緒程式設計:阻塞、併發佇列的使用總結 Java併發程式設計:阻塞佇列 java阻塞佇列 BlockingQueue(阻塞佇列)詳解
阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。 這兩個附加的操作是:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。當佇列滿時,儲存元素的執行緒會等待佇列可用。 阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者
【轉載】BlockingQueue(阻塞佇列)詳解
注意:該隨筆內容完全引自http://wsmajunfeng.iteye.com/blog/1629354,寫的很好,非常感謝,複製過來算是個積累,怕以後找不到。 一. 前言 在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資
BlockingQueue(阻塞佇列)詳解
注意:該隨筆內容完全引自http://wsmajunfeng.iteye.com/blog/1629354,寫的很好,非常感謝,複製過來算是個積累,怕以後找不到。 一. 前言 在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高
BlockingQueue(阻塞佇列)詳解(一個生產者消費者的例項)
注意:該隨筆內容完全引自http://wsmajunfeng.iteye.com/blog/1629354,寫的很好,非常感謝,複製過來算是個積累,怕以後找不到。一. 前言 在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳
生產者和消費者之BlockingQueue(阻塞佇列)詳解
注意:該隨筆內容完全引自http://wsmajunfeng.iteye.com/blog/1629354,寫的很好,非常感謝,複製過來算是個積累,怕以後找不到。 一. 前言 在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何
Java中 BlockingQueue(阻塞佇列)的使用
引言 Queue介面與List、Set同一級別,都是繼承了Collection介面。LinkedList實現了Queue接 口。Queue介面窄化了對LinkedList的方法的訪問許可權(即在方法中的引數型別如果是Queue時,就完全只能訪問Queue介面所
VMware虛擬機文件(後綴)詳解
blank mdk 記錄 tom 而不是 我們 sun 編輯 right VMware虛擬機文件(後綴)詳解 虛擬機的文件管理由VMware Workstation來執行,一個虛擬機一般以一系列文件的形式儲存在宿主機中,這些文件一般在由workstation為虛
#26 Linux kernel(內核)詳解與uname、lsmod、modinfo、depmod、insmod、rmmod、modprobe...命令用法
linux kernel(內核)詳解與uname、lsmod、modinfo、depmod、insmod、rmmod、modprobe...命令用法Linux kernel: 內核設計流派: 單內核設計,但是充分借鑒了微內核體系設計的優點,為內核引入了模塊化機制,內核高度模塊化; 內核被模塊化之
C++11Mutex(互斥鎖)詳解
AR c++ 條件 oid 簡單 但是 資源 void AD 多個線程訪問同一資源時,為了保證數據的一致性,最簡單的方式就是使用 mutex(互斥鎖)。 (1).直接操作 mutex,即直接調用 mutex 的 lock / unlock 函數。此例順帶使用了 boost:
java 內部類(inner class)詳解
ron isp https nerd 對象 重寫 prot print 元素 優點 ⒈ 內部類對象可以訪問創建它的對象的實現,包括私有數據; ⒉ 內部類不為同一包的其他類所見,具有很好的封裝性; ⒊ 使用內部類可以很方便的編寫事件驅動程序; ⒋ 匿名內部類可以方便的定義運行
調用高德地圖API(熱力圖)詳解
ocs use map asc type contain maps key script 具體腳本語言如下: <!doctype html> <html> <head> <meta charset="utf-8">
Android 學習之《第一行程式碼》第二版 筆記(十一)詳解廣播機制(一)
一、廣播機制簡介 1. 四大元件之一 2. Android 提供了一套完整的API,允許應用程式自由地傳送和接收廣播。 A. 傳送廣播藉助Intent B. 接收廣播藉助廣播接收器(Broadcast Receiver) 3. 廣播型別: A. 標準廣播: 完全非同步執行
Android 學習之《第一行程式碼》第二版 筆記(十二)詳解廣播機制(二)
廣播的最佳實踐——實現強制下線功能 思路:在介面上彈出一個對話方塊,讓使用者無法進行任何操作,必須點選對話方塊中的確定按鈕,然後回到登入介面即可。 一、效果圖 1. 登入介面並輸入賬號密碼 2. 登陸後的介面 3. 強制下線 4. 退回登陸的介面
蘋果個人公司型別開發者賬號申請(99美元)詳解
談到蘋果開發者賬號,我們需要區分一下個人賬號、公司賬號和企業賬號這三種,還有一種是教育賬號,這個就不多說了。 個人賬號:個人申請用於開發蘋果app所使用的賬號,僅限於個人使用,申請比較容易,$99。 公司賬號:以公司的名義申請的開發者賬
Android VCard聯絡人備份恢復(匯入/匯出)詳解
原文地址為: Android VCard聯絡人備份恢復(匯入/匯出)詳解 首先我們簡單的看下在Android中聯絡人的儲存結構. 工作環境:android 2.3.3聯絡人的主要資料存放在raw_contacts和data表裡,它兩構成主從表關係。 raw_contacts表結構
替換空格(空白字元)詳解
替換空格(空白字元)詳解 import java.util.regex.Matcher; import java.util.regex.Pattern; public class RegexpTest { private static String str2; /*
二叉樹(Binary Tree)詳解
二叉樹本身就是遞迴定義的(wikipedia): To actually define a binary tree in general, we must allow for the possibility that only one of the children may be empty.
自動編碼器(Autoencoder)、降噪自動編碼器(Denoising Autoencoder)詳解
在瞭解降噪自動編碼器之前,我們先了解一下自動編碼器。 自動編碼器(Autoencoder): 自動編碼器和PCA等方法都屬於降維方法。PCA降維方法有著一定侷限性,主要是隻對線性可分的資料降維效果較好。這種情況下,人們希望提出一種新的簡單的、自動的、可以對非線性可分資料進行的特徵提取方法
Bootm(cmd_bootm.c)詳解
一、在開始之前先說明一下bootm相關的東西。 1、首先說明一下,S3C2410架構下的bootm只對sdram中的核心映象檔案進行操作(好像AT91架構提供了一段從flash複製核心映象的程式碼,不過針對s3c2410架構就沒有這段程式碼,雖然可以在u-boot下
曼哈頓距離(Manhattan Distance )詳解
概念 曼哈頓距離——兩點在南北方向上的距離加上在東西方向上的距離,即d(i,j)=|xi-xj|+|yi-yj|。對於一個具有正南正北、正東正西方向規則佈局的城鎮街道,從一點到達另一點的距離正是在南北方向上旅行的距離加上在東西方向上旅行的距離,因此,曼哈頓距離又稱為計程車距離。 ——引用自百度&nb