1. 程式人生 > >Java併發基礎-併發工具類(二)

Java併發基礎-併發工具類(二)

併發工具類

本系列文章主要講解Java併發相關的內容,包括同步、鎖、訊號量、阻塞佇列、執行緒池等,整體思維導圖如下:

系列文章列表:

本文主要以例項講解Semaphore、阻塞佇列等內容。

Semaphore

基本概念和用途

Semaphore常稱訊號量,其維護了一個許可集,可以用來控制執行緒併發數。執行緒呼叫acquire()方法去或者許可證,然後執行相關任務,任務完成後,呼叫release()方法釋放該許可證,讓其他阻塞的執行緒可以執行。 Semaphore可以用於流量控制,尤其是一些公共資源有限的場景,比如資料庫連線。假設我們上面的賬戶餘額管理中的賬戶修改操作涉及到去更改mysql資料庫,為了避免資料庫併發太大,我們進行相關限制。 常用方法

Semaphore(int permits):構造方法,初始化許可證數量 void acquire():獲取許可證 void release():釋放許可證 int availablePermits() :返回此訊號量中當前可用的許可證數。 int getQueueLength():返回正在等待獲取許可證的執行緒數。 boolean hasQueuedThreads() :是否有執行緒正在等待獲取許可證。 void reducePermits(int reduction) :減少reduction個許可證。是個protected方法。 Collection getQueuedThreads()
:返回所有等待獲取許可證的執行緒集合。是個protected方法。

執行示例

雖然在程式碼中設定了20個執行緒去執行,但同時設定了許可證的數量為5,因而實際的最大併發數還是5

package com.aidodoo.java.concurrent;

import java.util.concurrent.*;

/**
 * Created by zhangkh on 2018/9/9.
 */
public class SemaphoreDemo {
    public static void main(String[] args){
        Semaphore semaphore=new Semaphore(5);
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        Account account=new Account();
        for(int i=0;i<20;i++){
            SpenderWithSemaphore spender = new SpenderWithSemaphore(account, semaphore);
            executorService.submit(spender);
        }

        executorService.shutdown();
    }
}
class SpenderWithSemaphore implements Runnable {
    private final Account account;
    private final Semaphore semaphore;

    public SpenderWithSemaphore(Account account, Semaphore semaphore) {
        this.account = account;
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try{
            semaphore.acquire();
            System.out.println(String.format("%s get a premit at time %s,change and save data to mysql",Thread.currentThread().getName(),System.currentTimeMillis()/1000));
            Thread.sleep(2000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
//            System.out.println(String.format("%s release a premit",Thread.currentThread().getName()));
            semaphore.release();
        }
    }
}

獲取許可證後,模擬操作mysql,我們讓執行緒睡眠2秒,程式輸出如下:

pool-1-thread-2 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-5 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-3 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-4 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-1 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-8 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-7 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-6 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-9 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-10 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-11 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-13 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-12 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-14 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-15 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-16 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-17 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-19 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-18 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-20 get a premit at time 1536480864,change and save data to mysql

可以看到前面5個執行緒同一時間1536480858獲得許可證,然後執行操作,並不是20個執行緒一起操作,這樣能降低對mysql資料庫的影響。 如果把上面Semaphore的構造方法中的許可證數量改為20,大家可以看到20個執行緒的執行時間基本一致。

原始碼實現

Semaphore實現直接基於AQS,有公平和非公平兩種模式。公平模式即按照呼叫acquire()的順序依次獲得許可證,遵循FIFO(先進先出),非公平模式是搶佔式的,誰先搶到先使用。

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

獲取許可證 acquire()方法最終呼叫父類AQS中的acquireSharedInterruptibly方法。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)              //(1)
        doAcquireSharedInterruptibly(arg);      //(2)
}

(1):呼叫tryAcquireShared,嘗試去獲取許可證 (2):如果獲取失敗,則呼叫doAcquireSharedInterruptibly,將執行緒加入到等待佇列中 tryAcquireShared方法由Semaphore的內部類,同時也是AQS的子類去實現,即NonfairSyncFairSync,下面我們以NonfairSync為例說明其實現。

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

