java併發包訊息佇列
訊息佇列常用於有生產者和消費者兩類角色的多執行緒同步場景
BlockingQueue也是java.util.concurrent下的主要用來控制執行緒同步的工具。
主要的方法是:put、take一對阻塞存取;add、poll一對非阻塞存取。
插入:
1)add(anObject):把anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則丟擲異常
2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.
3)put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻塞直到BlockingQueue裡面有空間再繼續.
讀取:
4)poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null
5)take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的物件被加入為止
其他
int remainingCapacity();返回佇列剩餘的容量,在佇列插入和獲取的時候,不要瞎搞,數 據可能不準
boolean remove(Object o); 從佇列移除元素,如果存在,即移除一個或者更多,佇列改 變了返回true
public boolean contains(Object o); 檢視佇列是否存在這個元素,存在返回true
int drainTo(Collection<? super E> c); 傳入的集合中的元素,如果在佇列中存在,那麼將 佇列中的元素移動到集合中
int drainTo(Collection<? super E> c, int maxElements); 和上面方法的區別在於,制定了移 動的數量
案例:
package blockingqueue; import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable { BlockingQueue<String> queue; public Consumer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { try { String consumer = Thread.currentThread().getName(); System.out.println(consumer); //如果佇列為空,會阻塞當前執行緒 String temp = queue.take(); System.out.println(consumer + "消費者 get a product:" + temp); } catch (Exception e) { e.printStackTrace(); } } } |
package blockingqueue; import java.util.concurrent.BlockingQueue; public class Producer implements Runnable { BlockingQueue<String> queue; public Producer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { try { String temp = "A Product, 生產執行緒:" + Thread.currentThread().getName(); queue.put(temp);//如果佇列是滿的話,會阻塞當前執行緒 System.out.println("生產者 I have made a product: " + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } } |
package blockingqueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class Test { public static void main(String[] args) throws Exception { BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2); // BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); // 不設定的話,LinkedBlockingQueue預設大小為Integer.MAX_VALUE // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2); Consumer consumer = new Consumer(queue); Producer producer = new Producer(queue); for (int i = 0; i < 3; i++) { new Thread(producer, "Producer" + (i + 1)).start(); } for (int i = 0; i < 5; i++) { new Thread(consumer, "Consumer" + (i + 1)).start(); } Thread.sleep(5000); new Thread(producer, "Producer" + (5)).start(); } } |
BlockingQueue有四個具體的實現類,常用的兩種實現類為:
1、ArrayBlockingQueue:一個由陣列支援的有界阻塞佇列,規定大小的BlockingQueue,其建構函式必須帶一個int引數來指明其大小.其所含的物件是以FIFO(先入先出)順序排序的。
2、LinkedBlockingQueue:大小不定的BlockingQueue,若其建構函式帶一個規定大小的引數,生成的BlockingQueue有大小限制,若不帶大小引數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的物件是以FIFO(先入先出)順序排序的。
LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,預設最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在佇列滿的時候會阻塞直到有佇列成員被消費,take方法在佇列空的時候會阻塞,直到有佇列成員被放進來。
LinkedBlockingQueue和ArrayBlockingQueue區別:
LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的資料結構不一樣,導致LinkedBlockingQueue的資料吞吐量要大於ArrayBlockingQueue,但線上程數量很大時其效能的可預見性低於ArrayBlockingQueue.
生產者消費者的示例程式碼:
見程式碼 TestBlockingQueue TestBlockingQueueConsumer TestBlockingQueueProducer
package blockingqueue; import java.util.Random; import java.util.concurrent.BlockingQueue; public class TestBlockingQueueProducer implements Runnable { BlockingQueue<String> queue; Random random = new Random(); public TestBlockingQueueProducer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(random.nextInt(10)); String task = Thread.currentThread().getName() + " made a product " + i; System.out.println(task); queue.put(task); //阻塞方法 } catch (InterruptedException e) { e.printStackTrace(); } } } } |
package blockingqueue; import java.util.Random; import java.util.concurrent.BlockingQueue; public class TestBlockingQueueConsumer implements Runnable { BlockingQueue<String> queue; Random random = new Random(); public TestBlockingQueueConsumer(BlockingQueue<String> queue){ this.queue = queue; } @Override public void run() { try { Thread.sleep(random.nextInt(10)); System.out.println(Thread.currentThread().getName()+ "trying..."); String temp = queue.take();//如果佇列為空,會阻塞當前執行緒 int remainingCapacity = queue.remainingCapacity(); System.out.println(Thread.currentThread().getName() + " get a job " +temp); // System.out.println("佇列中的元素個數: "+ remainingCapacity); } catch (InterruptedException e) { e.printStackTrace(); } } } |
package blockingqueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class TestBlockingQueue { public static void main(String[] args) { BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2); // BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); // 不設定的話,LinkedBlockingQueue預設大小為Integer.MAX_VALUE // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2); TestBlockingQueueConsumer consumer = new TestBlockingQueueConsumer(queue); TestBlockingQueueProducer producer = new TestBlockingQueueProducer(queue); for (int i = 0; i < 3; i++) { new Thread(producer, "Producer" + (i + 1)).start(); } for (int i = 0; i < 5; i++) { new Thread(consumer, "Consumer" + (i + 1)).start(); } } } |