1. 程式人生 > >Java 8 與併發(二)

Java 8 與併發(二)

一、並行流與並行排序

Java 8中可以在介面不變的情況下,將流改為並行流,方便在多執行緒中進行集合中的資料處理。

1.1 使用並行流過濾資料

下面示例統計1~1000000內所有質數的數量。下面是一個判斷質數的函式:

public class PrimeUtil {
    public static boolean isPrime(int number) {
        int tmp = number;
        if(tmp<2) {
            return false;
        }
        for(int i=2; Math.
sqrt(tmp)>=i; i++) { if(tmp%i==0) { return false; } } return true; } }

接著,使用函數語言程式設計統計給定範圍內所有的質數:

IntStream.range(1,100000).filter(PrimeUtil::isPrime).count();

上述程式碼是序列的,將它改造成平行計算非常簡單,只需要將流並行化即可:

IntStream.range(1,100000).parallel.filter(
PrimeUtil::isPrime).count();

上述程式碼中,首先parallel()方法得到一個並行流,接著,在並行流上進行過濾,此時,PrimeUtil.isPrime()函式會被多執行緒併發呼叫,應用於流中的所有元素。

1.2 從集合得到並行流

在函數語言程式設計中,可以從集合得到一個流或者並行流。下面這段程式碼試圖統計集合內所有學生的平均分:

List<Student> ss = new ArrayList<Student>();
double ave = ss.stream().mapToInt(s->s.score).average
().getAsDouble();

從集合物件List中,我們使用stream()方法可以得到一個流。如果希望將這段程式碼並行化,則可以使用parallelStream()函式。

double ave = ss.parallelStream().mapToInt(s->s.score).average().getAsDouble();

1.3 並行排序

在Java 8中,可以使用新增的Arrays.parallelSort()方法直接使用並行排序。
比如,可以這樣使用:

int[] arr = new int[1000000];
Arrays.parallelSort(arr);

除了並行排序外,Arrays中還增加了一些API用於陣列中的資料的賦值,比如:

public static void setAll(int[] arr, IntUnaryOperator generator)

這是一個函式式味道很濃的介面,它的第2個引數是一個函式式介面。如果想給陣列中每一個元素都附上一個隨機值,可以這麼做:

Random r = new Random();
Arrays.setAll(arr, r.nextInt());

以上過程是序列的。只要使用setAll()對應的並行版本,就可以將它執行在多個CPU上:

Random r = new Random();
Arrays.parallelSetAll(arr, r.nextInt());

二、增強的Future:CompletableFuture

CompletableFuture是Java 8新增的一個超大型工具類。它實現了Future介面,也實現了CompletionStage介面。CompletionStage介面擁有多達40種方法,是為了函數語言程式設計中的流式呼叫準備的。通過CompletionStage提供的介面,可以在一個執行結果上進行多次流式呼叫,以此可以得到最終結果。比如,可以在一個CompletionStage上進行如下呼叫:

stage.thenApply(x -> square(x)).thenAccept(x -> System.out.println(x)).thenRun(() -> System.out.println())

這一連串的呼叫就會挨個執行。

2.1 完成了就通知我

CompletableFuture和Future一樣,可以作為函式呼叫的契約。如果向CompletableFuture請求一個數據,如果資料還沒有準備好,請求執行緒就會等待。通過CompletableFuture,可以手動設定CompletableFuture的完成狀態。

//義了一個AskThread執行緒。它接收一個CompletableFuture作為其建構函式,
//它的任務是計算CompletableFuture表示的數字的平方,並將其列印。
public static class AskThread implements Runnable {
    CompletableFuture<Integer> re = null;
    public AskThread(CompletableFuture<Integer> re) {
        this.re = re;
    }

    @Override
    public void run() {
        int myRe = 0;
        try {
            //此時阻塞,因為CompletableFuture中根本沒有它所需要的資料,整個CompletableFuture處於未完成狀態
            myRe = re.get() * re.get();
        } catch(Exception e) {
        }
        System.out.println(myRe);
    }
}

    public static void main(String[] args) throws InterruptedException {
        //建立一個CompletableFuture物件例項,將這個物件例項傳遞給AskThread執行緒,並啟動這個執行緒
        final CompletableFuture<Integer> future = new CompletableFuture<>();
        new Thread(new AskThread(future)).start();
        //模擬長時間的計算過程
        Thread.sleep(1000);
        //將最終資料載入CompletableFuture,並標記為完成狀態
        //告知完成結果
        future.complete(60);
}

