Java併發學習筆記(八)-LinkedBlockingQueue
阿新 • • 發佈:2019-01-07
LinkedBlockingQueue是由連結串列組成的阻塞佇列,先來看demo
public class LinkedBlockingQueueDemo { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); BlockingQueue<Bread> queue = new LinkedBlockingQueue<Bread>(10); for(int i = 0; i < 2; i++){ es.execute(new Baker(queue)); } for(int i = 0; i < 10; i++){ es.execute(new BreadConsumer(queue)); } es.shutdown(); } } class Baker implements Runnable{ private static int no = 0; private int id = ++no; private int count = 0; private BlockingQueue<Bread> queue; public Baker(BlockingQueue<Bread> queue){ this.queue = queue; } @Override public void run() { for(int i = 0; i < 10; i++){ System.out.println("麵包師" + id + "正準備做第" + ++count + "麵包"); Bread bread = new Bread(); // 滿佇列情況下,阻塞 try { queue.put(bread); System.out.println("麵包師" + id + "做的第" + count + "麵包是麵包"+ bread.getId()); } catch (InterruptedException e) { } } } } class BreadConsumer implements Runnable{ private static int no = 0; private int id = ++no; private int count = 0; private BlockingQueue<Bread> queue; public BreadConsumer(BlockingQueue<Bread> queue){ this.queue = queue; } @Override public void run() { for(int i = 0; i < 2; i++){ System.out.println("顧客 " + id + "準備買第" + ++count +"個麵包" ); Bread bread = null; // 空佇列情況下,阻塞 try { bread = queue.take(); } catch (InterruptedException e) { } <span style="white-space:pre"> </span>System.out.println("顧客" + id + "買到的第" +count+"個麵包是麵包" + bread.getId()); } } } class Bread { private static int count = 0; private int id = ++count; public int getId() { return id; } }
預設情況下,其容量為Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
也可以指定一個容量以免連結串列過度擴張
在LinkedBlockingQueue的連結串列裡,有兩個指標,分別指向佇列頭和佇列尾。與ArrayBlockingQueue不同的是,在LinkedBlockingQueue裡,有兩把鎖,分別鎖佇列頭和佇列尾。這樣做是有好處的,可以同時入隊和出隊,比ArrayBlockingQueue效能高public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
我們來看put阻塞方法,如果佇列滿,會一直阻塞,直到佇列不滿/** 連結串列頭 */ private transient Node<E> head; /** 連結串列尾 */ private transient Node<E> last; /** 出隊的鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** 入隊的鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
往佇列裡插入資料時,首先會持有putLock鎖,如果當前佇列元素個數跟容量相等,阻塞,呼叫notFull.await。不然,入隊。入隊後如果元素個數小於佇列容量,會喚醒其它的阻塞的插入執行緒。最後一句,不明白,元素為0,為什麼會去執行喚醒空條件?求指教
佇列裡,插入元素,會插入隊尾
private void enqueue(E x) {
// assert putLock.isHeldByCurrentThread();
last = last.next = new Node<E>(x);
}
再來看出隊操作take,也是阻塞方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
出隊獲取的是takeLock,類似的,如果當前佇列元素個數為0,阻塞,呼叫notEmpty.await。不然,出隊。出隊後如果元素個數大於0,會喚醒其它的阻塞的出隊執行緒。出隊從隊頭出隊
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}