JAVA學習筆記(併發程式設計 - 柒)- J.U.C元件2
J.U.C-FutureTask
在Java中一般通過繼承Thread類或者實現Runnable介面這兩種方式來建立執行緒,但是這兩種方式都有個缺陷,就是不能在執行完成後獲取執行的結果,因此Java 1.5之後提供了Callable和Future介面,通過它們就可以在任務執行完畢之後得到任務的執行結果。
而FutureTask則是J.U.C中的類,但不是AQS的子類,FutureTask是一個可刪除的非同步計算類。這個類提供了Future介面的的基本實現,使用相關方法啟動和取消計算,查詢計算是否完成,並檢索計算結果。只有在計算完成時才能使用get方法檢索結果;如果計算尚未完成,get方法將會阻塞。一旦計算完成,計算就不能重新啟動或取消(除非使用runAndReset方法呼叫計算)。
Runnable與Callable以及Future介面對比:
Runnable是一個介面,在它裡面只聲明瞭一個run()方法。由於run()方法返回值為void型別,所以在執行完任務之後無法返回任何結果:
public interface Runnable {
public abstract void run();
}
Callable介面也只聲明瞭一個方法,這個方法叫做call()。Callable介面定義如下:
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
可以看到Callable是個泛型介面,泛型V就是要call()方法返回的型別。Callable介面和Runnable介面很像,都可以被另外一個執行緒執行,但是正如前面所說的,Runnable不會返回資料也不能丟擲異常。
Future也是一個介面,Future介面代表非同步計算的結果,通過Future介面提供的方法可以檢視非同步計算是否執行完成,或者等待執行結果並獲取執行結果,同時還可以取消執行。說白了Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成以及獲取執行結果。其中執行結果通過get方法獲取,該方法會阻塞直到任務返回結果。Future介面的定義如下:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
在Future介面中聲明瞭5個方法,下面依次解釋每個方法的作用:
cancel()方法用來取消非同步任務的執行。如果非同步任務已經完成或者已經被取消,或者由於某些原因不能取消,則會返回false。如果任務還沒有被執行,則會返回true並且非同步任務不會被執行。如果任務已經開始執行了但是還沒有執行完成,若mayInterruptIfRunning為true,則會立即中斷執行任務的執行緒並返回true,若mayInterruptIfRunning為false,則會返回true且不會中斷任務執行執行緒。
isCanceled()方法用於判斷任務是否被取消,如果任務在結束(正常執行結束或者執行異常結束)前被取消則返回true,否則返回false。
isDone()方法用於判斷任務是否已經完成,如果完成則返回true,否則返回false。需要注意的是:任務執行過程中發生異常、任務被取消也屬於任務已完成,也會返回true。
get()方法用於獲取任務執行結果,如果任務還沒完成則會阻塞等待直到任務執行完成。如果任務被取消則會丟擲CancellationException異常,如果任務執行過程發生異常則會丟擲ExecutionException異常,如果阻塞等待過程中被中斷則會丟擲InterruptedException異常。
get(long timeout,Timeunit unit)是帶超時時間的get()版本,如果阻塞等待過程中超時則會丟擲TimeoutException異常。
綜上,Future主要提供了三種功能:
- 判斷任務是否完成;
- 能夠中斷任務;
- 能夠獲取任務執行結果。
因為Future只是一個介面,所以是無法直接用來建立物件使用的,因此就有了下面的FutureTask。FutureTask的父類是RunnableFuture,而RunnableFuture則繼承了Runnable和Future這兩個介面。所以由此可知,FutureTask最終也屬於是Callable型別的任務。如果往FutureTask的建構函式傳入Runnable的話,也會被轉換成Callable型別。
FutureTask繼承圖如下:
可以看到,FutureTask實現了RunnableFuture介面,則RunnableFuture介面繼承了Runnable介面和Future介面,所以FutureTask既能當做一個Runnable直接被Thread執行,也能作為Future用來得到Callable的計算結果。
使用場景:
假設有一個很費時的邏輯需要計算,並且需要返回計算的結果,但這個結果又不是馬上需要的。那麼這時就可以使用FutureTask,用另外一個執行緒去進行計算,而當前執行緒在得到這個計算結果之前,就可以去執行其他的操作,等到需要這個結果時再通過Future得到即可。
FutureTask有兩個構造器,支援傳入Callable和Runnable型別,在使用 Runnable 時,需要多指定一個返回結果型別:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
使用示例
1.Future基本使用示例:
@Slf4j
public class FutureExample {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
// 使用lambda建立callable任務,使用Future接收任務執行的結果
Future<String> future = executorService.submit(() -> {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
});
log.info("do something in main");
Thread.sleep(1000);
// 獲取執行結果
String result = future.get();
log.info("result: {}", result);
executorService.shutdown();
}
}
2.FutureTask基本使用示例:
@Slf4j
public class FutureTaskExample {
public static void main(String[] args) throws Exception {
// 構建FutureTask例項,使用lambda建立callable任務
FutureTask<String> futureTask = new FutureTask<>(() -> {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
});
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(futureTask);
log.info("do something in main");
Thread.sleep(1000);
// 獲取執行結果
String result = futureTask.get();
log.info("result: {}", result);
executorService.shutdown();
}
}
從以上兩個示例可以看到,Future和FutureTask的使用方式是很相似的,畢竟FutureTask就是Future的一個實現。
J.U.C-ForkJoin
Fork/Join框架是Java7提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終彙總每個小任務結果後得到大任務結果的框架,其思想和map-reduce非常類似。
我們再通過Fork和Join這兩個單詞來理解下Fork/Join框架,Fork就是把一個大任務切分為若干子任務並行的執行,Join就是合併這些子任務的執行結果,最後得到這個大任務的結果。比如計算1+2+。。+10000,可以分割成10個子任務,每個子任務分別對1000個數進行求和,最終彙總這10個子任務的結果。Fork/Join的執行流程圖如下:
工作竊取演算法:
Fork/Join框架主要採用的是工作竊取(work-stealing)演算法,該演算法是指某個執行緒從其他佇列裡竊取任務來執行。工作竊取的執行流程圖如下:
那麼為什麼需要使用工作竊取演算法呢?假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少執行緒間的競爭,於是把這些子任務分別放到不同的佇列裡,併為每個佇列建立一個單獨的執行緒來執行佇列裡的任務,執行緒和佇列一一對應,比如A執行緒負責處理A佇列裡的任務。但是有的執行緒會先把自己佇列裡的任務幹完,而其他執行緒對應的佇列裡還有任務等待處理。幹完活的執行緒與其等著,不如去幫其他執行緒幹活,於是它就去其他執行緒的佇列裡竊取一個任務來執行。而在這時它們會訪問同一個佇列,所以為了減少竊取任務執行緒和被竊取任務執行緒之間的競爭,通常會使用雙端佇列,被竊取任務執行緒永遠從雙端佇列的頭部拿任務執行,而竊取任務的執行緒永遠從雙端佇列的尾部拿任務執行。
工作竊取演算法的優點是充分利用執行緒進行平行計算,並減少了執行緒間的競爭,其缺點是在某些情況下還是存在競爭,比如雙端佇列裡只有一個任務時。並且消耗了更多的系統資源,比如建立多個執行緒和多個雙端佇列。
所以對於Fork/Join框架而言,當一個任務正在等待它使用join操作建立的子任務的結束時,執行這個任務的執行緒(工作執行緒)查詢其他未被執行的任務並開始它的執行。通過這種方式,執行緒充分利用它們的執行時間,從而提高了應用程式的效能。
為實現這個目標,Fork/Join框架執行的任務有以下侷限性:
- 任務只能使用fork()和join()操作,作為同步機制。如果使用其他同步機制,工作執行緒不能執行其他任務,當它們在同步操作時。比如,在Fork/Join框架中,你使任務進入睡眠,那麼在這睡眠期間內,正在執行這個任務的工作執行緒將不會執行其他任務。
- 任務不應該執行I/O操作,如讀或寫資料檔案。
- 任務不能丟擲檢查異常,它必須包括必要的程式碼來處理它們。
Fork/Join框架的核心主要是以下兩個類:
- ForkJoinPool:它實現ExecutorService介面和work-stealing演算法。它管理工作執行緒和提供關於任務的狀態和它們執行的資訊。
- ForkJoinTask: 它是將在ForkJoinPool中執行的任務的基類。它提供在任務中執行fork()和join()操作的機制,並且這兩個方法控制任務的狀態。通常, 為了實現你的Fork/Join任務,你將實現兩個子類的子類的類:RecursiveAction對於沒有返回結果的任務和RecursiveTask 對於返回結果的任務。
Fork/Join使用示例,完成1+2+3+4…+n的計算,程式碼如下:
package org.zero.concurrency.demo.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2;
private int start;
private int end;
private ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任務足夠小就直接計算任務
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任務大於閾值,就分裂成兩個子任務計算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 執行子任務
leftTask.fork();
rightTask.fork();
// 等待任務執行結束合併其結果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合併子任務
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一個計算任務,計算1+2+3+4...+100
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
//執行一個任務
Future<Integer> result = forkjoinPool.submit(task);
try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
}
J.U.C-BlockingQueue
在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題,從名字也可以知道它是執行緒安全的。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。
首先,最基本的來說, BlockingQueue 是一個先進先出的佇列(Queue),為什麼說是阻塞(Blocking)的呢?是因為 BlockingQueue 支援當獲取佇列元素但是佇列為空時,會阻塞等待佇列中有元素再返回;也支援新增元素時,如果佇列已滿,那麼等到佇列可以放入新元素時再放入。所以 BlockingQueue 主要應用於生產者消費者場景。
BlockingQueue 是一個介面,繼承自 Queue,所以其實現類也可以作為 Queue 的實現來使用,而 Queue 又繼承自 Collection 介面。
BlockingQueue 對插入操作、移除操作、獲取元素操作提供了四種不同的方法用於不同的場景中使用,總結如下表:
- | Throws exception | Special value | Blocks | Times out |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Insert | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
說明:
- Throws Exceptions :如果不能立即執行就丟擲異常
- Special Value:如果不能立即執行就返回一個特殊的值(null 或 true/false,取決於具體的操作)
- Blocks:如果不能立即執行就阻塞等待此操作,直到這個操作成功
- Times Out:如果不能立即執行就阻塞一段時間,直到成功或者超時指定時間
BlockingQueue 的實現類:
ArrayBlockingQueue:它是一個有界的阻塞佇列,內部實現是陣列,需在初始化時指定容量大小,一旦指定大小就不能再變。採用FIFO方式儲存元素:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** The queued items */
final Object[] items;
...
}
DelayQueue:阻塞內部元素,DelayQueue內部元素必須實現Delayed介面,Delayed介面又繼承了Comparable介面,原因在於DelayQueue內部元素需要排序,一般情況下按元素過期時間優先順序排序:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
DalayQueue內部採用PriorityQueue與ReentrantLock實現:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
...
}
LinkedBlockingQueue:使用獨佔鎖實現的阻塞佇列,大小配置可選,如果初始化時指定了大小,那麼它就是有邊界的。不指定就無邊界(最大整型值)。內部實現是連結串列,採用FIFO形式儲存資料。
public LinkedBlockingQueue() {
// 不指定大小,無邊界採用預設值,最大整型值
this(Integer.MAX_VALUE);
}
PriorityBlockingQueue:帶優先順序的×××阻塞佇列,無邊界佇列,允許插入null。插入的物件必須實現Comparator介面,佇列優先順序的排序規則就是按照我們對Comparable介面的實現來指定的。我們可以從PriorityBlockingQueue中獲取一個迭代器,但這個迭代器並不保證能按照優先順序的順序進行迭代:
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
...
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] es;
while ((n = size) >= (cap = (es = queue).length))
tryGrow(es, cap);
try {
//必須實現Comparator介面
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
siftUpComparable(n, e, es);
else
siftUpUsingComparator(n, e, es, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
...
}
SynchronousQueue:同步阻塞佇列,只能插入一個元素,×××非快取佇列,不儲存元素。其內部並沒有資料快取空間,你不能呼叫peek()方法來看佇列中是否有資料元素,當然遍歷這個佇列的操作也是不允許的:
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
...
}