nonfairTryAcquireShared方法如下:

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();             //(1)
        int remaining = available - acquires;   //(2)
        if (remaining < 0 ||
            compareAndSetState(available, remaining)) (3)
            return remaining;
    }
}

(1):獲取state的值,也就是總許可證數量 (2):計算本次申請後,剩餘的許可證數量 (3):如果剩餘的許可證數量大於0且通過CASstate的值修改成功後,返回剩餘的許可證數量,否則繼續迴圈阻塞。

釋放許可證 release()方法的呼叫最終會呼叫父類AQSreleaseShared()方法:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {        //(1)
        doReleaseShared();              //(2)
        return true;
    }
    return false;
}

(1):嘗試釋放許可證 (2):如果釋放許可證成功,則通知阻塞的執行緒,讓其執行 tryReleaseShared方法很簡單,基本上是nonfairTryAcquireShared的逆過程,即增加許可證的數量,並通過CAS修改state的值。

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

BlockingQueue

基本概念

阻塞佇列主要是解決如何高效安全傳輸資料的問題,此外能降低程式耦合度,讓程式碼邏輯更加清晰。 其繼承了Queue,並在其基礎上支援了兩個附加的操作:

  • 當佇列為空時,獲取元素的執行緒會阻塞,等待佇列變為非空
  • 當佇列滿時,新增元素的執行緒會阻塞,等待佇列可用

比較典型的使用場景是生產者和消費者。 BlockingQueue根據對於不能立即滿足但可能在將來某一時刻可以滿足的操作,提供了不同的處理方法,進而導致眾多的api操作:

Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek()} not applicable not applicable

Throws exception:指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException異常。當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 Special value:插入方法會返回是否成功,成功則返回true。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回null Blocks:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take元素,佇列也會阻塞消費者執行緒,直到佇列可用。 Time out:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出。

整體架構和類圖

Java併發包根據不同的結構和功能提供了不同的阻塞佇列,整體類圖如下: 其中BlockingQueue有如下子類:

  • ArrayBlockingQueue :一個由陣列結構組成的有界阻塞佇列。
  • DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列。
  • PriorityBlockingQueue :一個支援優先順序排序的無界阻塞佇列。
  • SynchronousQueue:一個不儲存元素的阻塞佇列。
  • LinkedBlockingQueue :一個由連結串列結構組成的有界阻塞佇列。

其中BlockingDeque有一個子類:

  • LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。 BlockingDeque作為雙端佇列,針對頭部元素,還提供瞭如下方法:
First Element (Head)
Throws exception Special value Blocks Times out
Insert addFirst(e) offerFirst(e) putFirst(e) offerFirst(e, time, unit)
Remove removeFirst() pollFirst() takeFirst() pollFirst(time, unit)
Examine getFirst() peekFirst() not applicable not applicable

針對尾部元素

Last Element (Tail)
Throws exception Special value Blocks Times out
Insert addLast(e) offerLast(e) putLast(e) offerLast(e, time, unit)
Remove removeLast() pollLast() takeLast() pollLast(time, unit)
Examine getLast() peekLast() not applicable not applicable

使用示例

一個典型的生產者和消費者例項如下,一個BlockingQueue可以安全地與多個生產者和消費者一起使用,Producer執行緒呼叫NumerGenerator.getNextNumber()生成自增整數,不斷地寫入數字,然後Consumer迴圈消費。

package com.aidodoo.java.concurrent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by zhangkh on 2018/7/17.
 */
public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue = new ArrayBlockingQueue(1024,true);
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 5; i++) {
            executorService.submit(new Producer(queue));
        }
        for (int i = 0; i < 3; i++) {
            executorService.submit(new Consumer(queue));
        }
        Thread.sleep(30 * 1000L);
        executorService.shutdown();
    }
}

