java併發-特大疑問-阻塞佇列(BlockingQueue)
阿新 • • 發佈:2018-12-22
java併發-阻塞佇列(BlockingQueue)
何為阻塞佇列
A {@link java.util.Queue} that additionally supports operations
that wait for the queue to become non-empty when retrieving an
element, and wait for space to become available in the queue when
storing an element.\
即:
- 在新增元素的時候,在佇列滿的時候會處於等待狀態,知道佇列有空加才去新增
- 在移除元素的時候,當佇列為空即佇列長隊為0的時候,將等待知道佇列不為空才可移除佇列node
何處用到了阻塞佇列?
- 執行緒池,ThreadPoolExecutor使用的是BlockingQueue
實現類
- ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列。
- LinkedBlockingQueue:一個由連結串列結構組成的有界阻塞佇列。
- PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列。
- DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列。
- SynchronousQueue:一個不儲存元素的阻塞佇列
- LinkedTransferQueue 個由連結串列結構組成的無界阻塞佇列
- LinkedBlockingQueue 一個由連結串列結構組成的雙向阻塞佇列
詳解LinkedBlockingQueue(tomcat執行緒池用的是LinkedBlockingQueue)
入隊出隊流程
init
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); //queue的最大容量 this.capacity = capacity; //last和head指向同一個引用 last = head = new Node<E>(null); }
enqueue
- 第一次入隊,那麼last和head就分道揚鑣了
- 每一次入隊,last都指向最後一個node
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
dequeue
- 每一次出隊操作,head.next的引用就指向佇列的下一個,head.item永遠都為空,head.next才是正真的隊首
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
執行緒安全
採用的是AQS的ReentrantLock來實現
- ReentrantLock(putLock)
- notFull(Codition)
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
//區域性變數保證 可重複讀,與外界隔離,有點事務的隔離性
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
//入隊操作
enqueue(node);
c = count.getAndIncrement();
//這裡必須要通知其他put執行緒
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//不懂
if (c == 0)
signalNotEmpty();
}
- ReentrantLock(takeLock)
- notEmpty(Codition)
//同上
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//不懂
if (c == capacity)
signalNotFull();
return x;
}
入隊和出隊採用不同的鎖,通過AtomicInteger來保證佇列技術器的執行緒安全。不同操作使用不同的鎖,效率很高。通過Codition來進行執行緒間
通訊,及時喚醒。這裡採用了兩把鎖,相比一把鎖更加高效,take和put操作互不影響。
疑問
這裡的c在在put操作後,個人感覺不可能為0,不懂作者的意圖
if (c == 0)
signalNotEmpty();
這裡的c在在take操作後,個人感覺不可能為capacity,不懂作者的意圖
if (c == capacity)
signalNotFull();
return x;