JAVA併發程式設計:阻塞佇列-ArrayBlockingQueue
阿新 • • 發佈:2018-11-30
生活
有很多的不快樂,其實是源自不滿足,而不滿足,很多時候是源自於心不定,而心不定則是因為不清楚究竟自己要什麼,不清楚要什麼的結果就是什麼都想要,結果什麼都沒得到。
生產者消費者模式
生產者和消費者問題是執行緒模型中一個經典問題:
生產者和消費者在同一個時間段內共用一塊記憶體區域,由生產者在這塊記憶體區域建立消費者需要的資料,由消費者取走資料並消費。
對於生產者消費者模型的應用例項,JDK1.5提供了阻塞佇列。
下面來看下ArrayBlockingQueue,這是一個有界的,陣列結構的阻塞佇列。
ArrayBlockingQueue 成員組成
先來看下ArrayBlockingQueue的成員組成:
//元素容器
final Object[] items;
//出隊索引
int takeIndex;
//入隊索引
int putIndex;
//佇列中元素個數
int count;
//鎖
final ReentrantLock lock;
//出隊條件
private final Condition notEmpty;
//入隊條件
private final Condition notFull;
組成明瞭,出隊和入隊各有一個條件,互不干擾。
ArrayBlockingQueue 之建立
//建立指定長度的非公平 有界陣列阻塞佇列 public ArrayBlockingQueue(int capacity) { this(capacity, false); } //可以指定fair為true建立公平的佇列 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } //可以傳入集合 構建一個初始化佇列 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
ArrayBlockingQueue 之生產
不阻塞生產:
//呼叫offer,如果滿了就報錯 public boolean add(E e) { return super.add(e); } // public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { //滿了就返回false; if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } }
阻塞生產:
//阻塞一定時間,不能入隊就返回false,可以中斷
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
insert(e);
return true;
} finally {
lock.unlock();
}
}
// 一直阻塞直到入隊或者被中斷
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//當佇列滿時等待
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
核心入隊方法:
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
//入隊成功後喚醒等到在NotEmpty上的執行緒
notEmpty.signal();
}
ArrayBlockingQueue 之消費
不阻塞消費:
// 沒有元素返回null,有就返回並移除佇列中該元素
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}
//不阻塞返回資料 不移除佇列
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : itemAt(takeIndex);
} finally {
lock.unlock();
}
}
阻塞消費:
//一直阻塞值到出隊 或者被中斷
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
//阻塞消費,超時後返回null,可以中斷
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return extract();
} finally {
lock.unlock();
}
}
出隊核心方法:
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
//移除元素後並沒有整體往前挪,只是索引+1
takeIndex = inc(takeIndex);
--count;
//出隊成功後喚醒等待在NotFull上的執行緒
notFull.signal();
return x;
}
例項
public class ABQTest {
public static class Bread{
String name;
String price;
public Bread(String name, String price) {
this.name = name;
this.price = price;
}
@Override
public String toString() {
return String.format("[麵包:%s,價格:%s]",name,price);
}
}
public static class Producer implements Runnable{
private ArrayBlockingQueue<Bread> queue;
private Bread bread;
private String name;
private CountDownLatch latch;
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(Long.valueOf(new Random().nextInt(1000)));
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
queue.put(bread);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("name:%s,生產:%s",name,bread.toString()));
}
public Producer(ArrayBlockingQueue<Bread> queue, Bread bread, String name, CountDownLatch latch) {
this.queue = queue;
this.bread = bread;
this.name = name;
this.latch = latch;
}
}
public static class Consumer implements Runnable{
private ArrayBlockingQueue<Bread> queue;
private String name;
private CountDownLatch latch;
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(Long.valueOf(new Random().nextInt(1000)));
} catch (InterruptedException e) {
e.printStackTrace();
}
Bread bread = null;
try {
bread=queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("name:%s,買了:%s",name,bread.toString()));
}
public Consumer(ArrayBlockingQueue<Bread> queue, String name, CountDownLatch latch) {
this.queue = queue;
this.name = name;
this.latch = latch;
}
}
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(1);
ArrayBlockingQueue<Bread> queue = new ArrayBlockingQueue<Bread>(4);
new Thread(new Producer(queue,new Bread("肉鬆","10"),"趙",latch)).start();
new Thread(new Producer(queue,new Bread("蛋黃","7"),"錢",latch)).start();
new Thread(new Producer(queue,new Bread("火腿","15"),"孫",latch)).start();
new Thread(new Producer(queue,new Bread("蔬菜","5"),"李",latch)).start();
new Thread(new Producer(queue,new Bread("香腸","8"),"周",latch)).start();
new Thread(new Consumer(queue,"A",latch)).start();
new Thread(new Consumer(queue,"B",latch)).start();
new Thread(new Consumer(queue,"C",latch)).start();
new Thread(new Consumer(queue,"D",latch)).start();
new Thread(new Consumer(queue,"E",latch)).start();
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
}
}
name:錢,生產:[麵包:蛋黃,價格:7]
name:周,生產:[麵包:香腸,價格:8]
name:A,買了:[麵包:蛋黃,價格:7]
name:B,買了:[麵包:香腸,價格:8]
name:趙,生產:[麵包:肉鬆,價格:10]
name:E,買了:[麵包:肉鬆,價格:10]
name:孫,生產:[麵包:火腿,價格:15]
name:C,買了:[麵包:火腿,價格:15]
name:D,買了:[麵包:蔬菜,價格:5]
name:李,生產:[麵包:蔬菜,價格:5]