class Producer implements Runnable {
    Logger logger = LoggerFactory.getLogger(Producer.class.getName());
    protected BlockingQueue queue = null;
    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            for(int i=0;i<3;i++){
                int num = NumerGenerator.getNextNumber();
                queue.put(num);
                Thread.sleep(1000);
                logger.info("{} producer put {}", Thread.currentThread().getName(), num);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


class Consumer implements Runnable {
    Logger logger = LoggerFactory.getLogger(Consumer.class.getName());

    protected BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                int ele = (int) queue.take();
                logger.info("{} Consumer take {}", Thread.currentThread().getName(), ele);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class NumerGenerator{
    private static AtomicInteger count = new AtomicInteger();
    public static Integer getNextNumber(){
        return count.incrementAndGet();
    }
}

程式輸出如下:

18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 1
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 2
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 3
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-3 producer put 3
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-2 producer put 2
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-1 producer put 1
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-5 producer put 5
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-4 producer put 4
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 4
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 5
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 6
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-3 producer put 6
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-1 producer put 8
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-2 producer put 7
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-5 producer put 9
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-4 producer put 10
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 7
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 8
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 9
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-1 producer put 12
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-3 producer put 11
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-5 producer put 14
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-4 producer put 15
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-2 producer put 13
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 10
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 11
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 12
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 13
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 14
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 15

其他BlockingQueue子類的使用可參考對應的Java Api

原始碼分析

由於BlockingQueue相關的子類眾多,我們僅以ArrayBlockingQueue從原始碼角度分析相關實現。 構造方法 ArrayBlockingQueue中定義的成員變數如下:

final Object[] items; 
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
transient Itrs itrs = null

各變數的解釋如下,以便了解後續的程式碼:

  • items用於儲存具體的元素
  • takeIndex元素索引,用於記錄下次獲取元素的位置
  • putIndex元素索引,用於記錄下次插入元素的位置
  • count用於記錄當前佇列中元素的個數
  • notEmpty條件變數,此處為獲取元素的條件,即佇列不能為空,否則執行緒阻塞
  • notFull條件變數,此處為插入元素的條件,即佇列不能已滿,否則執行緒阻塞
  • itrs用於維護迭代器相關內容

內部結構如下:

構造方法如下:

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);  //(1)                               
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];      //(2)               
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();         //(3)
    notFull =  lock.newCondition();         //(4)
}
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {                 //(5)
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

(1):預設情況下,非公平模式,即搶佔式 (2):陣列初始化 (3)/(4):條件變數初始化 (5):如果構造方法中,含有初始化集合的話,則將對應元素新增到內部陣列,並更改countputIndex的值。

插入資料 插入資料,我們主要看put()方法的實現,重點看生產者和消費者插入和獲取資料時,執行緒何時阻塞,同時又何時喚醒。

public void put(E e) throws InterruptedException {
    checkNotNull(e);                        //(1)
    final ReentrantLock lock = this.lock;   //(2)
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();                //(3)
        enqueue(e);
    } finally {
        lock.unlock();                      //(4)
    }
}

private void enqueue(E x) {
final Object[] items = this.items;
    items[putIndex] = x;                    //(5)
    if (++putIndex == items.length)         //(6)
        putIndex = 0;
    count++;                                //(7)
    notEmpty.signal();                      //(8)
}

(1):非空檢查,插入的元素不能為null,否則丟擲NullPointerException (2):獲取互斥鎖 (3):如果當前佇列的元素個數等於佇列總長度,即佇列已滿,則通過條件變數,釋放和notFull相關的鎖,當前執行緒阻塞。當前執行緒喚醒的條件如下:

  • 其他某個執行緒呼叫此 Conditionsignal() 方法,並且碰巧將當前執行緒選為被喚醒的執行緒;
  • 或者其他某個執行緒呼叫此 ConditionsignalAll() 方法;
  • 或者其他某個執行緒中斷當前執行緒,且支援中斷執行緒的掛起;
  • 或者發生“虛假喚醒”

