1. 程式人生 > >Java 8 Lambda(類庫篇——Streams API,Collector和並行)

Java 8 Lambda(類庫篇——Streams API,Collector和並行)

參考資料:

1、背景

自從lambda表示式成為Java語言的一部分之後,Java集合(Collections)API就面臨著大幅變化。為了不推到重來,所以對現有的API進行改進。

  • 為現有的介面(例如CollectionListStream)增加擴充套件方法;

  • 在類庫中增加新的(stream,即java.util.stream.Stream)抽象以便進行聚集(aggregation)操作;

  • 改造現有的型別使之可以提供流檢視(stream view);

  • 改造現有的型別使之可以容易的使用新的程式設計模式,這樣使用者就不必拋棄使用以久的類庫(並不是說集合API會常駐永存)。

  • 提供更加易用的並行(Parallelism)庫。

    供顯式(explicit)但非侵入(unobstrusive)的並行。

2、內部迭代和外部迭代

  • 目前的集合庫主要依賴於外部迭代:應用程式負責了做什麼和怎麼做

  • 外部迭代的迴圈是序列的,且集合框架無法對控制流程進行優化(排序、短路求值等)。

    外部迭代.png

  • 內部迭代,將流程控制權交給類庫:應用程式只負責做什麼,怎麼做交由類庫

    內部迭代.png

3、流(Stream)

  • 每個流代表一個值序列,流提供一系列常用的聚集操作,使得我們可以便捷的在它上面進行各種運算。

    java 7 實現:發現 type 為 grocery 的所有交易,然後返回以交易值降序排序好的交易 ID 集合

    List<Transaction> groceryTransactions = new Arraylist<>();

    for(Transaction t: transactions){

    if(t.getType() == Transaction.GROCERY){

    groceryTransactions.add(t);

    }

    }

    Collections.sort(groceryTransactions, new Comparator(){

    public int compare(Transaction t1, Transaction t2){

    return t2.getValue().compareTo(t1.getValue());

    }

    });

    List<Integer> transactionIds = new ArrayList<>();

    for(Transaction t: groceryTransactions){

    transactionsIds.add(t.getId());

    }

    java 8 的實現:發現 type 為 grocery 的所有交易,然後返回以交易值降序排序好的交易 ID 集合

    List<Integer> transactionsIds = transactions.parallelStream().

    filter(t -> t.getType() == Transaction.GROCERY).

    sorted(comparing(Transaction::getValue).reversed()).

    map(Transaction::getId).

    collect(toList());

  • 資料來源任何可以用Iterator描述的物件都可以成為流的資料來源,包括Collection型別、BufferedReader、Random、BitSet以及陣列(Arrays.stream());流中元素的遍歷順序與資料來源的遍歷順序一致。

  • 流的構成獲取一個數據源(source)→ 資料轉換(返回新的流)→執行操作獲取想要的結果。

    流管道的構成.png

  • 流的操作主要分為兩類:

    1. 中間(Intermediate):一個流可以後面跟隨零個或多個 intermediate 操作,每個操作返回一個新的流;這類操作都是惰性化的(lazy),並沒有真正開始流的遍歷。

      map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered

    2. 終止(Terminal):一個流只能有一個 terminal 操作。Terminal 操作的執行,才會真正開始流的遍歷,並且會生成一個結果。

      forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator

    3. 還有一類比較特別的操作Short-circuiting:用於處理操作一個無限大的流。

      anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 limit

4、流和集合

  • 集合主要用來對其元素進行有效(effective)的管理和訪問(access),而流並不支援對其元素進行直接操作或直接訪問,只是通過宣告式操作在其上進行運算然後得到結果。

  • 無儲存:流並不儲存值;流的元素源自資料來源(可能是某個資料結構、生成函式或I/O通道等等),通過一系列計算步驟得到;

  • 天然的函式式風格(Functional in nature):對流的操作會產生一個結果,但流的資料來源不會被修改;

  • 惰性求值:多數流操作(包括過濾、對映、排序以及去重)都可以以惰性方式實現。這使得我們可以用一遍遍歷完成整個流水線操作,並可以用短路操作提供更高效的實現;

  • 無需上界(Bounds optional):不少問題都可以被表達為無限流(infinite stream):使用者不停地讀取流直到滿意的結果出現為止(比如說,列舉完美數這個操作可以被表達為在所有整數上進行過濾)。集合是有限的,但流不是(操作無限流時我們必需使用短路操作,以確保操作可以在有限時間內完成);