2.2 非同步執行任務

通過CompletableFuture提供的進一步封裝,很容易實現Future模式那樣的非同步呼叫。比如:

public static Integer calc(Integer para) {
    try {
        //模擬一個長時間執行
        Thread.sleep(1000);
    } catch(InterruptedException  e) {
    }
    return para*para;
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
    final CompletableFuture<Integer> future = new CompletableFuture.supplyAsync(() -> calc(50));
    System.out.println(future.get());
}

上述程式碼中,使用CompletableFuture.supplyAsync()方法構造一個CompletableFuture例項,在supplyAsync函式中,它會在一個新的執行緒中,執行傳入的引數。在這裡,它會執行calc()方法。而calc()方法的執行可能是比較慢的,但是這不影響CompletableFuture例項的構造速度,因此supplyAsync()會立即返回,它返回CompletableFuture物件例項就可以作為這次呼叫的契約,在將來任何場合,用於獲得最終的計算結果。最後一行程式碼試圖獲得calc()的計算結果,如果當前計算沒有完成,則呼叫get()方法的執行緒就會等待。
在CompletableFuture中,類似的工廠方法有以下幾個:

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

其中supplyAsync()方法用於那些需要有返回值的場景,比如計算某個資料等。而runAsync()方法用於沒有返回值的場景,比如,僅僅是簡單地執行某一個非同步動作。

在這兩對方法中,都有一個方法可以接收一個Executor引數。這就使我們讓Supplier<U>或者Runnable在指定的執行緒池中工作。如果不指定,則在預設的系統公共的ForkJoinPool.common執行緒池中執行(在Java 8中,新增了ForkJoinPool.commonPool()方法。它可以獲得一個公共ForkJoin執行緒池。這個公共的執行緒池中的所有執行緒都是Daemon執行緒。這意味著如果主執行緒退出,這些執行緒無論是否執行完畢,都會退出系統)。

2.3 流式呼叫

CompletionStage的約40個介面是為函數語言程式設計做準備的。在這裡,看一下如何使用這些介面進行函式式的流式API呼叫:

public static Integer calc(Integer para) {
    try {
    //模擬一個長時間執行
    Thread.sleep(1000);
    } catch(InterruptedException  e) {
    }
    return para*para;
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
    final CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> calc(50))
        .thenApply((i)->Integer.toString(i))
        .thenApply((str)->"\"" +str + "\"")
        .thenAccept(System.out::println);
    future.get();
}

上述程式碼中,使用supplyAsync()函式執行一個非同步任務。接著連續使用流式呼叫對任務的處理結果進行再加工,直到最後的結果輸出。

這裡,執行CompletableFuture.get()方法,目的是等待calc()函式執行完成。不過不進行這個等待呼叫,由於CompletableFuture非同步執行的緣故,主函式不等calc()方法執行完畢就會退出,隨著主執行緒的結束,所有的Daemon執行緒都會立即退出,從而導致calc()方法無法正常完成。

2.4 CompletableFuture中的異常處理

如果CompletableFuture在執行過程中遇到異常,我們可以用函數語言程式設計的風格處理這些異常。CompletableFuture提供了一個異常處理方法exceptionally():

public static Integer calc(Integer para) {
        return para/0;
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
    final CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(() -> calc(50))
        //對當前的CompletableFuture進行異常處理
        .exceptionally(ex-> {
            System.out.println(ex.toString());
            return 0;
        })
        .thenApply((i)->Integer.toString(i))
        .thenApply((str)->"\"" +str + "\"")
        .thenAccept(System.out::println);
    future.get();
}

2.5 組合多個CompletableFuture

CompletableFuture還允許將多個CompletableFuture進行組合。一種方法是使用thenCompose(),它的簽名如下:

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)

一個CompletableFuture可以在執行完成後,將執行結果通過Function傳遞給下一個CompletionStage進行處理(Function介面返回新的CompletionStage例項):

