LinkedBlockingQueue原始碼淺析
阿新 • • 發佈:2018-12-13
一個由連結串列結構組成的有界阻塞佇列
成員變數
// 允許的最大容量 private final int capacity; // 當前節點個數,因為有兩個鎖,所以節點個數採用原子類 private final AtomicInteger count = new AtomicInteger(); // 連結串列的頭節點 transient Node<E> head; // 連結串列的尾節點 private transient Node<E> last; // 出隊鎖 private final ReentrantLock takeLock = new ReentrantLock(); // 出隊鎖的等待佇列 private final Condition notEmpty = takeLock.newCondition(); // 入隊鎖 private final ReentrantLock putLock = new ReentrantLock(); // 入隊鎖的等待佇列 private final Condition notFull = putLock.newCondition();
構造方法
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; // 頭節點不儲存元素 last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
入隊
// 入隊,阻塞 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 獲取入隊鎖 putLock.lockInterruptibly(); try { // 如果個數已滿,則當前執行緒進入隊鎖的等待佇列 while (count.get() == capacity) { notFull.await(); } // 新增元素後,個數加1 enqueue(node); c = count.getAndIncrement(); // 如果當前容量還未滿,則通知入隊鎖等待佇列的執行緒 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // 如果新增元素前的個數為0,則通知出隊鎖等待佇列的執行緒 if (c == 0) signalNotEmpty(); } // 入隊,與put相比有個超時等待,超過時間就取消入隊 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; } // 入隊,不會超時等待或者阻塞等待,容量未滿時新增元素,容量已滿時取消新增 public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
出隊
// 出隊,阻塞
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 獲取出隊鎖
takeLock.lockInterruptibly();
try {
// 如果當前容量為0,則執行緒進出隊鎖的等待佇列
while (count.get() == 0) {
notEmpty.await();
}
// 先出隊,再進行個數操作
x = dequeue();
c = count.getAndDecrement();
// 如果容量超過1,則喚醒notEmpty等待佇列的執行緒
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果之前個數已滿,則喚醒notFull等待佇列的執行緒
if (c == capacity)
signalNotFull();
return x;
}
// 出隊,超時等待,超時後取消出隊
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
// 出隊,非阻塞、非超時等待,如果個數為0,則取消出隊
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
包含
// 包含
public boolean contains(Object o) {
if (o == null) return false;
// 獲取入隊鎖和出隊鎖
fullyLock();
try {
// 遍歷連結串列,找出是否包含元素
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}
刪除
// 刪除
public boolean remove(Object o) {
if (o == null) return false;
// 獲取入隊鎖和出隊鎖
fullyLock();
try {
// 遍歷連結串列刪除元素
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}