(5):如果佇列未滿,則將元素新增的putIndex索引的位置 (6):putIndex增加1後和佇列長度相等,即已到達佇列尾部,則putIndex0 (7):佇列已有元素數量加1 (8):通知notEmpty條件變數,喚醒等待獲取元素的執行緒 (4):釋放互斥鎖 可以看到ArrayBlockingQueue每次插入元素後,都會去喚醒等待獲取元素的執行緒。

獲取資料 take()方法原始碼如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;   //(1)
    lock.lockInterruptibly();
    try {
        while (count == 0)                  
            notEmpty.await();               //(2)
        return dequeue();
    } finally {
        lock.unlock();                      //(9)
    }
}

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];             //(3)
    items[takeIndex] = null;                //(4)
    if (++takeIndex == items.length)
        takeIndex = 0;                      //(5)
    count--;                                //(6)
    if (itrs != null)
        itrs.elementDequeued();             //(7)
    notFull.signal();                       //(8)
    return x;
}

(1):獲取互斥鎖 (2):如果count0,即佇列為空,則釋放互斥鎖,然後掛起當前執行緒 (3):根據takeIndex索引到陣列中獲取具體的值,並賦值給x (4):賦值完成後,takeIndex索引位置資料置null,便於回收 (5):takeIndex1,然後和佇列長度比較,如果相等,即已經讀取到佇列尾部,takeIndex0 (6):獲取後,將佇列元素個數count1 (7):維護和queue相關的迭代器 (8):喚醒等待插入元素的執行緒 (9):釋放互斥鎖 可以看到ArrayBlockingQueue每次獲取元素後,都會喚醒等待插入元素的執行緒。

迭代器 在分析原始碼前,我們先看在一個迭代器的示例

package com.aidodoo.java.concurrent;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Created by zhangkh on 2018/9/10.
 */