public static Integer calc(Integer para) {
    return para/2;
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
    final CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> calc(50))
        .thenCompose((i)->CompletableFuture.supplyAsync(()->calc(i)))
        .thenApply((str)->"\"" +str + "\"")
        .thenAccept(System.out::println);
    future.get();
}

上述程式碼中,將處理後的結果傳遞給thenCompose(),並進一步傳遞給後續新生成的CompletableFuture例項,以上程式碼的輸出如下:

"12"

另外一種組合多個CompletableFuture的方法是thenCombine(),它的簽名如下:

public <U,V> CompletableFuture<U> thenCombime
  (CompletionStage<? extends U> other,
  BiFunction<? super T, ? super U,? extends V> fn)

方法thenCombime()首先完成當前CompletableFuture和other的執行。接著,將這兩者的執行結果傳遞給BiFunction(該介面接收兩個引數,並有一個返回值),並返回代表BiFunction例項的CompletableFuture物件:

public static Integer calc(Integer para) {
    return para/2;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
    CompletableFuture<Integer> intFuture = CompletableFuture.supplyAsync(() -> calc(50));
    CompletableFuture<Integer> intFuture2 = CompletableFuture.supplyAsync(() -> calc25));
    CompletableFuture<Void> future = intFuture.thenCombine(intFuture2, (i,j) -> (i+j))
        .thenApply((str)->"\"" +str + "\"")
        .thenAccept(System.out::println);
    future.get();
}

上述程式碼中,首先生成兩個CompletableFuture例項,接著使用thenCombine()組合這兩個CompletableFuture,將兩者的執行結果進行累加,並將其累加結果轉為字串,並輸出,上述程式碼的輸出是:

"37"

三、讀寫鎖的改進:StampedLock

StamppedLock是Java 8中引入的一種新的鎖機制。讀寫鎖雖然分離了讀和寫的功能,使得讀與讀之間可以完全併發。但是,讀和寫之間依然是衝突的。讀鎖會完全阻塞寫鎖,它使用的依然是悲觀鎖的策略,如果有大量的讀執行緒,它也有可能引起寫執行緒的“飢餓”。而StampedLock提供了一種樂觀的讀策略。這種樂觀策略的鎖非常類似無鎖的操作,使得樂觀鎖完全不會阻塞寫執行緒。

3.1 StampedLock使用示例

class Point {
   private double x, y;
   private final StampedLock sl = new StampedLock();

   void move(double deltaX, double deltaY) { // an exclusively locked method
     //使用writeLock()函式可以申請寫鎖
     long stamp = sl.writeLock();
     try {
       x += deltaX;
       y += deltaY;
     } finally {
       sl.unlockWrite(stamp);
     }
   }

