1. 程式人生 > >LinkedBlockingQueue原始碼解析(2)

LinkedBlockingQueue原始碼解析(2)

此文已由作者趙計剛授權網易雲社群釋出。

歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。


3.3、public void put(E e) throws InterruptedException

原理:

  • 在隊尾插入一個元素,如果佇列滿了,一直阻塞,直到佇列不滿了或者執行緒被中斷

使用方法:

        try {
            abq.put("hello1");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

原始碼:

    /**
     * 在隊尾插一個元素
     * 如果佇列滿了,一直阻塞,直到佇列不滿了或者執行緒被中斷
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        final ReentrantLock putLock = this.putLock;//入隊鎖
        final AtomicInteger count = this.count;//當前佇列中的元素個數
        putLock.lockInterruptibly();//加鎖
        try {
            while (count.get() == capacity) {//如果佇列滿了 
                /*
                 * 加入notFull等待佇列,直到佇列元素不滿了,
                 * 被其他執行緒使用notFull.signal()喚醒
                 */
                notFull.await();
            }
            enqueue(e);//入隊
            c = count.getAndIncrement();//入隊數量+1
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

 

4、出隊

4.1、public E poll()

原理:

  • 如果沒有元素,直接返回null;如果有元素,出隊

使用方法:

abq.poll();

原始碼:

    /**
     * 出隊: 
     * 1、如果沒有元素,直接返回null 
     * 2、如果有元素,出隊
     */
    public E poll() {
        final AtomicInteger count = this.count;// 獲取元素數量
        if (count.get() == 0)// 沒有元素
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();// 獲取出隊鎖
        try {
            if (count.get() > 0) {// 有元素
                x = dequeue();// 出隊
                // 元素個數-1(注意:該方法是一個無限迴圈,直到減1成功為止,且返回舊值)
                c = count.getAndDecrement();
                if (c > 1)// 還有元素(如果舊值c==1的話,那麼通過上邊的操作之後,佇列就空了)
                    notEmpty.signal();// 喚醒等待在notEmpty佇列中的其中一條執行緒
            }
        } finally {
            takeLock.unlock();// 釋放出隊鎖
        }
        if (c == capacity)// c == capacity是怎麼發生的?如果佇列是一個滿佇列,注意:上邊的c返回的是舊值
            signalNotFull();
        return x;
    }

    /**
     * 從佇列頭部移除一個節點
     */
    private E dequeue() {
        Node<E> h = head;//獲取頭節點:x==null
        Node<E> first = h.next;//將頭節點的下一個節點賦值給first
        h.next = h; // 將當前將要出隊的節點置null(為了使其做head節點做準備)
        head = first;//將當前將要出隊的節點作為了頭節點
        E x = first.item;//獲取出隊節點的值
        first.item = null;//將出隊節點的值置空
        return x;
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

注意:出隊邏輯如果不懂,檢視最後總結部分的圖

 

4.2、public E poll(long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 從隊頭刪除一個元素,如果佇列不空,出隊;如果佇列已空且已經超時,返回null;如果佇列已空且時間未超時,則進入等待,直到出現以下三種情況:

    • 被喚醒

    • 等待時間超時

    • 當前執行緒被中斷

使用方法:

        try {
            abq.poll(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

原始碼:

    /**
     * 從佇列頭部刪除一個元素,
     * 如果佇列不空,出隊;
     * 如果佇列已空,判斷時間是否超時,如果已經超時,返回null
     * 如果佇列已空且時間未超時,則進入等待,直到出現以下三種情況:
     * 1、被喚醒
     * 2、等待時間超時
     * 3、當前執行緒被中斷
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {//如果佇列沒有元素
                if (nanos <= 0)//已經超時
                    return null;
                /*
                 * 進行等待:
                 * 在這個過程中可能發生三件事:
                 * 1、被喚醒-->繼續當前這個while迴圈
                 * 2、超時-->繼續當前這個while迴圈
                 * 3、被中斷-->丟擲異常
                 */
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();//出隊
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

 


免費領取驗證碼、內容安全、簡訊傳送、直播點播體驗包及雲伺服器等套餐

更多網易技術、產品、運營經驗分享請點選


相關文章:
【推薦】 Foxman, 基於微核架構的 Mock 解決方案
【推薦】 Spring快取穿透問題修復
【推薦】 windows系統下npm升級的正確姿勢以及原理