015.多執行緒-併發佇列
阿新 • • 發佈:2018-12-27
在併發佇列上JDK提供了兩套實現,
一個是以ConcurrentLinkedQueue為代表的高效能佇列,
一個是以BlockingQueue介面為代表的阻塞佇列,無論哪種都繼承自Queue。
ConcurrentLinkedQueue
ConcurrentLinkedQueue : 是一個適用於高併發場景下的佇列,
通過無鎖的方式,實現了高併發狀態下的高效能,
通常ConcurrentLinkedQueue效能好於BlockingQueue。
它是一個基於連結節點的無界執行緒安全佇列。該佇列的元素遵循先進先出的原則。
頭是最先加入的,尾是最近加入的,該佇列不允許null元素。
-
add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中這倆個方法沒有任何區別)
public boolean add(E e) { return offer(e); }
-
poll() 和peek() 都是取頭元素節點,區別在於前者會刪除元素,後者不會。
package cn.qbz.thread; import java.util.concurrent.ConcurrentLinkedQueue; public class Test111904 { public static void main
BlockingQueue
在佇列為空時,獲取元素的執行緒會等待佇列變為非空。
當佇列滿時,儲存元素的執行緒會等待佇列可用。
阻塞佇列常用於生產者和消費者的場景
ArrayBlockingQueue
ArrayBlockingQueue是一個有邊界的阻塞佇列,它的內部實現是一個數組。
有邊界的意思是它的容量是有限的,
我們必須在其初始化的時候指定它的容量大小,
容量大小一旦指定就不可改變。
package cn.qbz.thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class Test111905 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue(2);
queue.add("test1");
queue.offer("test2");
Boolean isOffer = queue.offer("test3");
System.out.println("新增test3:"+isOffer);
isOffer = queue.offer("test4", 1, TimeUnit.SECONDS);
System.out.println("新增test4:"+isOffer);
for (int i = 0; i < 4; i++) {
System.out.println(queue.poll());
}
}
}
其中,add和offer的區別是:
當超出佇列界限時,add會丟擲異常,offer只是返回false。
LinkedBlockingQueue
LinkedBlockingQueue阻塞佇列大小的配置是可選的,
如果我們初始化時指定一個大小,它就是有邊界的,
如果不指定,它就是無邊界的。說是無邊界,
其實是採用了預設大小為Integer.MAX_VALUE的容量 。
它的內部實現是一個連結串列。
code of demo:
package cn.qbz.thread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Test111906 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new LinkedBlockingDeque(3);
Produce111906 p1 = new Produce111906(queue);
Consumer111906 c1 = new Consumer111906(queue);
Thread produce1 = new Thread(p1);
Thread produce2 = new Thread(p1);
Thread consumer1 = new Thread(c1);
produce1.start();
produce2.start();
consumer1.start();
Thread.sleep(1000 * 10);
p1.stop();
c1.stop();
}
}
class Produce111906 implements Runnable {
private BlockingQueue<String> queue;
private volatile Boolean flag = true;
private AtomicInteger count = new AtomicInteger();
public Produce111906(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
while (flag) {
try {
int data = count.incrementAndGet();
System.out.println("開始生產資料:" + data);
Boolean isOffer = queue.offer(data + "", 2, TimeUnit.SECONDS);
if (isOffer) {
System.out.println("寫入資料:" + data + "成功");
} else {
System.out.println("寫入資料:" + data + "失敗");
}
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("退出本次生產");
}
}
}
public void stop() {
flag = false;
}
}
class Consumer111906 implements Runnable {
private BlockingQueue<String> queue;
private volatile Boolean flag = true;
public Consumer111906(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
while (flag) {
System.out.println("開始消費資料");
try {
String data = queue.poll(2, TimeUnit.SECONDS);
if (data != null) {
System.out.println("消費成功:" + data);
} else {
System.out.println("消費失敗");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stop() {
flag = false;
}
}