LinkedBlockingQueue原始碼分析
簡介
LinkedBlockingQueue以連結串列(單鏈表)為基礎實現的佇列,先進先出,head是佇列中存在最久的元素,tail是最新加入的元素,新增元素從隊尾新增;刪除元素,從隊頭刪除; 不容許插入null;執行緒安全,用ReentrantLock實現執行緒安全;用法和ArrayBlockingQueue一致
AtomicInteger類
AtomicInteger,一個提供原子操作的Integer的類。在Java語言中,++i和i++操作並不是執行緒安全的,在使用的時候,不可避免的會用到synchronized關鍵字。而AtomicInteger則通過一種執行緒安全的加減操作介面。
public final int get() //獲取當前的值 public final int getAndSet(int newValue)//獲取當前的值,並設定新的值 public final int getAndIncrement()//獲取當前的值,並自增 public final int getAndDecrement() //獲取當前的值,並自減 public final int getAndAdd(int delta) //獲取當前的值,並加上預期的值
建構函式
建構函式:預設容量是Integer.MAX;也可以指定固定容量 Java是自右向左逐一賦值的,比如:A=B=C=0,首先給C賦值0,即C=0,然後B=C;最後A=B。
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //先給head賦值;再給last賦值head;(last,head都指向同一記憶體地址),此時head,last節點的值都為null,並且next都指向null; last = head = new Node<E>(null); }
新增 add(),offer(),put()
add()方法:佇列沒滿之前,直接新增,返回true;否則報佇列已滿異常;
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer():是否新增成功;佇列已滿,新增失敗,返回false;未滿,新增成功,返回true;
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; //判斷容量是否已滿 if (count.get() == capacity) return false; int c = -1; //建立新的節點 Node<E> node = new Node<E>(e); //插入鎖 final ReentrantLock putLock = this.putLock; putLock.lock(); try { //佇列未滿 if (count.get() < capacity) { enqueue(node); //獲取當前的值,並且自增;新增成功之後,+1判斷佇列是否已滿;未滿,喚醒一個在await佇列的執行緒; c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } //非空 喚醒一個在await佇列的執行緒; if (c == 0) signalNotEmpty(); return c >= 0; }
enqueue():在隊尾新增元素節點;在連結串列末端加上一個新節點; Java是自右向左逐一賦值的,比如:A=B=C=0,首先給C賦值0,即C=0,然後B=C;最後A=B。
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
//先給last.next賦值node;即此時head,last節點的next引用都指向node節點;
//再給last賦值last.next(即node節點);即last = node;
//綜上所述:佇列初始化後,佇列插入第一個元素時,就給head的next引用賦值了;非第一次插入元素,只改變last;
last = last.next = node;
}
put():插入時,佇列已滿,則執行緒阻塞;否則直接插入;
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
//佇列已滿,則執行緒阻塞
while (count.get() == capacity) {
notFull.await();
}
//在佇列尾插入新節點
enqueue(node);
c = count.getAndIncrement();
//判斷容量是否已滿;
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//佇列是否為空;
if (c == 0)
signalNotEmpty();
}
獲取隊頭元素:element(),peek()
element():佇列不為空,返回佇列頭的元素值;否則報異常;
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
peek():佇列為空,返回null;不為空,返回佇列頭的元素值;
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//head物件next引用在插入操作呼叫enqueue()就賦值了,指向佇列頭節點;
return (count.get() > 0) ? head.next.item : null;
} finally {
takeLock.unlock();
}
}
刪除 poll(),take(),remove()
poll():從隊尾刪除,佇列為空,直接返回null;不為空,返回刪除的節點的值;
public E poll() {
final AtomicInteger count = this.count;
//佇列為空
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//佇列不為空
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
dequeue():返回佇列頭結點的值;
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
//此時head 的值為null,next指向佇列頭結點;賦值給節點h;
Node<E> h = head;
//佇列頭節點;
Node<E> first = h.next;
h.next = h; // help GC
//將head賦值
head = first;
//取出值x;
E x = first.item;
//把值置為null,稱為正式的頭結點,其next引用指向佇列頭結點;
first.item = null;
return x;
}
take():如果佇列為空,執行緒阻塞;不為空,直接返回佇列頭結點對應的值;
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
remove():從佇列中刪除某個物件,true 佇列存在該元素,並刪除成功;false 佇列不存在該元素;
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
//從頭結點開始遍歷,確認是否存在該物件
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
unlink()方法:
//p節點時要刪除的節點;trail是p節點的上一個節點
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
//將p節點的值設為null;
p.item = null;
//trail指向p節點的下一個節點;
trail.next = p.next;
//如果p是隊尾節點,則last=trail;
if (last == p)
last = trail;
//非滿佇列 喚醒對應await佇列一個執行緒
if (count.getAndDecrement() == capacity)
notFull.signal();
}
以上就是LinkedBlockingQueue的主要方法分析,如有問題,請多指教,謝謝!