1. 程式人生 > >ArrayBlockingQueue原始碼解析(1)

ArrayBlockingQueue原始碼解析(1)

此文已由作者趙計剛授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。


注意:在閱讀本文之前或在閱讀的過程中,需要用到ReentrantLock,內容見《第五章 ReentrantLock原始碼解析1--獲得非公平鎖與公平鎖lock()》《第六章 ReentrantLock原始碼解析2--釋放鎖unlock()》《第七章 ReentrantLock總結

1、對於ArrayBlockingQueue需要掌握以下幾點

  • 建立

  • 入隊(新增元素)

  • 出隊(刪除元素)

2、建立

  • public ArrayBlockingQueue(int capacity, boolean fair)

  • public ArrayBlockingQueue(int capacity)

使用方法:

  • Queue<String> abq = new ArrayBlockingQueue<String>(2);

  • Queue<String> abq = new ArrayBlockingQueue<String>(2,true);

通過使用方法,可以看出ArrayBlockingQueue支援ReentrantLock的公平鎖模式與非公平鎖模式,對於這兩種模式,檢視本文開頭的文章即可。

原始碼如下:

    private final E[] items;//底層資料結構
    private int takeIndex;//用來為下一個take/poll/remove的索引(出隊)
    private int putIndex;//用來為下一個put/offer/add的索引(入隊)
    private int count;//佇列中元素的個數

    /*
     * Concurrency control uses the classic two-condition algorithm found in any
     * textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;//鎖
    /** Condition for waiting takes */
    private final Condition notEmpty;//等待出隊的條件
    /** Condition for waiting puts */
    private final Condition notFull;//等待入隊的條件

    /**
     * 創造一個佇列,指定佇列容量,指定模式
     * @param fair
     * true:先來的執行緒先操作
     * false:順序隨機
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];//初始化類變數陣列items
        lock = new ReentrantLock(fair);//初始化類變數鎖lock
        notEmpty = lock.newCondition();//初始化類變數notEmpty Condition
        notFull = lock.newCondition();//初始化類變數notFull Condition
    }

    /**
     * 創造一個佇列,指定佇列容量,預設模式為非公平模式
     * @param capacity <1會拋異常
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

注意:

  • ArrayBlockingQueue的組成:一個物件陣列+1把鎖ReentrantLock+2個條件Condition

  • 在檢視原始碼的過程中,也要模仿帶條件鎖的使用,這個雙條件鎖模式是很經典的模式

3、入隊

3.1、public boolean offer(E e)

原理:

  • 在隊尾插入一個元素, 如果佇列沒滿,立即返回true; 如果佇列滿了,立即返回false

使用方法:

  • abq.offer("hello1");

原始碼:

    /**
     * 在隊尾插入一個元素,
     * 如果佇列沒滿,立即返回true;
     * 如果佇列滿了,立即返回false
     * 注意:該方法通常優於add(),因為add()失敗直接拋異常
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)//陣列滿了
                return false;
            else {//陣列沒滿
                insert(e);//插入一個元素
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    private void insert(E x) {
        items[putIndex] = x;//插入元素
        putIndex = inc(putIndex);//putIndex+1
        ++count;//元素數量+1
        /**
         * 喚醒一個執行緒
         * 如果有任意一個執行緒正在等待這個條件,那麼選中其中的一個區喚醒。
         * 在從等待狀態被喚醒之前,被選中的執行緒必須重新獲得鎖
         */
        notEmpty.signal();
    }

    /**
     * i+1,陣列下標+1
     */
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

程式碼非常簡單,流程看註釋即可,只有一點注意點:

  • 在插入元素結束後,喚醒等待notEmpty條件(即獲取元素)的執行緒,可以發現這類似於生產者-消費者模式

 

3.2、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 在隊尾插入一個元素,,如果陣列已滿,則進入等待,直到出現以下三種情況:

    • 被喚醒

    • 等待時間超時

    • 當前執行緒被中斷

使用方法:

        try {
            abq.offer("hello2",1000,TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

原始碼:

    /**
     * 在隊尾插入一個元素,
     * 如果陣列已滿,則進入等待,直到出現以下三種情況:
     * 1、被喚醒
     * 2、等待時間超時
     * 3、當前執行緒被中斷
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (e == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);//將超時時間轉換為納秒
        final ReentrantLock lock = this.lock;
        /*
         * lockInterruptibly():
         * 1、 在當前執行緒沒有被中斷的情況下獲取鎖。
         * 2、如果獲取成功,方法結束。
         * 3、如果鎖無法獲取,當前執行緒被阻塞,直到下面情況發生:
         * 1)當前執行緒(被喚醒後)成功獲取鎖
         * 2)當前執行緒被其他執行緒中斷
         * 
         * lock()
         * 獲取鎖,如果鎖無法獲取,當前執行緒被阻塞,直到鎖可以獲取並獲取成功為止。
         */
        lock.lockInterruptibly();//加可中斷的鎖
        try {
            for (;;) {
                if (count != items.length) {//佇列未滿
                    insert(e);
                    return true;
                }
                if (nanos <= 0)//已超時
                    return false;
                try {
                    /*
                     * 進行等待:
                     * 在這個過程中可能發生三件事:
                     * 1、被喚醒-->繼續當前這個for(;;)迴圈
                     * 2、超時-->繼續當前這個for(;;)迴圈
                     * 3、被中斷-->之後直接執行catch部分的程式碼
                     */
                    nanos = notFull.awaitNanos(nanos);//進行等待(在此過程中,時間會流失,在此過程中,執行緒也可能被喚醒)
                } catch (InterruptedException ie) {//在等待的過程中執行緒被中斷
                    notFull.signal(); // 喚醒其他未被中斷的執行緒
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

注意:

  • awaitNanos(nanos)是AQS中的一個方法,這裡就不詳細說了,有興趣的自己去檢視AQS的原始碼。

  • lockInterruptibly()與lock()的區別見註釋



免費領取驗證碼、內容安全、簡訊傳送、直播點播體驗包及雲伺服器等套餐

更多網易技術、產品、運營經驗分享請點選


相關文章:
【推薦】 Google guava cache原始碼解析1--構建快取器(1)
【推薦】 利用jstack定位典型效能問題例項