   double distanceFromOrigin() { // A read-only method
     //試圖嘗試一次樂觀讀
     long stamp = sl.tryOptimisticRead();
     double currentX = x, currentY = y;
     //判斷這個stamp是否在讀過程發生期間被修改過
     if (!sl.validate(stamp)) {
        //使用readLock()獲得悲觀的讀鎖
        stamp = sl.readLock();
        try {
          currentX = x;
          currentY = y;
        } finally {
           sl.unlockRead(stamp);
        }
     }
     return Math.sqrt(currentX * currentX + currentY * currentY);
   }

上述程式碼出自JDK的官方文件。它定義了一個Point類,內部有兩個元素x和y,表示點的座標。第3行定義了StampedLock鎖。第15行定義的distanceFromOrigin()方法是一個只讀方法,它只會讀取Point的x和y座標。在讀取時,首先使用了StampedLock.tryOptimisticRead()方法。這個方法表示試圖嘗試一次樂觀讀。它會返回一個類似於時間的郵戳整數stamp。這個stamp就可以作為這一次鎖獲取的憑證。

接著,在第17行,讀取x和y的值。當然,這時並不確定這個x和y是否是一致的(在讀取x的時候,可能其他執行緒改寫了y的值,使得currentX和currentY處於不一致的狀態)。因此,我們必須在18行,使用validate()方法,判斷這個stamp是否在讀過程發生期間被修改過。如果stamp沒有被修改過,則認為這次讀取的過程中,可能被其他執行緒改寫了資料,因此,有可能出現了髒讀。如果出現這種情況,我們可以像處理CAS操作那樣在一個死迴圈中一直使用樂觀讀,直到成功為止。
也可以升級鎖的級別。在本例中,我們升級樂觀鎖的級別,將樂觀鎖變為悲觀鎖。在第19行,當判斷樂觀讀失敗後,使用readLock()獲得悲觀的讀鎖,並進一步讀取資料。如果當前物件正在被修改,則讀鎖的申請可能導致執行緒掛起。

寫入的情況可以參考第5行定義的move()函式。使用writeLock()函式可以申請寫鎖。這裡的含義和讀寫鎖是類似的。

在退出臨界區時,不要忘記釋放寫鎖(第11行)或者讀鎖(第24行)。

3.2 StampedLock的小陷阱

StampedLock內部實現時,使用類似於CAS操作的死迴圈反覆嘗試的策略。在它掛起執行緒時,使用的是Unsafe.park()函式,而park()函式在遇到執行緒中斷時,會直接返回(不同於Thread.sleep(),它不會丟擲異常)。而在StampedLock的死迴圈邏輯中,沒有處理有關中斷的邏輯。因此,這就會導致阻塞在park()上的執行緒被中斷後,會再次進入迴圈。而當退出條件得不到滿足時,就會發生瘋狂佔用CPU的情況。下面演示了這個問題:

public class StampedLockCUPDemo {
    static Thread[] holdCpuThreads = new Thread[3];
    static final StampedLock lock = new StampedLock();
    public static void main(String[] args) throws InterruptedException {
        new Thread() {
            public void run(){
                long readLong = lock.writeLock();
                LockSupport.parkNanos(6100000000L);
                lock.unlockWrite(readLong);
            }
   	}.start();
    	Thread.sleep(100);
    	for( int i = 0; i < 3; ++i) {
            holdCpuThreads [i] = new Thread(new HoldCPUReadThread());
            holdCpuThreads [i].start();
        }
        Thread.sleep(10000);
        for(int i=0; i<3; i++) {
            holdCpuThreads [i].interrupt();
        }
    }
    private static class HoldCPUReadThread implements Runnable {
        public void run() {
        long lockr = lock.readLock();
        System.out.println(Thread.currentThread().getName() + " get read lock");
        lock.unlockRead(lockr);
       }
    }
}

在上述程式碼中,首先開啟執行緒佔用寫鎖(第7行),為了演示效果,這裡使用寫執行緒不釋放鎖而一直等待。接著,開啟3個讀執行緒,讓它們請求讀鎖。此時,由於寫鎖的存在,所有讀執行緒都會被最終掛起。讀執行緒因為 park() 的操作進入了等待狀態,這種情況是正常的。

而在10秒鐘以後(程式碼在17行執行了10秒等待),系統中斷了這3個讀執行緒,之後,就會發現,CPU佔用率極有可能會飆升。這是因為中斷導致 park() 函式返回,使執行緒再次進入執行狀態。
此時,這個執行緒的狀態是RUNNABLE,這是我們不願意看到的,它會一直存在並耗盡CPU資源,直到自己搶佔到了鎖。

四、原子類的增強

無鎖的原子類操作使用系統的CAS指令,有著遠遠超越鎖的效能。在Java 8中引入了LongAddr類,這個類也在java.util.concurrent.atomic包下,因此,它也是使用了CAS指令。

4.1 更快的原子類:LongAddr

AtomicInteger的基本實現機制,它們都是在一個死迴圈內,不斷嘗試修改目標值,知道修改成功。如果競爭不激烈,那麼修改成功的概率就很高,否則,修改失敗的概率就很高。在大量修改失敗時,這些原子操作就會進行多次迴圈嘗試,因此效能會受到影響。

當競爭激烈的時候,為了進一步提高系統的效能,一種基本方案就是可以使用熱點分離,將競爭的資料進行分解,基於這個思路,可以想到一種對傳統AtomicInteger等原子類的改進方法。雖然在CAS操作中沒有鎖,但是像減小鎖粒度這種分離熱點的思想依然可以使用。一種可行的方案就是仿造ConcurrentHashMap,將熱點資料分離。比如,可以將AtomicInteger的內部核心資料value分離成一個數組,每個執行緒訪問時,通過雜湊等演算法對映到其中一個數字進行計