Java併發基礎-併發工具類(二)
併發工具類
本系列文章主要講解Java
併發相關的內容,包括同步、鎖、訊號量、阻塞佇列、執行緒池等,整體思維導圖如下:
系列文章列表:
本文主要以例項講解Semaphore
、阻塞佇列等內容。
Semaphore
基本概念和用途
Semaphore
常稱訊號量,其維護了一個許可集,可以用來控制執行緒併發數。執行緒呼叫acquire()
方法去或者許可證,然後執行相關任務,任務完成後,呼叫release()
方法釋放該許可證,讓其他阻塞的執行緒可以執行。
Semaphore
可以用於流量控制,尤其是一些公共資源有限的場景,比如資料庫連線。假設我們上面的賬戶餘額管理中的賬戶修改操作涉及到去更改mysql
資料庫,為了避免資料庫併發太大,我們進行相關限制。
常用方法
Semaphore
(int
permits
):構造方法,初始化許可證數量
void
acquire()
:獲取許可證
void
release()
:釋放許可證
int
availablePermits()
:返回此訊號量中當前可用的許可證數。
int
getQueueLength()
:返回正在等待獲取許可證的執行緒數。
boolean
hasQueuedThreads()
:是否有執行緒正在等待獲取許可證。
void
reducePermits
(int
reduction
) :減少reduction
個許可證。是個protected
方法。
Collection
getQueuedThreads()
protected
方法。
執行示例
雖然在程式碼中設定了20
個執行緒去執行,但同時設定了許可證的數量為5
,因而實際的最大併發數還是5
。
package com.aidodoo.java.concurrent; import java.util.concurrent.*; /** * Created by zhangkh on 2018/9/9. */ public class SemaphoreDemo { public static void main(String[] args){ Semaphore semaphore=new Semaphore(5); ExecutorService executorService = Executors.newFixedThreadPool(20); Account account=new Account(); for(int i=0;i<20;i++){ SpenderWithSemaphore spender = new SpenderWithSemaphore(account, semaphore); executorService.submit(spender); } executorService.shutdown(); } } class SpenderWithSemaphore implements Runnable { private final Account account; private final Semaphore semaphore; public SpenderWithSemaphore(Account account, Semaphore semaphore) { this.account = account; this.semaphore = semaphore; } @Override public void run() { try{ semaphore.acquire(); System.out.println(String.format("%s get a premit at time %s,change and save data to mysql",Thread.currentThread().getName(),System.currentTimeMillis()/1000)); Thread.sleep(2000); }catch (InterruptedException e){ e.printStackTrace(); }finally { // System.out.println(String.format("%s release a premit",Thread.currentThread().getName())); semaphore.release(); } } }
獲取許可證後,模擬操作mysql
,我們讓執行緒睡眠2
秒,程式輸出如下:
pool-1-thread-2 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-5 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-3 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-4 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-1 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-8 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-7 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-6 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-9 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-10 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-11 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-13 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-12 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-14 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-15 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-16 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-17 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-19 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-18 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-20 get a premit at time 1536480864,change and save data to mysql
可以看到前面5
個執行緒同一時間1536480858
獲得許可證,然後執行操作,並不是20
個執行緒一起操作,這樣能降低對mysql
資料庫的影響。
如果把上面Semaphore
的構造方法中的許可證數量改為20
,大家可以看到20
個執行緒的執行時間基本一致。
原始碼實現
Semaphore
實現直接基於AQS
,有公平和非公平兩種模式。公平模式即按照呼叫acquire()
的順序依次獲得許可證,遵循FIFO
(先進先出),非公平模式是搶佔式的,誰先搶到先使用。
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
獲取許可證
acquire()
方法最終呼叫父類AQS
中的acquireSharedInterruptibly
方法。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //(1)
doAcquireSharedInterruptibly(arg); //(2)
}
(1
):呼叫tryAcquireShared
,嘗試去獲取許可證
(2
):如果獲取失敗,則呼叫doAcquireSharedInterruptibly
,將執行緒加入到等待佇列中
tryAcquireShared
方法由Semaphore
的內部類,同時也是AQS
的子類去實現,即NonfairSync
和FairSync
,下面我們以NonfairSync
為例說明其實現。
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
而nonfairTryAcquireShared
方法如下:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState(); //(1)
int remaining = available - acquires; //(2)
if (remaining < 0 ||
compareAndSetState(available, remaining)) (3)
return remaining;
}
}
(1
):獲取state
的值,也就是總許可證數量
(2
):計算本次申請後,剩餘的許可證數量
(3
):如果剩餘的許可證數量大於0
且通過CAS
將state
的值修改成功後,返回剩餘的許可證數量,否則繼續迴圈阻塞。
釋放許可證
release()
方法的呼叫最終會呼叫父類AQS
的releaseShared()
方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //(1)
doReleaseShared(); //(2)
return true;
}
return false;
}
(1
):嘗試釋放許可證
(2
):如果釋放許可證成功,則通知阻塞的執行緒,讓其執行
tryReleaseShared
方法很簡單,基本上是nonfairTryAcquireShared
的逆過程,即增加許可證的數量,並通過CAS
修改state
的值。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
BlockingQueue
基本概念
阻塞佇列主要是解決如何高效安全傳輸資料的問題,此外能降低程式耦合度,讓程式碼邏輯更加清晰。
其繼承了Queue
,並在其基礎上支援了兩個附加的操作:
- 當佇列為空時,獲取元素的執行緒會阻塞,等待佇列變為非空
- 當佇列滿時,新增元素的執行緒會阻塞,等待佇列可用
比較典型的使用場景是生產者和消費者。
BlockingQueue
根據對於不能立即滿足但可能在將來某一時刻可以滿足的操作,提供了不同的處理方法,進而導致眾多的api
操作:
Throws exception | Special value | Blocks | Times out | |
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek()} | not applicable | not applicable |
Throws
exception
:指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException
異常。當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException
異常
Special
value
:插入方法會返回是否成功,成功則返回true
。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回null
Blocks
:當阻塞佇列滿時,如果生產者執行緒往佇列裡put
元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take
元素,佇列也會阻塞消費者執行緒,直到佇列可用。
Time
out
:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出。
整體架構和類圖
Java
併發包根據不同的結構和功能提供了不同的阻塞佇列,整體類圖如下:
其中BlockingQueue
有如下子類:
ArrayBlockingQueue
:一個由陣列結構組成的有界阻塞佇列。DelayQueue
:一個使用優先順序佇列實現的無界阻塞佇列。PriorityBlockingQueue
:一個支援優先順序排序的無界阻塞佇列。SynchronousQueue
:一個不儲存元素的阻塞佇列。LinkedBlockingQueue
:一個由連結串列結構組成的有界阻塞佇列。
其中BlockingDeque
有一個子類:
LinkedBlockingDeque
:一個由連結串列結構組成的雙向阻塞佇列。BlockingDeque
作為雙端佇列,針對頭部元素,還提供瞭如下方法:
First Element (Head) | ||||
Throws exception | Special value | Blocks | Times out | |
Insert | addFirst(e) | offerFirst(e) | putFirst(e) | offerFirst(e, time, unit) |
Remove | removeFirst() | pollFirst() | takeFirst() | pollFirst(time, unit) |
Examine | getFirst() | peekFirst() | not applicable | not applicable |
針對尾部元素
Last Element (Tail) | ||||
Throws exception | Special value | Blocks | Times out | |
Insert | addLast(e) | offerLast(e) | putLast(e) | offerLast(e, time, unit) |
Remove | removeLast() | pollLast() | takeLast() | pollLast(time, unit) |
Examine | getLast() | peekLast() | not applicable | not applicable |
使用示例
一個典型的生產者和消費者例項如下,一個BlockingQueue
可以安全地與多個生產者和消費者一起使用,Producer
執行緒呼叫NumerGenerator
.getNextNumber()
生成自增整數,不斷地寫入數字,然後Consumer
迴圈消費。
package com.aidodoo.java.concurrent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by zhangkh on 2018/7/17.
*/
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new ArrayBlockingQueue(1024,true);
ExecutorService executorService = Executors.newFixedThreadPool(20);
for (int i = 0; i < 5; i++) {
executorService.submit(new Producer(queue));
}
for (int i = 0; i < 3; i++) {
executorService.submit(new Consumer(queue));
}
Thread.sleep(30 * 1000L);
executorService.shutdown();
}
}
class Producer implements Runnable {
Logger logger = LoggerFactory.getLogger(Producer.class.getName());
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for(int i=0;i<3;i++){
int num = NumerGenerator.getNextNumber();
queue.put(num);
Thread.sleep(1000);
logger.info("{} producer put {}", Thread.currentThread().getName(), num);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
Logger logger = LoggerFactory.getLogger(Consumer.class.getName());
protected BlockingQueue queue = null;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
int ele = (int) queue.take();
logger.info("{} Consumer take {}", Thread.currentThread().getName(), ele);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class NumerGenerator{
private static AtomicInteger count = new AtomicInteger();
public static Integer getNextNumber(){
return count.incrementAndGet();
}
}
程式輸出如下:
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 1
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 2
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 3
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-3 producer put 3
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-2 producer put 2
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-1 producer put 1
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-5 producer put 5
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-4 producer put 4
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 4
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 5
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 6
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-3 producer put 6
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-1 producer put 8
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-2 producer put 7
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-5 producer put 9
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-4 producer put 10
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 7
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 8
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 9
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-1 producer put 12
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-3 producer put 11
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-5 producer put 14
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-4 producer put 15
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-2 producer put 13
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 10
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 11
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 12
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 13
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 14
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 15
其他BlockingQueue
子類的使用可參考對應的Java
Api
。
原始碼分析
由於BlockingQueue
相關的子類眾多,我們僅以ArrayBlockingQueue
從原始碼角度分析相關實現。
構造方法
ArrayBlockingQueue
中定義的成員變數如下:
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
transient Itrs itrs = null
各變數的解釋如下,以便了解後續的程式碼:
items
用於儲存具體的元素takeIndex
元素索引,用於記錄下次獲取元素的位置putIndex
元素索引,用於記錄下次插入元素的位置count
用於記錄當前佇列中元素的個數notEmpty
條件變數,此處為獲取元素的條件,即佇列不能為空,否則執行緒阻塞notFull
條件變數,此處為插入元素的條件,即佇列不能已滿,否則執行緒阻塞itrs
用於維護迭代器相關內容
內部結構如下:
構造方法如下:
public ArrayBlockingQueue(int capacity) {
this(capacity, false); //(1)
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity]; //(2)
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition(); //(3)
notFull = lock.newCondition(); //(4)
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) { //(5)
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
(1
):預設情況下,非公平模式,即搶佔式
(2
):陣列初始化
(3
)/(4
):條件變數初始化
(5
):如果構造方法中,含有初始化集合的話,則將對應元素新增到內部陣列,並更改count
和putIndex
的值。
插入資料
插入資料,我們主要看put()
方法的實現,重點看生產者和消費者插入和獲取資料時,執行緒何時阻塞,同時又何時喚醒。
public void put(E e) throws InterruptedException {
checkNotNull(e); //(1)
final ReentrantLock lock = this.lock; //(2)
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); //(3)
enqueue(e);
} finally {
lock.unlock(); //(4)
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x; //(5)
if (++putIndex == items.length) //(6)
putIndex = 0;
count++; //(7)
notEmpty.signal(); //(8)
}
(1
):非空檢查,插入的元素不能為null
,否則丟擲NullPointerException
(2
):獲取互斥鎖
(3
):如果當前佇列的元素個數等於佇列總長度,即佇列已滿,則通過條件變數,釋放和notFull
相關的鎖,當前執行緒阻塞。當前執行緒喚醒的條件如下:
- 其他某個執行緒呼叫此
Condition
的signal()
方法,並且碰巧將當前執行緒選為被喚醒的執行緒; - 或者其他某個執行緒呼叫此
Condition
的signalAll()
方法; - 或者其他某個執行緒中斷當前執行緒,且支援中斷執行緒的掛起;
- 或者發生“虛假喚醒”
(5
):如果佇列未滿,則將元素新增的putIndex
索引的位置
(6
):putIndex
增加1
後和佇列長度相等,即已到達佇列尾部,則putIndex
置0
(7
):佇列已有元素數量加1
(8
):通知notEmpty
條件變數,喚醒等待獲取元素的執行緒
(4
):釋放互斥鎖
可以看到ArrayBlockingQueue
每次插入元素後,都會去喚醒等待獲取元素的執行緒。
獲取資料
take()
方法原始碼如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; //(1)
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); //(2)
return dequeue();
} finally {
lock.unlock(); //(9)
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //(3)
items[takeIndex] = null; //(4)
if (++takeIndex == items.length)
takeIndex = 0; //(5)
count--; //(6)
if (itrs != null)
itrs.elementDequeued(); //(7)
notFull.signal(); //(8)
return x;
}
(1
):獲取互斥鎖
(2
):如果count
為0
,即佇列為空,則釋放互斥鎖,然後掛起當前執行緒
(3
):根據takeIndex
索引到陣列中獲取具體的值,並賦值給x
(4
):賦值完成後,takeIndex
索引位置資料置null
,便於回收
(5
):takeIndex
加1
,然後和佇列長度比較,如果相等,即已經讀取到佇列尾部,takeIndex
置0
(6
):獲取後,將佇列元素個數count
減1
(7
):維護和queue
相關的迭代器
(8
):喚醒等待插入元素的執行緒
(9
):釋放互斥鎖
可以看到ArrayBlockingQueue
每次獲取元素後,都會喚醒等待插入元素的執行緒。
迭代器 在分析原始碼前,我們先看在一個迭代器的示例
package com.aidodoo.java.concurrent;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* Created by zhangkh on 2018/9/10.
*/
public class ArrayBlockingQueueIterDemo {
public static void main(String[] args) throws InterruptedException{
BlockingQueue<String> queue=new ArrayBlockingQueue(5);
queue.put("hadoop");
queue.put("spark");
queue.put("storm");
queue.put("flink");
Iterator<String> iterator1 = queue.iterator();
System.out.println( queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println();
while(iterator1.hasNext()) {
System.out.println(iterator1.next());
}
System.out.println();
Iterator<String> iterator2 = queue.iterator();
while(iterator2.hasNext()) {
System.out.println(iterator2.next());
}
}
}
程式輸出如下:
hadoop
spark
storm
hadoop
flink
flink
我們結合這個示例來具體分析資料插入和獲取時,內部成員變數的值
當分別插入hadoop
、spark
、storm
、flink
四個元素後,內部變數的值如下:
此時,ArrayBlockingQueue
的成員變數的值itrs
為null
。
呼叫iterator()
方法後,原始碼如下:
public Iterator<E> iterator() {
return new Itr(); //(1)
}
Itr() {
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock(); //(2)
try {
if (count == 0) { //(3)
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex); //(4)
cursor = incCursor(takeIndex); //(5)
if (itrs == null) {
itrs = new Itrs(this); //(6)
} else {
itrs.register(this); //(7)
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
}
} finally {
lock.unlock(); //(8)
}
}
(1
):呼叫內部類Itr
的構造方法
(2
):獲取外部類即ArrayBlockingQueue
的鎖
(3
):沒有沒有元素,初始化變數值。內部類Itr
的成員變數如下:
/** Index to look for new nextItem; NONE at end */
private int cursor;
/** Element to be returned by next call to next(); null if none */
private E nextItem;
/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
private int nextIndex;
/** Last element returned; null if none or not detached. */
private E lastItem;
/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
private int lastRet;
/** Previous value of takeIndex, or DETACHED when detached */
private int prevTakeIndex;
/** Previous value of iters.cycles */
private int prevCycles;
(4
):將外部類的takeIndex
賦值給內部類nextIndex
,並獲取陣列具體的值賦值給nextItem
(5
):計算遊標cursor
的下個值,其中incCursor
方法如下:
private int incCursor(int index) {
// assert lock.getHoldCount() == 1;
if (++index == items.length)
index = 0;
if (index == putIndex)
index = NONE;
return index;
}
(6
):註冊,主要是維護連結串列
(7
):清理itrs
(8
):釋放外部類的互斥鎖
在上面的示例中,呼叫iterator()
方法後,Itr
的內部變數值如下:
由於後面三次呼叫了queue
.take()
,依次輸出hadoop
、spark
、storm
後,相關成員變數的值見圖片標識,重點關注takeIndex
=3
。
當呼叫next()
方法時,程式碼如下:
public E next() {
final E x = nextItem;
if (x == null)
throw new NoSuchElementException();
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (!isDetached()) //(1)
incorporateDequeues();
lastRet = nextIndex;
final int cursor = this.cursor;
if (cursor >= 0) {
nextItem = itemAt(nextIndex = cursor);
this.cursor = incCursor(cursor);
} else {
nextIndex = NONE;
nextItem = null;
}
} finally {
lock.unlock();
}
return x;
}
其中(1
)處的isDetached
方法如下
boolean isDetached() {
// assert lock.getHoldCount() == 1;
return prevTakeIndex < 0;
}
由於我們示例中初始化Itr
的時候的prevTakeIndex
為0
,故isDetached
返回為false
,程式將呼叫incorporateDequeues
方法,根據註釋我們也知道,該方法主要是調整和迭代器相關的內部索引。
/**
* Adjusts indices to incorporate all dequeues since the last
* operation on this iterator. Call only from iterating thread.
*/
private void incorporateDequeues() {
final int cycles = itrs.cycles;
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
final int prevCycles = this.prevCycles;
final int prevTakeIndex = this.prevTakeIndex;
if (cycles != prevCycles || takeIndex != prevTakeIndex) {
final int len = items.length;
// how far takeIndex has advanced since the previous
// operation of this iterator
long dequeues = (cycles - prevCycles) * len
+ (takeIndex - prevTakeIndex);
// Check indices for invalidation
if (invalidated(lastRet, prevTakeIndex, dequeues, len))
lastRet = REMOVED;
if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
nextIndex = REMOVED;
if (invalidated(cursor, prevTakeIndex, dequeues, len))
cursor = takeIndex;
if (cursor < 0 && nextIndex < 0 && lastRet < 0)
detach();
else {
this.prevCycles = cycles;
this.prevTakeIndex = takeIndex;
}
}
}
注意cursor
= takeIndex
這句程式碼,將外部內的takeIndex
賦值給cursor
,這樣子將佇列和迭代器資料讀取進行了同步。
對於iterator1
,第一次呼叫next()
方法時,cursor
被賦值為3
首先將nextItem
的值保持在x
變數中,即hadoop
字串。
然後設定nextItem
和cursor
的值
nextItem = itemAt(nextIndex = cursor);
this.cursor = incCursor(cursor);
設定完成後,nextItem
為flink
,cursor
為-1
。
最後返回儲存在x
變數中的值,即返回hadoop
字串。
第二次呼叫next()
方法時,輸出的值即上次儲存的nextItem
值,即flink
字串。
迭代器執行過程中,相關變數內容如下:
至於iterator2
迭代器,各位可以自己去分析,不再贅述。
本文主要以例項講解Semaphore
、阻塞佇列,並分析了相關核心原始碼實現。
本文參考
關於作者 愛程式設計、愛鑽研、愛分享、愛生活 關注分散式、高併發、資料探勘 如需捐贈,請掃碼