1. 程式人生 > >LinkedBlockingQueue原始碼淺析

LinkedBlockingQueue原始碼淺析

一個由連結串列結構組成的有界阻塞佇列

成員變數

    // 允許的最大容量
    private final int capacity;

    // 當前節點個數,因為有兩個鎖,所以節點個數採用原子類
    private final AtomicInteger count = new AtomicInteger();

    // 連結串列的頭節點
    transient Node<E> head;

    // 連結串列的尾節點
    private transient Node<E> last;

    // 出隊鎖
    private final ReentrantLock takeLock = new ReentrantLock();

    // 出隊鎖的等待佇列
    private final Condition notEmpty = takeLock.newCondition();

    // 入隊鎖
    private final ReentrantLock putLock = new ReentrantLock();

    // 入隊鎖的等待佇列
    private final Condition notFull = putLock.newCondition();

構造方法

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
	// 頭節點不儲存元素
        last = head = new Node<E>(null);
    }

    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

入隊

    // 入隊,阻塞
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
	// 獲取入隊鎖
        putLock.lockInterruptibly();
        try {
	    // 如果個數已滿,則當前執行緒進入隊鎖的等待佇列
            while (count.get() == capacity) {
                notFull.await();
            }
	    // 新增元素後,個數加1
            enqueue(node);
            c = count.getAndIncrement();
	    // 如果當前容量還未滿,則通知入隊鎖等待佇列的執行緒
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
	// 如果新增元素前的個數為0,則通知出隊鎖等待佇列的執行緒
        if (c == 0)
            signalNotEmpty();
    }

    // 入隊,與put相比有個超時等待,超過時間就取消入隊
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    // 入隊,不會超時等待或者阻塞等待,容量未滿時新增元素,容量已滿時取消新增
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

出隊

    // 出隊,阻塞
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
	// 獲取出隊鎖
        takeLock.lockInterruptibly();
        try {
	    // 如果當前容量為0,則執行緒進出隊鎖的等待佇列
            while (count.get() == 0) {
                notEmpty.await();
            }
	    // 先出隊,再進行個數操作
            x = dequeue();
            c = count.getAndDecrement();
	    // 如果容量超過1,則喚醒notEmpty等待佇列的執行緒
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
	// 如果之前個數已滿,則喚醒notFull等待佇列的執行緒
        if (c == capacity)
            signalNotFull();
        return x;
    }

    // 出隊,超時等待,超時後取消出隊
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    // 出隊,非阻塞、非超時等待,如果個數為0,則取消出隊
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

包含

    // 包含
    public boolean contains(Object o) {
        if (o == null) return false;
	// 獲取入隊鎖和出隊鎖
        fullyLock();
        try {
	    // 遍歷連結串列,找出是否包含元素
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }

刪除

    // 刪除
    public boolean remove(Object o) {
        if (o == null) return false;
        // 獲取入隊鎖和出隊鎖
        fullyLock();
        try {
	    // 遍歷連結串列刪除元素
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }