1. 程式人生 > >Java8函數語言程式設計3-並行與併發

Java8函數語言程式設計3-並行與併發

如果將程式比作馬拉車,那麼並行可以看做試圖用很多馬拉同一輛車,以提高車速,而單純的併發則是一匹馬同時拉很多車,要做到每輛車兼顧且不翻車。處理很多馬拉很多車的問題,可以看作是處理並行與併發的問題。在Java8函數語言程式設計的支援下,如何處理並行與併發問題呢?

一、資料並行化

1、並行化流操作

對於流操作,支援並行化只需要多呼叫一個方法parallelStream。

public int parallelArraySum() {
    return albums.parallelStream()
                 .flatMap(Album::getTracks)
                 .mapToInt(Track::getLength)
                 .sum();
}

在知道這個方法之後,我們都會傾向於在所有的流方法後加上這個方法,以體現程式碼的“並行化”特徵。然而,在實際運用中如果資料量較少(100?),使用該方法會嚴重降低效率,只有在處理大量資料時,該方法再能發揮出並行的優勢。
使用蒙特卡洛模擬法並行化模擬擲骰子事件:

public Map<Integer, Double> parallelDiceRolls() {
    double fraction = 1.0 / N;
    return IntStream.range(0, N) 
                    .parallel() 
                    .mapToObj(twoDiceThrows()) 
                    .collect(groupingBy(side -> side, 
                             summingDouble(n -> fraction))); 
}

使用流的並行化方法確實可以大量減少程式碼行數。

2、限制

呼叫流的並行方法有如下限制:
(1)Reduce方法的限制:Reduce的初始值必須是組合函式的恆等值。說人話就是初始值與其他值做reduce操作時,結果等於其他值。此外,reduce操作必須符合結合律
(2)避免持有鎖。流的並行方法會自行加鎖,不要自找麻煩。

3、並行化陣列操作

Java8引入了一些針對陣列的並行化操作,這些方法新增在Arrays中。
ArrayParallel
比如,建立一個數組並初始化陣列中的元素:

public static double[] parallelInitialize(int size) {
    double
[] values = new double[size]; Arrays.parallelSetAll(values, i -> i); return values; }

二、編寫併發程式

1、非阻塞式IO

通常情況下,我們的應用程式是這樣工作的:使用者與伺服器建立TCP連線,伺服器呼叫方法向用戶傳輸資料,這個方法會阻塞當前執行緒。這樣的方法叫做阻塞式IO。阻塞式IO的缺點是當有大量使用者時,使用者會和伺服器建立大量連線,擴充套件性不是很好。
非阻塞式IO,又叫非同步IO,對於讀寫的呼叫立即返回,真正讀寫的操作在另一個執行緒完成,這樣就可以同時執行其他任務。

2、回撥

public class ChatVerticle extends Verticle {
    public void start() {
        vertx.createNetServer()
            .connectHandler(socket -> {
                container.logger().info("socket connected");
                socket.dataHandler(new User(socket, this));
             }).listen(10_000);
        container.logger().info("ChatVerticle started");
    }
}

為connectHandler輸入一個lambda表示式,當有使用者連線時會呼叫該表示式,這是一個回撥。使用回撥的好處是不需要手動管理執行緒,Vert.x自動幫助我們管理執行緒。

3、訊息傳遞

在Vert.x中,我們通過事件匯流排傳遞訊息。
首先需要註冊一個回撥來寫入訊息,registerHandler方法將一個程式和一個地址相關聯,有訊息發給該地址時,就將之作為引數傳遞給處理程式。

eventBus.registerHandler(user, (Message<String> msg) -> {
    sendClient(msg.body());
});

eventBus是Vert.x的事件匯流排,它允許在verticle物件之間以非阻塞式IO的方式傳遞訊息。
當想把訊息傳送給某一個使用者時,可以使用代表那個使用者的地址傳送訊息。

eventBus.send(user, name.get() +‘ >’ + message);

使用publish方法可以實現群發功能。

private void broadcastMessage(String message) {
    String name = this.name.get();
    eventBus.publish(name + ".followers", name +‘ >’ + message);
}

verticle物件向事件匯流排傳送訊息通訊,不需要新增鎖或synchronized關鍵字。
對於併發問題,我現在並沒有解決這類問題的實際經驗,因此還不能完全理解java對這類問題多出的支援,還是要繼續提高姿勢水平。

參考並建議閱讀《Java 8函數語言程式設計》