1. 程式人生 > >LinkedBlockingQueue原始碼分析

LinkedBlockingQueue原始碼分析

簡介

LinkedBlockingQueue以連結串列(單鏈表)為基礎實現的佇列,先進先出,head是佇列中存在最久的元素,tail是最新加入的元素,新增元素從隊尾新增;刪除元素,從隊頭刪除; 不容許插入null;執行緒安全,用ReentrantLock實現執行緒安全;用法和ArrayBlockingQueue一致

AtomicInteger類

AtomicInteger,一個提供原子操作的Integer的類。在Java語言中,++i和i++操作並不是執行緒安全的,在使用的時候,不可避免的會用到synchronized關鍵字。而AtomicInteger則通過一種執行緒安全的加減操作介面。

public final int get() //獲取當前的值 public final int getAndSet(int newValue)//獲取當前的值,並設定新的值 public final int getAndIncrement()//獲取當前的值,並自增 public final int getAndDecrement() //獲取當前的值,並自減 public final int getAndAdd(int delta) //獲取當前的值,並加上預期的值

建構函式

建構函式:預設容量是Integer.MAX;也可以指定固定容量 Java是自右向左逐一賦值的,比如:A=B=C=0,首先給C賦值0,即C=0,然後B=C;最後A=B。

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
	//先給head賦值;再給last賦值head;(last,head都指向同一記憶體地址),此時head,last節點的值都為null,並且next都指向null;
    last = head = new Node<E>(null);
}

新增 add(),offer(),put()

add()方法:佇列沒滿之前,直接新增,返回true;否則報佇列已滿異常;

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

offer():是否新增成功;佇列已滿,新增失敗,返回false;未滿,新增成功,返回true;

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
	//判斷容量是否已滿
    if (count.get() == capacity)
        return false;
    int c = -1;
	//建立新的節點
    Node<E> node = new Node<E>(e);
	//插入鎖
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
		//佇列未滿
        if (count.get() < capacity) {
            enqueue(node);
			//獲取當前的值,並且自增;新增成功之後,+1判斷佇列是否已滿;未滿,喚醒一個在await佇列的執行緒;
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
	//非空 喚醒一個在await佇列的執行緒;
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

enqueue():在隊尾新增元素節點;在連結串列末端加上一個新節點; Java是自右向左逐一賦值的,比如:A=B=C=0,首先給C賦值0,即C=0,然後B=C;最後A=B。

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
	//先給last.next賦值node;即此時head,last節點的next引用都指向node節點;
	//再給last賦值last.next(即node節點);即last = node; 
	//綜上所述:佇列初始化後,佇列插入第一個元素時,就給head的next引用賦值了;非第一次插入元素,只改變last;
    last = last.next = node;
}

put():插入時,佇列已滿,則執行緒阻塞;否則直接插入;

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
		//佇列已滿,則執行緒阻塞
        while (count.get() == capacity) {
            notFull.await();
        }
		//在佇列尾插入新節點
        enqueue(node);
        c = count.getAndIncrement();
		//判斷容量是否已滿;
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
	//佇列是否為空;
    if (c == 0)
        signalNotEmpty();
}

獲取隊頭元素:element(),peek()

element():佇列不為空,返回佇列頭的元素值;否則報異常;

public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

peek():佇列為空,返回null;不為空,返回佇列頭的元素值;

public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
		//head物件next引用在插入操作呼叫enqueue()就賦值了,指向佇列頭節點;
        return (count.get() > 0) ? head.next.item : null;
    } finally {
        takeLock.unlock();
    }
}

刪除 poll(),take(),remove()

poll():從隊尾刪除,佇列為空,直接返回null;不為空,返回刪除的節點的值;

public E poll() {
    final AtomicInteger count = this.count;
	//佇列為空
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
		//佇列不為空
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

dequeue():返回佇列頭結點的值;

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
	//此時head 的值為null,next指向佇列頭結點;賦值給節點h;
    Node<E> h = head;
	//佇列頭節點;
    Node<E> first = h.next;
    h.next = h; // help GC
	//將head賦值
    head = first;
	//取出值x;
    E x = first.item;
	//把值置為null,稱為正式的頭結點,其next引用指向佇列頭結點;
    first.item = null;
    return x;
}

take():如果佇列為空,執行緒阻塞;不為空,直接返回佇列頭結點對應的值;

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

remove():從佇列中刪除某個物件,true 佇列存在該元素,並刪除成功;false 佇列不存在該元素;

public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock();
    try {
		//從頭結點開始遍歷,確認是否存在該物件
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}

unlink()方法:

//p節點時要刪除的節點;trail是p節點的上一個節點
void unlink(Node<E> p, Node<E> trail) {
    // assert isFullyLocked();
    // p.next is not changed, to allow iterators that are
    // traversing p to maintain their weak-consistency guarantee.
	//將p節點的值設為null;
    p.item = null;
	//trail指向p節點的下一個節點;
    trail.next = p.next;
	//如果p是隊尾節點,則last=trail;
    if (last == p)
        last = trail;
	//非滿佇列 喚醒對應await佇列一個執行緒
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

以上就是LinkedBlockingQueue的主要方法分析,如有問題,請多指教,謝謝!