Queue和BlockingQueue的使用以及使用BlockingQueue實現生產者-消費者
Java提供了兩種新的容器型別:Queue和BlockingQueue。
Queue用於儲存一組等待處理的元素。它提供了幾種實現,包括:ConcurrentLinkedQueue,這是一個先進先出的對列,以及PriorityQueue,這是一個非併發的優先佇列。Queue上的操作不會阻塞,如果佇列為空,獲取元素的操作將返回空值。雖然可以用List來模擬一個Queue的行為----事實上正是通過LinkedList來實現Queue的行為的,但還需要一個Queue的類,因為它能去掉List的隨機訪問需求,從而實現更高效的併發。
BlockingQueue擴充套件了Queue,增加了可阻塞的插入和獲取操作。如果佇列為空那麼獲取元素的操作將會一直阻塞,直到隊列出現一個可以可用的元素。如果佇列已滿(對於有界佇列來說),那麼插入元素的操作將一直阻塞,直到佇列中出現可用的元素。在"生產者-消費者"設計模式中,阻塞隊還是非常有用的。
1.Queue的使用
Queue介面與List、Set同一級別,都是繼承了Collection介面。
Queue使用時要儘量避免Collection的add()和remove()方法,而是要使用offer()來加入元素,使用poll()來獲取並移出元素。它們的優點是通過返回值可以判斷成功與否,add()和remove()方法在失敗的時候會丟擲異常。 如果要使用前端而不移出該元素,使用element()或者peek()方法。
值得注意的是LinkedList類實現了Queue介面,因此我們可以把LinkedList當成Queue來用。
Queue的方法也非常簡單,就是三組(一個會丟擲異常,一個返回特殊值):
方法 | 丟擲異常 | 不會丟擲異常 |
插入 | boolean add(E e); | boolean offer(E e); |
移除(返回且移除頭元素) | E remove(); | E poll(); |
檢查(返回頭元素但不刪除) | E element(); | E peek(); |
例如:(poll()返回了null,remove()丟擲異常了)
package cn.qlq.thread.thirteen; importjava.util.LinkedList; import java.util.Queue; public class Demo1 { public static void main(String[] args) { Queue<String> queue = new LinkedList<String>(); String poll = queue.poll(); System.out.println(poll); String remove = queue.remove(); System.out.println(remove); } }
結果:
null
Exception in thread "main" java.util.NoSuchElementException
at java.util.LinkedList.removeFirst(LinkedList.java:268)
at java.util.LinkedList.remove(LinkedList.java:683)
at cn.qlq.thread.thirteen.Demo1.main(Demo1.java:11)
2.BlockingQueue的使用
BlockingQueue繼承Queue介面,位於併發包下,對Queue介面進行了擴充套件。
package java.util.concurrent; import java.util.Collection; import java.util.Queue; public interface BlockingQueue<E> extends Queue<E> { boolean add(E e); boolean offer(E e); void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; E take() throws InterruptedException; E poll(long timeout, TimeUnit unit) throws InterruptedException; int remainingCapacity(); boolean remove(Object o); public boolean contains(Object o); int drainTo(Collection<? super E> c); int drainTo(Collection<? super E> c, int maxElements); }
阻塞佇列提供了可阻塞的put和take方法,以及支援定時的offer和poll方法。如果佇列已經滿了,那麼put方法將阻塞到有空間可用;如果佇列為空,那麼take方法將會阻塞到有元素可用。佇列可以是有界的,也可以是無界的,無界佇列永遠不會充滿,因此無界佇列的put方法永遠也不會阻塞。(offer方法如果資料項不能新增到佇列中,就會返回一個失敗狀態。這樣就能夠建立更多靈活的策略來處理負荷過載的情況,例如減輕負載,將多餘的工作項序列化並寫入磁碟,減少生產者執行緒的數量,或者通過某種方式來抑制生產者執行緒)
BlockingQueue簡化了生產者-消費者模式的設計過程,消費者不需要知道生產者是誰,生產者也不需要知道生產者是誰;而且支援任意數量的生產者與消費者。一種最常見的生產者-消費者設計模式就是執行緒池與工作佇列的組合,在Executor任務執行框架中就體現了這種模式。
一個經典的例子:以洗盤子為例子,一個人洗完盤子把盤子放在盤架上,另一個人負責從盤架上取出盤子並把他們烘乾。在這個例子中,盤架就相當於一個阻塞佇列。如果盤架上沒有盤子,消費者會一直等待,如果盤架滿了,生產者會一直等待。我們可以將這種類比擴充套件為多個生產者與多個消費者,每個工人只需要與盤架打交道。人們不需要知道誰是生產者誰是消費者。
生產者和消費者的角色是相對的,某種環境下的生產者在另一種不同的環境中可能會變為消費者。比如烘乾盤子的人將"消費"洗乾淨的溼盤子,而產生烘乾的盤子。第三個人把洗乾淨的盤子整理好,在這種情況下,烘乾盤子的人是生產者也是消費者,從而就有了兩個共享的佇列(每個對壘對列可能阻塞烘乾工作的執行)。
JDK中有多個BlockingQueue的實現,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO佇列,二者分別於LinkedList和ArrayList類似,但比同步List擁有更好的同步效能。PriorityBlockingQueue佇列是一個按優先順序排列的佇列,這個佇列可以根據元素的自然順序來比較元素(如果他們實現了Comparable方法),也可以使用Comparator來比較。
還有一個是SynchronousQueue,實際上它不是一個真正的佇列,因為它不會維護佇列中元素的儲存空間,與其他佇列不同的是,它維護一組執行緒,這些執行緒在等待把元素加入或移除佇列。如果以洗盤子為例,那麼久相當於沒有盤架而是直接將洗好的盤子放入下一個空閒的烘乾機中。這種方式看似很奇怪,由於可以直接交付工作降低了將資料從生產者移到消費者的延遲。因為SynchronousQueue沒有儲存功能,因此put和take會一直阻塞,直到有另一個執行緒準備好參與到交付過程,僅當有足夠多的消費者,並且總是有一個消費者準備獲取交付工作時,才適合使用同步佇列。
BlockingQueue中的方法:
BlockingQueue既然是Queue的子介面,必然有Queue中的方法,上面已經列了。看一下BlockingQueue中特有的方法:
(1)void put(E e) throws InterruptedException
把e新增進BlockingQueue中,如果BlockingQueue中沒有空間,則呼叫執行緒被阻塞,進入等待狀態,直到BlockingQueue中有空間再繼續
(2)void take() throws InterruptedException
取走BlockingQueue裡面排在首位的物件,如果BlockingQueue為空,則呼叫執行緒被阻塞,進入等待狀態,直到BlockingQueue有新的資料被加入
(3)int drainTo(Collection<? super E> c, int maxElements)
一次性取走BlockingQueue中的資料到c中,可以指定取的個數。通過該方法可以提升獲取資料效率,不需要多次分批加鎖或釋放鎖
2.1 ArrayBlockingQueue的簡單使用
基於陣列的阻塞佇列,必須指定佇列大小。比較簡單。ArrayBlockingQueue中只有一個ReentrantLock物件,這意味著生產者和消費者無法並行執行。建立ArrayBlockingQueue可以指定鎖的公平性,預設是非公平鎖,如下原始碼:
final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
例如:基於ArrayBlockingQueue的單生產單消費模式:
package cn.qlq.thread.thirteen; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Demo2 { private static int num ; private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class); public static void main(String[] args) throws InterruptedException { final BlockingQueue<String> strings = new ArrayBlockingQueue<>(1);//必須指定容量(指定容器最多為1) Thread producer = new Thread(new Runnable() { @Override public void run() { try { for (int i=0;i<5;i++) { String ele = "ele"+(++num); strings.put(ele); LOGGER.info("ThreadName ->{} put ele->{}",Thread.currentThread().getName(),ele); } } catch (InterruptedException e) { e.printStackTrace(); } } },"producer"); producer.start(); Thread consumer = new Thread(new Runnable() { @Override public void run() { try { for (int i=0;i<5;i++) { Thread.sleep(1*1000); String take = strings.take(); LOGGER.info("ThreadName ->{} take ele->{}",Thread.currentThread().getName(),take); } } catch (InterruptedException e) { e.printStackTrace(); } } },"consumer"); consumer.start(); } }
結果:(可以看到生產者放進元素之後會等元素被拿走之後才會繼續生成元素)
11:00:04 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele1
11:00:05 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele1
11:00:05 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele2
11:00:06 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele2
11:00:06 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele3
11:00:07 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele4
11:00:07 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele3
11:00:08 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->producer put ele->ele5
11:00:08 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele4
11:00:09 [cn.qlq.thread.thirteen.Demo2]-[INFO] ThreadName ->consumer take ele->ele5
2.2 LinkedBlockingQueue 簡單使用
類似於LinkedList,基於連結串列的阻塞佇列。此佇列如果不指定容量大小,預設採用Integer.MAX_VALUE(可以理解為無限佇列)。此外LinkedBlockingList有兩個鎖,意味著生產者和消費者都有自己的鎖。如下原始碼:
private transient Node<E> head; private transient Node<E> last; private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition(); public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
例如:基於LinkedBlockingQueue的多生產多消費模式:
package cn.qlq.thread.thirteen; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Demo3 { private static int num ; private static final Logger LOGGER = LoggerFactory.getLogger(Demo3.class); public static void main(String[] args) throws InterruptedException { final BlockingQueue<String> strings = new LinkedBlockingQueue<>(3); Runnable producerRun = new Runnable() { @Override public synchronized void run() {//加同步避免出現執行緒非安全 try { for (int i=0;i<5;i++) { Thread.sleep(1000); String ele = "ele"+(++num); strings.put(ele); LOGGER.info("ThreadName ->{} put ele->{}",Thread.currentThread().getName(),ele); } } catch (InterruptedException e) { e.printStackTrace(); } } }; Thread producer = new Thread(producerRun,"producer"); producer.start(); Thread producer2 = new Thread(producerRun,"producer2"); producer2.start(); Runnable consumerRun = new Runnable() { @Override public void run() { try { for (int i=0;i<5;i++) { Thread.sleep(3000); String take = strings.take(); LOGGER.info("ThreadName ->{} take ele->{}",Thread.currentThread().getName(),take); } } catch (InterruptedException e) { e.printStackTrace(); } } }; Thread consumer = new Thread(consumerRun,"consumer"); Thread consumer1 = new Thread(consumerRun,"consumer1"); consumer.start(); consumer1.start(); } }
結果:
11:46:47 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele1
11:46:48 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele2
11:46:49 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele2
11:46:49 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele1
11:46:49 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele3
11:46:50 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele4
11:46:51 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer put ele->ele5
11:46:52 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele3
11:46:52 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele4
11:46:52 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele6
11:46:53 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele7
11:46:55 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele8
11:46:55 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele6
11:46:55 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele5
11:46:56 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele9
11:46:58 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele7
11:46:58 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele8
11:46:58 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->producer2 put ele->ele10
11:47:01 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer take ele->ele10
11:47:01 [cn.qlq.thread.thirteen.Demo3]-[INFO] ThreadName ->consumer1 take ele->ele9
2.3 PriorityBlockingQueue簡單使用
PriorityBlockingQueue 是一個按優先順序排列的阻塞佇列,類似於TreeSet,看到tree,可以按順序進行排列,就要想到兩個介面。Comparable(集合中元素實現這個介面,元素自身具備可比性),Comparator(比較器,傳入容器構造方法中,容器具備可比性)。
其內部只有一個Lock,所以生產消費者不能同時作業,而且預設的容量是11,其構造方法也可以傳入一個比較器,如下原始碼:
/** * Default array capacity. */ private static final int DEFAULT_INITIAL_CAPACITY = 11; public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } public PriorityBlockingQueue(Collection<? extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue<?>) { PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; if (heapify) heapify(); }
測試按年齡逆序排列:
package cn.qlq.thread.thirteen; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; public class Demo4 { public static void main(String[] args) throws InterruptedException { BlockingQueue<Person> persons = new PriorityBlockingQueue<Person>(3); persons.put(new Person(20,"張三")); persons.put(new Person(22,"李四")); persons.put(new Person(21,"王五")); persons.put(new Person(18,"八卦")); System.out.println(persons.take()); System.out.println(persons.take()); System.out.println(persons.take()); System.out.println(persons.take()); } } class Person implements Comparable<Person>{ private int age; private String name; public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { return "Person [age=" + age + ", name=" + name + "]"; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Person(int age, String name) { super(); this.age = age; this.name = name; } @Override public int compareTo(Person o) {//返回-1表示排在他前面,返回1表示排在他前面 if(o.getAge() > this.getAge() ){ return 1; }else if(o.getAge() < this.getAge()){ return -1; } return 0; } }
結果:
Person [age=22, name=李四]
Person [age=21, name=王五]
Person [age=20, name=張三]
Person [age=18, name=八卦]
2.4 SynchronousQueue簡單使用
前面已經介紹了,SynchronousQueue實際上它不是一個真正的佇列,因為它不會維護佇列中元素的儲存空間,與其他佇列不同的是,它維護一組執行緒,這些執行緒在等待把元素加入或移除佇列。適用於生產者少消費者多的情況。
例如:
ArrayBlockingQueue有一個數組儲存佇列元素:
/** The queued items */ final Object[] items;
LinedBlockingQueue有一個內部Node類儲存元素:
/** * Linked list node class */ static class Node<E> { E item;
Node<E> next; Node(E x) { item = x; } }
PriorityBlockingQueue有一個數組用於儲存元素
private transient Object[] queue;
可以這麼理解,SynchronousQueue是生產者直接把資料給消費者(消費者直接從生產者這裡拿資料)。換句話說,每一個插入操作必須等待一個執行緒對應的移除操作。SynchronousQueue又有兩種模式:
1、公平模式
採用公平鎖,並配合一個FIFO佇列(Queue)來管理多餘的生產者和消費者
2、非公平模式
採用非公平鎖,並配合一個LIFO棧(Stack)來管理多餘的生產者和消費者,這也是SynchronousQueue預設的模式
如下原始碼:
public SynchronousQueue() { this(false); } public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack(); }
transferer 是一個內部類用於在生產者和消費者之間傳遞資料
abstract static class Transferer { /** * Performs a put or take. **/ abstract Object transfer(Object e, boolean timed, long nanos); }
例如:直接put元素會阻塞
package cn.qlq.thread.thirteen; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Demo5 { private static final Logger LOGGER = LoggerFactory.getLogger(Demo5.class); public static void main(String[] args) throws InterruptedException { BlockingQueue<String> persons = new SynchronousQueue<String>(); persons.put("1"); LOGGER.info("放入元素 1"); LOGGER.info("獲取元素 "+persons.take()); } }
結果:(執行緒會一直處於阻塞狀態,由於沒有消費者執行緒消費元素所以一直處於阻塞,所以不會執行LOGGER.info()的程式碼)
解決辦法:生產元素之前,先開啟消費者執行緒:(也就是必須確保生產的元素有消費者在take(),否則會阻塞)
package cn.qlq.thread.thirteen; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Demo5 { private static final Logger LOGGER = LoggerFactory.getLogger(Demo5.class); public static void main(String[] args) throws InterruptedException { final BlockingQueue<String> strings = new SynchronousQueue<String>(); Thread consumer = new Thread(new Runnable() { @Override public void run() { try { String take = strings.take(); LOGGER.info("ThreadName ->{} take ele->{}",Thread.currentThread().getName(),take); } catch (InterruptedException e) { e.printStackTrace(); } } },"consumer"); consumer.start(); strings.put("1"); LOGGER.info("放入元素 1"); } }
結果:(正常列印資訊,並且程序也結束)