public class ArrayBlockingQueueIterDemo {
        public static void main(String[] args) throws InterruptedException{
            BlockingQueue<String> queue=new ArrayBlockingQueue(5);
            queue.put("hadoop");
            queue.put("spark");
            queue.put("storm");
            queue.put("flink");

            Iterator<String> iterator1 = queue.iterator();
            System.out.println( queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println();
            while(iterator1.hasNext()) {
                System.out.println(iterator1.next());
            }
            System.out.println();
            Iterator<String> iterator2 = queue.iterator();
            while(iterator2.hasNext()) {
                System.out.println(iterator2.next());
            }
        }
}

程式輸出如下:

hadoop
spark
storm

hadoop
flink

flink

我們結合這個示例來具體分析資料插入和獲取時,內部成員變數的值 當分別插入hadoopsparkstormflink四個元素後,內部變數的值如下: 此時,ArrayBlockingQueue的成員變數的值itrsnull。 呼叫iterator()方法後,原始碼如下:

public Iterator<E> iterator() {
    return new Itr();                   //(1)
}

Itr() {
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();                            //(2)
try {
    if (count == 0) {                   //(3)
        cursor = NONE;
        nextIndex = NONE;
        prevTakeIndex = DETACHED;
    } else {
        final int takeIndex = ArrayBlockingQueue.this.takeIndex;
        prevTakeIndex = takeIndex;
        nextItem = itemAt(nextIndex = takeIndex);   //(4)
        cursor = incCursor(takeIndex);              //(5)
        if (itrs == null) {
            itrs = new Itrs(this);                  //(6)
        } else {
            itrs.register(this);                    //(7)
            itrs.doSomeSweeping(false);
        }
        prevCycles = itrs.cycles;
    }
} finally {
    lock.unlock();                                  //(8)
}

} (1):呼叫內部類Itr的構造方法 (2):獲取外部類即ArrayBlockingQueue的鎖 (3):沒有沒有元素,初始化變數值。內部類Itr的成員變數如下:

/** Index to look for new nextItem; NONE at end */
private int cursor;

/** Element to be returned by next call to next(); null if none */
private E nextItem;

/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
private int nextIndex;

/** Last element returned; null if none or not detached. */
private E lastItem;

/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
private int lastRet;

/** Previous value of takeIndex, or DETACHED when detached */
private int prevTakeIndex;

/** Previous value of iters.cycles */
private int prevCycles;

(4):將外部類的takeIndex賦值給內部類nextIndex,並獲取陣列具體的值賦值給nextItem (5):計算遊標cursor的下個值,其中incCursor方法如下:

private int incCursor(int index) {
    // assert lock.getHoldCount() == 1;
    if (++index == items.length)
        index = 0;
    if (index == putIndex)
        index = NONE;
    return index;
}

(6):註冊,主要是維護連結串列 (7):清理itrs (8):釋放外部類的互斥鎖 在上面的示例中,呼叫iterator()方法後,Itr的內部變數值如下:

由於後面三次呼叫了queue.take(),依次輸出hadoopsparkstorm後,相關成員變數的值見圖片標識,重點關注takeIndex=3

當呼叫next()方法時,程式碼如下:

public E next() {
    final E x = nextItem;
    if (x == null)
        throw new NoSuchElementException();
    final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    lock.lock();
    try {
        if (!isDetached())          //(1)
            incorporateDequeues();
        lastRet = nextIndex;
        final int cursor = this.cursor;
        if (cursor >= 0) {
            nextItem = itemAt(nextIndex = cursor);
            this.cursor = incCursor(cursor);
        } else {
            nextIndex = NONE;
            nextItem = null;
        }
    } finally {
        lock.unlock();
    }
    return x;
}

其中(1)處的isDetached方法如下

boolean isDetached() {
    // assert lock.getHoldCount() == 1;
    return prevTakeIndex < 0;
}

由於我們示例中初始化Itr的時候的prevTakeIndex0,故isDetached返回為false,程式將呼叫incorporateDequeues方法,根據註釋我們也知道,該方法主要是調整和迭代器相關的內部索引。

/**
 * Adjusts indices to incorporate all dequeues since the last
 * operation on this iterator.  Call only from iterating thread.
 */
private void incorporateDequeues() {
    final int cycles = itrs.cycles;
    final int takeIndex = ArrayBlockingQueue.this.takeIndex;
    final int prevCycles = this.prevCycles;
    final int prevTakeIndex = this.prevTakeIndex;

    if (cycles != prevCycles || takeIndex != prevTakeIndex) {
        final int len = items.length;
        // how far takeIndex has advanced since the previous
        // operation of this iterator
        long dequeues = (cycles - prevCycles) * len
            + (takeIndex - prevTakeIndex);

        // Check indices for invalidation
        if (invalidated(lastRet, prevTakeIndex, dequeues, len))
            lastRet = REMOVED;
        if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
            nextIndex = REMOVED;
        if (invalidated(cursor, prevTakeIndex, dequeues, len))
            cursor = takeIndex;

        if (cursor < 0 && nextIndex < 0 && lastRet < 0)
            detach();
        else {
            this.prevCycles = cycles;
            this.prevTakeIndex = takeIndex;
        }
    }
}

注意cursor = takeIndex這句程式碼,將外部內的takeIndex賦值給cursor,這樣子將佇列和迭代器資料讀取進行了同步。 對於iterator1,第一次呼叫next()方法時,cursor被賦值為3首先將nextItem的值保持在x變數中,即hadoop字串。 然後設定nextItemcursor的值

nextItem = itemAt(nextIndex = cursor);
this.cursor = incCursor(cursor);

設定完成後,nextItemflink,cursor為-1。 最後返回儲存在x變數中的值,即返回hadoop字串。 第二次呼叫next()方法時,輸出的值即上次儲存的nextItem值,即flink字串。 迭代器執行過程中,相關變數內容如下: 至於iterator2迭代器,各位可以自己去分析,不再贅述。

本文主要以例項講解Semaphore、阻塞佇列,並分析了相關核心原始碼實現。

本文參考

關於作者 愛程式設計、愛鑽研、愛分享、愛生活 關注分散式、高併發、資料探勘 如需捐贈,請掃碼