5、惰性

  • 假設我們現在要統計一個List<Person>裡面的男性個數

    1. publicint countMan(){  

    2. return personList.stream().filter(person -> person.getGender().equal(“男”)).count();  

    3. }  

  • Stream流處理的工作:獲取所有人;過濾“男性”;計數。

    • 我們最後呼叫count()就是獲取結果,相當於獲取新的List的size。實際上,Stream並沒有new一個新List。如果我們不呼叫count,那麼Stream什麼都不做。下面這段程式碼不會列印任何東西:

      1. publicvoid function(){  

      2.     personList.stream().filter(person -> {  

      3.         System.out.println(“Go!”);  

      4.         person.getGender().equal(“男”);  

      5.     });  

6、並行

  • java 8的Stream流的並行處理是以java 7推出的fork/join框架來處理的。

  • 流水線既可以序列執行也可以並行執行,並行或序列是流的屬性。

  • 雖然需要顯示的指定並行流,但是是非侵入式的(侵入式一般需要實現某個介面,重寫某個方法等),不需要開發人員手動實現並行程式碼。

    並行

    序列

    int sum = shapes.parallelStream()

    .filter(s -> s.getColor = BLUE)

    .mapToInt(s -> s.getWeight())

    .sum();

    int sum = shapes.stream()

    .filter(s -> s.getColor = BLUE)

    .mapToInt(s -> s.getWeight())

    .sum();

  • 因為並行的存在,所以stream流的操作應保證無干擾性:

    1. 不要干擾資料來源(這個條件和遍歷集合時所需的條件相似,如果集合在遍歷時被修改,絕大多數的集合實現都會丟擲ConcurrentModificationException)。

    2. 不要干擾其它lambda表示式,當一個lambda在修改某個可變狀態而另一個lambda在讀取該狀態時就會產生這種干擾(所以在lambda表示式只允許使用有效只讀的變數,對應用開放,對值封閉)。

7、應用示例

  • 遍歷所有宣告的方法,然後根據方法名稱、返回型別以及引數的數量和型別進行匹配。

不使用Stream流

for (Method method : enclosingInfo.getEnclosingClass().getDeclaredMethods()) {

if (method.getName().equals(enclosingInfo.getName())) {

Class< ? >[] candidateParamClasses = method.getParameterTypes();

if (candidateParamClasses.length == parameterClasses.length) {

boolean matches = true;

for (int i = 0; i < candidateParamClasses.length; i += 1) {

if (!candidateParamClasses[i].equals(parameterClasses[i])) {

matches = false;

break;

}

}

if (matches) { // finally, check return type

if (method.getReturnType().equals(returnType)) {

return method;

}

}

}

}

}

thrownewInternalError("Enclosing method not found");

使用Stream流

returnArrays.stream(enclosingInfo.getEnclosingClass().getDeclaredMethods())

.filter(m -> Objects.equal(m.getName(), enclosingInfo.getName()))

.filter(m -> Arrays.equal(m.getParameterTypes(), parameterClasses))

.filter(m -> Objects.equals(m.getReturnType(), returnType))

.findFirst()

.orElseThrow(() -> newInternalError("Enclosing method not found"));

  • 比較可以發現,Stream流不僅消除了所有的臨時變數,而且程式碼更加緊湊,可讀性更好,也不容易出錯。

  • 假設我們需要得到一個按名字排序的專輯列表,專輯列表裡面的每張專輯都至少包含一首四星及四星以上的音軌,為了構建這個專輯列表,我們可以這麼寫:

不使用Stream流:

List<Album> favs = new ArrayList<>(); 
        for (Album album : albums) { 
            boolean hasFavorite = false; 
            for (Track track : album.tracks) { 
                if (track.rating >= 4) { 
                    hasFavorite = true; 
                    break; 
                } 
            } 
            if (hasFavorite) 
                favs.add(album); 
        } 
        Collections.sort(favs, new Comparator<Album>() { 
            public int compare(Album a1, Album a2) { 
                return a1.name.compareTo(a2.name); 
            } 
        });

使用Stream流:

List<Album> sortedFavs = 
    albums.stream() 
          .filter(a -> a.tracks.anyMatch(t -> (t.rating >= 4))) 
          .sorted(Comparator.comparing(a -> a.name)) 
          .collect(Collectors.toList());

8、收集器(Collectors)

  • 流通過.collect()方法返回一個集合,該方法接收一個型別為Collector的引數。

  • Collectors提供了大量常用的工廠方法,根據不同的入參返回不同的實現,有 toList()、toSet()、toMap(K,V)等,其中toMap方法還可以指定容器和解決key衝突方式。

  • groupingBy(Classify),還可以指定Map容器和收集器型別

    Map<Artist, List<Track>> favsByArtist = 
        tracks.stream() 
              .filter(t -> t.rating >= 4) 
              .collect(Collectors.groupingBy(t -> t.artist));
    Map<Artist, Set<Track>> favsByArtist = 
        tracks.stream() 
              .filter(t -> t.rating >= 4) 
              .collect(Collectors.groupingBy(t -> t.artist, 
                                             Collectors.toSet()));

9、並行的實質

  • 以java 7引入的Fork/Join模型為基礎實現Stream流的平行計算

  • Fork/Join的實現原理如下,為了能夠並行化任意流上的所有操作,我們把流抽象為Spliterator(支援順序依次訪問資料,也支援分解資料

    • 把問題分解為子問題;

    • 序列解決子問題從而得到部分結果(partial result);

    • 合併部分結果合為最終結果。

  • Spliterator與Iterator的用法區別在於,如果開發者還知道源的其他元資料時(資料大小),類庫就可以通過Spliterator提供一個更加高效的實現(就像JDK中所有的集合一樣)。

    Stream.iterate(0,n->n+3); 
    ArrayList arrayList = new ArrayList(); 
    Spliterator spliterator = arrayList.spliterator(); 
    Iterator iterator = arrayList.iterator(); 
    boolean parallel = true; 
    StreamSupport.stream(spliterator,parallel); 
    StreamSupport.stream(Spliterators.spliterator(iterator,arrayList.size(),Spliterator.DISTINCT),parallel);

10、比較器工廠(Comparator)

  • Comparator介面中新增了若干用於生成比較器的實用方法

  • 靜態方法Comparator.comparing()接收一個函式,返回一個Comparator。這種方式不僅簡潔了程式碼(無需匿名類),而且便於進行各種組合操作。

    //比較器反序
    people.sort(comparing(p -> p.getLastName()).reversed());
    //多條件排序
    Comparator<Person> c = Comparator.comparing(p -> p.getLastName()) 
                                     .thenComparing(p -> p.getFirstName()); 
    people.sort(c);