Java 8 與併發(二)
一、並行流與並行排序
Java 8中可以在介面不變的情況下,將流改為並行流,方便在多執行緒中進行集合中的資料處理。
1.1 使用並行流過濾資料
下面示例統計1~1000000內所有質數的數量。下面是一個判斷質數的函式:
public class PrimeUtil {
public static boolean isPrime(int number) {
int tmp = number;
if(tmp<2) {
return false;
}
for(int i=2; Math. sqrt(tmp)>=i; i++) {
if(tmp%i==0) {
return false;
}
}
return true;
}
}
接著,使用函數語言程式設計統計給定範圍內所有的質數:
IntStream.range(1,100000).filter(PrimeUtil::isPrime).count();
上述程式碼是序列的,將它改造成平行計算非常簡單,只需要將流並行化即可:
IntStream.range(1,100000).parallel.filter( PrimeUtil::isPrime).count();
上述程式碼中,首先parallel()方法得到一個並行流,接著,在並行流上進行過濾,此時,PrimeUtil.isPrime()函式會被多執行緒併發呼叫,應用於流中的所有元素。
1.2 從集合得到並行流
在函數語言程式設計中,可以從集合得到一個流或者並行流。下面這段程式碼試圖統計集合內所有學生的平均分:
List<Student> ss = new ArrayList<Student>();
double ave = ss.stream().mapToInt(s->s.score).average ().getAsDouble();
從集合物件List中,我們使用stream()方法可以得到一個流。如果希望將這段程式碼並行化,則可以使用parallelStream()函式。
double ave = ss.parallelStream().mapToInt(s->s.score).average().getAsDouble();
1.3 並行排序
在Java 8中,可以使用新增的Arrays.parallelSort()方法直接使用並行排序。
比如,可以這樣使用:
int[] arr = new int[1000000];
Arrays.parallelSort(arr);
除了並行排序外,Arrays中還增加了一些API用於陣列中的資料的賦值,比如:
public static void setAll(int[] arr, IntUnaryOperator generator)
這是一個函式式味道很濃的介面,它的第2個引數是一個函式式介面。如果想給陣列中每一個元素都附上一個隨機值,可以這麼做:
Random r = new Random();
Arrays.setAll(arr, r.nextInt());
以上過程是序列的。只要使用setAll()對應的並行版本,就可以將它執行在多個CPU上:
Random r = new Random();
Arrays.parallelSetAll(arr, r.nextInt());
二、增強的Future:CompletableFuture
CompletableFuture是Java 8新增的一個超大型工具類。它實現了Future介面,也實現了CompletionStage介面。CompletionStage介面擁有多達40種方法,是為了函數語言程式設計中的流式呼叫準備的。通過CompletionStage提供的介面,可以在一個執行結果上進行多次流式呼叫,以此可以得到最終結果。比如,可以在一個CompletionStage上進行如下呼叫:
stage.thenApply(x -> square(x)).thenAccept(x -> System.out.println(x)).thenRun(() -> System.out.println())
這一連串的呼叫就會挨個執行。
2.1 完成了就通知我
CompletableFuture和Future一樣,可以作為函式呼叫的契約。如果向CompletableFuture請求一個數據,如果資料還沒有準備好,請求執行緒就會等待。通過CompletableFuture,可以手動設定CompletableFuture的完成狀態。
//義了一個AskThread執行緒。它接收一個CompletableFuture作為其建構函式,
//它的任務是計算CompletableFuture表示的數字的平方,並將其列印。
public static class AskThread implements Runnable {
CompletableFuture<Integer> re = null;
public AskThread(CompletableFuture<Integer> re) {
this.re = re;
}
@Override
public void run() {
int myRe = 0;
try {
//此時阻塞,因為CompletableFuture中根本沒有它所需要的資料,整個CompletableFuture處於未完成狀態
myRe = re.get() * re.get();
} catch(Exception e) {
}
System.out.println(myRe);
}
}
public static void main(String[] args) throws InterruptedException {
//建立一個CompletableFuture物件例項,將這個物件例項傳遞給AskThread執行緒,並啟動這個執行緒
final CompletableFuture<Integer> future = new CompletableFuture<>();
new Thread(new AskThread(future)).start();
//模擬長時間的計算過程
Thread.sleep(1000);
//將最終資料載入CompletableFuture,並標記為完成狀態
//告知完成結果
future.complete(60);
}
2.2 非同步執行任務
通過CompletableFuture提供的進一步封裝,很容易實現Future模式那樣的非同步呼叫。比如:
public static Integer calc(Integer para) {
try {
//模擬一個長時間執行
Thread.sleep(1000);
} catch(InterruptedException e) {
}
return para*para;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
final CompletableFuture<Integer> future = new CompletableFuture.supplyAsync(() -> calc(50));
System.out.println(future.get());
}
上述程式碼中,使用CompletableFuture.supplyAsync()方法構造一個CompletableFuture例項,在supplyAsync函式中,它會在一個新的執行緒中,執行傳入的引數。在這裡,它會執行calc()方法。而calc()方法的執行可能是比較慢的,但是這不影響CompletableFuture例項的構造速度,因此supplyAsync()會立即返回,它返回CompletableFuture物件例項就可以作為這次呼叫的契約,在將來任何場合,用於獲得最終的計算結果。最後一行程式碼試圖獲得calc()的計算結果,如果當前計算沒有完成,則呼叫get()方法的執行緒就會等待。
在CompletableFuture中,類似的工廠方法有以下幾個:
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
其中supplyAsync()方法用於那些需要有返回值的場景,比如計算某個資料等。而runAsync()方法用於沒有返回值的場景,比如,僅僅是簡單地執行某一個非同步動作。
在這兩對方法中,都有一個方法可以接收一個Executor引數。這就使我們讓Supplier<U>
或者Runnable在指定的執行緒池中工作。如果不指定,則在預設的系統公共的ForkJoinPool.common執行緒池中執行(在Java 8中,新增了ForkJoinPool.commonPool()方法。它可以獲得一個公共ForkJoin執行緒池。這個公共的執行緒池中的所有執行緒都是Daemon執行緒。這意味著如果主執行緒退出,這些執行緒無論是否執行完畢,都會退出系統)。
2.3 流式呼叫
CompletionStage的約40個介面是為函數語言程式設計做準備的。在這裡,看一下如何使用這些介面進行函式式的流式API呼叫:
public static Integer calc(Integer para) {
try {
//模擬一個長時間執行
Thread.sleep(1000);
} catch(InterruptedException e) {
}
return para*para;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
final CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> calc(50))
.thenApply((i)->Integer.toString(i))
.thenApply((str)->"\"" +str + "\"")
.thenAccept(System.out::println);
future.get();
}
上述程式碼中,使用supplyAsync()函式執行一個非同步任務。接著連續使用流式呼叫對任務的處理結果進行再加工,直到最後的結果輸出。
這裡,執行CompletableFuture.get()方法,目的是等待calc()函式執行完成。不過不進行這個等待呼叫,由於CompletableFuture非同步執行的緣故,主函式不等calc()方法執行完畢就會退出,隨著主執行緒的結束,所有的Daemon執行緒都會立即退出,從而導致calc()方法無法正常完成。
2.4 CompletableFuture中的異常處理
如果CompletableFuture在執行過程中遇到異常,我們可以用函數語言程式設計的風格處理這些異常。CompletableFuture提供了一個異常處理方法exceptionally():
public static Integer calc(Integer para) {
return para/0;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
final CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> calc(50))
//對當前的CompletableFuture進行異常處理
.exceptionally(ex-> {
System.out.println(ex.toString());
return 0;
})
.thenApply((i)->Integer.toString(i))
.thenApply((str)->"\"" +str + "\"")
.thenAccept(System.out::println);
future.get();
}
2.5 組合多個CompletableFuture
CompletableFuture還允許將多個CompletableFuture進行組合。一種方法是使用thenCompose(),它的簽名如下:
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
一個CompletableFuture可以在執行完成後,將執行結果通過Function傳遞給下一個CompletionStage進行處理(Function介面返回新的CompletionStage例項):
public static Integer calc(Integer para) {
return para/2;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
final CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> calc(50))
.thenCompose((i)->CompletableFuture.supplyAsync(()->calc(i)))
.thenApply((str)->"\"" +str + "\"")
.thenAccept(System.out::println);
future.get();
}
上述程式碼中,將處理後的結果傳遞給thenCompose(),並進一步傳遞給後續新生成的CompletableFuture例項,以上程式碼的輸出如下:
"12"
另外一種組合多個CompletableFuture的方法是thenCombine(),它的簽名如下:
public <U,V> CompletableFuture<U> thenCombime
(CompletionStage<? extends U> other,
BiFunction<? super T, ? super U,? extends V> fn)
方法thenCombime()首先完成當前CompletableFuture和other的執行。接著,將這兩者的執行結果傳遞給BiFunction(該介面接收兩個引數,並有一個返回值),並返回代表BiFunction例項的CompletableFuture物件:
public static Integer calc(Integer para) {
return para/2;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> intFuture = CompletableFuture.supplyAsync(() -> calc(50));
CompletableFuture<Integer> intFuture2 = CompletableFuture.supplyAsync(() -> calc25));
CompletableFuture<Void> future = intFuture.thenCombine(intFuture2, (i,j) -> (i+j))
.thenApply((str)->"\"" +str + "\"")
.thenAccept(System.out::println);
future.get();
}
上述程式碼中,首先生成兩個CompletableFuture例項,接著使用thenCombine()組合這兩個CompletableFuture,將兩者的執行結果進行累加,並將其累加結果轉為字串,並輸出,上述程式碼的輸出是:
"37"
三、讀寫鎖的改進:StampedLock
StamppedLock是Java 8中引入的一種新的鎖機制。讀寫鎖雖然分離了讀和寫的功能,使得讀與讀之間可以完全併發。但是,讀和寫之間依然是衝突的。讀鎖會完全阻塞寫鎖,它使用的依然是悲觀鎖的策略,如果有大量的讀執行緒,它也有可能引起寫執行緒的“飢餓”。而StampedLock提供了一種樂觀的讀策略。這種樂觀策略的鎖非常類似無鎖的操作,使得樂觀鎖完全不會阻塞寫執行緒。
3.1 StampedLock使用示例
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
//使用writeLock()函式可以申請寫鎖
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
double distanceFromOrigin() { // A read-only method
//試圖嘗試一次樂觀讀
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
//判斷這個stamp是否在讀過程發生期間被修改過
if (!sl.validate(stamp)) {
//使用readLock()獲得悲觀的讀鎖
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
上述程式碼出自JDK的官方文件。它定義了一個Point類,內部有兩個元素x和y,表示點的座標。第3行定義了StampedLock鎖。第15行定義的distanceFromOrigin()方法是一個只讀方法,它只會讀取Point的x和y座標。在讀取時,首先使用了StampedLock.tryOptimisticRead()方法。這個方法表示試圖嘗試一次樂觀讀。它會返回一個類似於時間的郵戳整數stamp。這個stamp就可以作為這一次鎖獲取的憑證。
接著,在第17行,讀取x和y的值。當然,這時並不確定這個x和y是否是一致的(在讀取x的時候,可能其他執行緒改寫了y的值,使得currentX和currentY處於不一致的狀態)。因此,我們必須在18行,使用validate()方法,判斷這個stamp是否在讀過程發生期間被修改過。如果stamp沒有被修改過,則認為這次讀取的過程中,可能被其他執行緒改寫了資料,因此,有可能出現了髒讀。如果出現這種情況,我們可以像處理CAS操作那樣在一個死迴圈中一直使用樂觀讀,直到成功為止。
也可以升級鎖的級別。在本例中,我們升級樂觀鎖的級別,將樂觀鎖變為悲觀鎖。在第19行,當判斷樂觀讀失敗後,使用readLock()獲得悲觀的讀鎖,並進一步讀取資料。如果當前物件正在被修改,則讀鎖的申請可能導致執行緒掛起。
寫入的情況可以參考第5行定義的move()函式。使用writeLock()函式可以申請寫鎖。這裡的含義和讀寫鎖是類似的。
在退出臨界區時,不要忘記釋放寫鎖(第11行)或者讀鎖(第24行)。
3.2 StampedLock的小陷阱
StampedLock內部實現時,使用類似於CAS操作的死迴圈反覆嘗試的策略。在它掛起執行緒時,使用的是Unsafe.park()函式,而park()函式在遇到執行緒中斷時,會直接返回(不同於Thread.sleep(),它不會丟擲異常)。而在StampedLock的死迴圈邏輯中,沒有處理有關中斷的邏輯。因此,這就會導致阻塞在park()上的執行緒被中斷後,會再次進入迴圈。而當退出條件得不到滿足時,就會發生瘋狂佔用CPU的情況。下面演示了這個問題:
public class StampedLockCUPDemo {
static Thread[] holdCpuThreads = new Thread[3];
static final StampedLock lock = new StampedLock();
public static void main(String[] args) throws InterruptedException {
new Thread() {
public void run(){
long readLong = lock.writeLock();
LockSupport.parkNanos(6100000000L);
lock.unlockWrite(readLong);
}
}.start();
Thread.sleep(100);
for( int i = 0; i < 3; ++i) {
holdCpuThreads [i] = new Thread(new HoldCPUReadThread());
holdCpuThreads [i].start();
}
Thread.sleep(10000);
for(int i=0; i<3; i++) {
holdCpuThreads [i].interrupt();
}
}
private static class HoldCPUReadThread implements Runnable {
public void run() {
long lockr = lock.readLock();
System.out.println(Thread.currentThread().getName() + " get read lock");
lock.unlockRead(lockr);
}
}
}
在上述程式碼中,首先開啟執行緒佔用寫鎖(第7行),為了演示效果,這裡使用寫執行緒不釋放鎖而一直等待。接著,開啟3個讀執行緒,讓它們請求讀鎖。此時,由於寫鎖的存在,所有讀執行緒都會被最終掛起。讀執行緒因為 park() 的操作進入了等待狀態,這種情況是正常的。
而在10秒鐘以後(程式碼在17行執行了10秒等待),系統中斷了這3個讀執行緒,之後,就會發現,CPU佔用率極有可能會飆升。這是因為中斷導致 park() 函式返回,使執行緒再次進入執行狀態。
此時,這個執行緒的狀態是RUNNABLE,這是我們不願意看到的,它會一直存在並耗盡CPU資源,直到自己搶佔到了鎖。
四、原子類的增強
無鎖的原子類操作使用系統的CAS指令,有著遠遠超越鎖的效能。在Java 8中引入了LongAddr類,這個類也在java.util.concurrent.atomic包下,因此,它也是使用了CAS指令。
4.1 更快的原子類:LongAddr
AtomicInteger的基本實現機制,它們都是在一個死迴圈內,不斷嘗試修改目標值,知道修改成功。如果競爭不激烈,那麼修改成功的概率就很高,否則,修改失敗的概率就很高。在大量修改失敗時,這些原子操作就會進行多次迴圈嘗試,因此效能會受到影響。
當競爭激烈的時候,為了進一步提高系統的效能,一種基本方案就是可以使用熱點分離,將競爭的資料進行分解,基於這個思路,可以想到一種對傳統AtomicInteger等原子類的改進方法。雖然在CAS操作中沒有鎖,但是像減小鎖粒度這種分離熱點的思想依然可以使用。一種可行的方案就是仿造ConcurrentHashMap,將熱點資料分離。比如,可以將AtomicInteger的內部核心資料value分離成一個數組,每個執行緒訪問時,通過雜湊等演算法對映到其中一個數字進行計