1. 程式人生 > >Java ArrayBlockingQueue原始碼解析

Java ArrayBlockingQueue原始碼解析

ArrayBlockingQueue是Java併發框架中阻塞佇列的最基本的實現,分析這個類就可以知道併發框架中是如何實現阻塞的。

筆者工作了一兩年之後,還不知道阻塞是如何實現的,當然有一個原因是前期學習的東西比較雜,前後端的東西的懂一點,但是瞭解的不夠深刻,我覺得這是程式設計學習的禁忌,不管是前端還是後端,在工作3年的時候,你應該有一個方向是拿得出手,見得了人的。

轉回整體,ArrayBlockingQueue實現阻塞佇列的關鍵在與,對鎖(Lock)和等待條件(Condition)的使用的使用,這兩個實現的基本功能類似域wait()和notify(),是wait()和notify()的高階用法。

本文我們主要分析ArrayBlockingQueue的3個核心方法,put(),take()和poll()。

首先是put()

/**
     * 從佇列的尾部插入元素,如果佇列已滿,將阻塞等待到佇列有空間的時候進行插入操作。
     *
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();// 獲得當前執行緒的鎖
        try {
            while (count == items.length)// 迴圈等待
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();//釋放鎖
        }
    }

 這個方法在當前執行緒沒有中斷的情況下,獲取鎖,接著對陣列容量進行判斷,如果容量已滿,則迴圈等待帶容量騰出來為止,最後釋放當前執行緒鎖。這樣的業務邏輯就產生了這樣的場景,執行緒一進入到該方法,成功插入佇列,釋放鎖,假設剛好容量滿;執行緒二進入該方法,迴圈等待;執行緒三從容器中獲取元素;執行緒二判斷容量未滿,插入,釋放鎖。如果有多個執行緒在等待的時候,會出現什麼情況呢,從程式碼的邏輯來看,當多個執行緒都在阻塞等待的時候,要看誰首先搶到鎖,也就是消費方法是搶佔式的。

其次時take()

/**
     * 返回佇列頭部的元素,如果佇列為空,阻塞等待其他執行緒往當前容器放入元素為止。
     */
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//  獲取當前執行緒的鎖
        try {
            while (count == 0)// 阻塞等待
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();// 釋放鎖
        }
    }

這個過程和put()方法類似,這裡就不羅嗦了。

最後poll(),poll有兩個過載的方法,有引數和無引數,先講無引數的。

/**
     * 獲取佇列頭部的元素,當佇列為空的時候,返回null值
     *
     */
public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();// 鎖定
        try {
            return (count == 0) ? null : extract();
        } finally {
            lock.unlock();// 解鎖
        }
    }

 這個方法是不阻塞的,當佇列未空的時候,直接返回null值,所以實現中只是一個鎖的簡單使用,防止併發問題。

poll(...)

/**
     * 獲取佇列的頭部元素,在指定時間之內阻塞等待,如果超出阻塞時間佇列仍然空,則返回null值。
     *
     */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();// 獲取鎖
        try {
            while (count == 0) {// 指定時間內迴圈等待
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return extract();
        } finally {
            lock.unlock();// 解鎖
        }
    }

 這個方法的邏輯和其他方法的不同之處就在Condition的一個時間計數器方法awaitNanos(...),這裡先將時間大小根據時間單位換算成納秒的數值,當佇列容量為0是,使用Condition.awaitNanos(...),進行技術,超時後返回空。

總之,ArrayBlokingQueue使用的Java的現實鎖(Lock)配合Condition進行阻塞,使用Condition進行時間技術。而在併發框架中其他的阻塞和時間技術,也同樣是用這兩個物件API來實現。