Java 8 Lambda(類庫篇——Streams API,Collector和並行)
參考資料:
1、背景
自從lambda表示式成為Java語言的一部分之後,Java集合(Collections)API就面臨著大幅變化。為了不推到重來,所以對現有的API進行改進。
為現有的介面(例如Collection,List和Stream)增加擴充套件方法;
在類庫中增加新的流(stream,即java.util.stream.Stream)抽象以便進行聚集(aggregation)操作;
改造現有的型別使之可以提供流檢視(stream view);
改造現有的型別使之可以容易的使用新的程式設計模式,這樣使用者就不必拋棄使用以久的類庫(並不是說集合API會常駐永存)。
提供更加易用的並行(Parallelism)庫。提
2、內部迭代和外部迭代
目前的集合庫主要依賴於外部迭代:應用程式負責了做什麼和怎麼做
外部迭代的迴圈是序列的,且集合框架無法對控制流程進行優化(排序、短路求值等)。
內部迭代,將流程控制權交給類庫:應用程式只負責做什麼,怎麼做交由類庫
3、流(Stream)
每個流代表一個值序列,流提供一系列常用的聚集操作,使得我們可以便捷的在它上面進行各種運算。
java 7 實現:發現 type 為 grocery 的所有交易,然後返回以交易值降序排序好的交易 ID 集合
List<Transaction> groceryTransactions = new Arraylist<>();
for(Transaction t: transactions){
if(t.getType() == Transaction.GROCERY){
groceryTransactions.add(t);
}
}
Collections.sort(groceryTransactions, new Comparator(){
public int compare(Transaction t1, Transaction t2){
return t2.getValue().compareTo(t1.getValue());
}
});
List<Integer> transactionIds = new ArrayList<>();
for(Transaction t: groceryTransactions){
transactionsIds.add(t.getId());
}
java 8 的實現:發現 type 為 grocery 的所有交易,然後返回以交易值降序排序好的交易 ID 集合
List<Integer> transactionsIds = transactions.parallelStream().
filter(t -> t.getType() == Transaction.GROCERY).
sorted(comparing(Transaction::getValue).reversed()).
map(Transaction::getId).
collect(toList());
資料來源:任何可以用Iterator描述的物件都可以成為流的資料來源,包括Collection型別、BufferedReader、Random、BitSet以及陣列(Arrays.stream());流中元素的遍歷順序與資料來源的遍歷順序一致。
流的構成:獲取一個數據源(source)→ 資料轉換(返回新的流)→執行操作獲取想要的結果。
流的操作主要分為兩類:
中間(Intermediate):一個流可以後面跟隨零個或多個 intermediate 操作,每個操作返回一個新的流;這類操作都是惰性化的(lazy),並沒有真正開始流的遍歷。
map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered
終止(Terminal):一個流只能有一個 terminal 操作。Terminal 操作的執行,才會真正開始流的遍歷,並且會生成一個結果。
forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator
還有一類比較特別的操作Short-circuiting:用於處理操作一個無限大的流。
anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 limit
4、流和集合
集合主要用來對其元素進行有效(effective)的管理和訪問(access),而流並不支援對其元素進行直接操作或直接訪問,只是通過宣告式操作在其上進行運算然後得到結果。
無儲存:流並不儲存值;流的元素源自資料來源(可能是某個資料結構、生成函式或I/O通道等等),通過一系列計算步驟得到;
天然的函式式風格(Functional in nature):對流的操作會產生一個結果,但流的資料來源不會被修改;
惰性求值:多數流操作(包括過濾、對映、排序以及去重)都可以以惰性方式實現。這使得我們可以用一遍遍歷完成整個流水線操作,並可以用短路操作提供更高效的實現;
無需上界(Bounds optional):不少問題都可以被表達為無限流(infinite stream):使用者不停地讀取流直到滿意的結果出現為止(比如說,列舉完美數這個操作可以被表達為在所有整數上進行過濾)。集合是有限的,但流不是(操作無限流時我們必需使用短路操作,以確保操作可以在有限時間內完成);
5、惰性
假設我們現在要統計一個List<Person>裡面的男性個數
publicint countMan(){
return personList.stream().filter(person -> person.getGender().equal(“男”)).count();
}
Stream流處理的工作:獲取所有人;過濾“男性”;計數。
我們最後呼叫count()就是獲取結果,相當於獲取新的List的size。實際上,Stream並沒有new一個新List。如果我們不呼叫count,那麼Stream什麼都不做。下面這段程式碼不會列印任何東西:
publicvoid function(){
personList.stream().filter(person -> {
System.out.println(“Go!”);
person.getGender().equal(“男”);
});
}
6、並行
java 8的Stream流的並行處理是以java 7推出的fork/join框架來處理的。
流水線既可以序列執行也可以並行執行,並行或序列是流的屬性。
雖然需要顯示的指定並行流,但是是非侵入式的(侵入式一般需要實現某個介面,重寫某個方法等),不需要開發人員手動實現並行程式碼。
並行
序列
int sum = shapes.parallelStream()
.filter(s -> s.getColor = BLUE)
.mapToInt(s -> s.getWeight())
.sum();
int sum = shapes.stream()
.filter(s -> s.getColor = BLUE)
.mapToInt(s -> s.getWeight())
.sum();
因為並行的存在,所以stream流的操作應保證無干擾性:
不要干擾資料來源(這個條件和遍歷集合時所需的條件相似,如果集合在遍歷時被修改,絕大多數的集合實現都會丟擲ConcurrentModificationException)。
不要干擾其它lambda表示式,當一個lambda在修改某個可變狀態而另一個lambda在讀取該狀態時就會產生這種干擾(所以在lambda表示式只允許使用有效只讀的變數,對應用開放,對值封閉)。
7、應用示例
遍歷所有宣告的方法,然後根據方法名稱、返回型別以及引數的數量和型別進行匹配。
不使用Stream流 |
---|
for (Method method : enclosingInfo.getEnclosingClass().getDeclaredMethods()) { if (method.getName().equals(enclosingInfo.getName())) { Class< ? >[] candidateParamClasses = method.getParameterTypes(); if (candidateParamClasses.length == parameterClasses.length) { boolean matches = true; for (int i = 0; i < candidateParamClasses.length; i += 1) { if (!candidateParamClasses[i].equals(parameterClasses[i])) { matches = false; break; } } if (matches) { // finally, check return type if (method.getReturnType().equals(returnType)) { return method; } } } } } thrownewInternalError("Enclosing method not found"); |
使用Stream流 |
---|
returnArrays.stream(enclosingInfo.getEnclosingClass().getDeclaredMethods()) .filter(m -> Objects.equal(m.getName(), enclosingInfo.getName())) .filter(m -> Arrays.equal(m.getParameterTypes(), parameterClasses)) .filter(m -> Objects.equals(m.getReturnType(), returnType)) .findFirst() .orElseThrow(() -> newInternalError("Enclosing method not found")); |
比較可以發現,Stream流不僅消除了所有的臨時變數,而且程式碼更加緊湊,可讀性更好,也不容易出錯。
假設我們需要得到一個按名字排序的專輯列表,專輯列表裡面的每張專輯都至少包含一首四星及四星以上的音軌,為了構建這個專輯列表,我們可以這麼寫:
不使用Stream流:
使用Stream流:
|
8、收集器(Collectors)
流通過.collect()方法返回一個集合,該方法接收一個型別為Collector的引數。
Collectors提供了大量常用的工廠方法,根據不同的入參返回不同的實現,有 toList()、toSet()、toMap(K,V)等,其中toMap方法還可以指定容器和解決key衝突方式。
groupingBy(Classify),還可以指定Map容器和收集器型別
Map<Artist, List<Track>> favsByArtist = tracks.stream() .filter(t -> t.rating >= 4) .collect(Collectors.groupingBy(t -> t.artist)); Map<Artist, Set<Track>> favsByArtist = tracks.stream() .filter(t -> t.rating >= 4) .collect(Collectors.groupingBy(t -> t.artist, Collectors.toSet()));
9、並行的實質
以java 7引入的Fork/Join模型為基礎實現Stream流的平行計算
Fork/Join的實現原理如下,為了能夠並行化任意流上的所有操作,我們把流抽象為Spliterator(支援順序依次訪問資料,也支援分解資料):
把問題分解為子問題;
序列解決子問題從而得到部分結果(partial result);
合併部分結果合為最終結果。
Spliterator與Iterator的用法區別在於,如果開發者還知道源的其他元資料時(資料大小),類庫就可以通過Spliterator提供一個更加高效的實現(就像JDK中所有的集合一樣)。
Stream.iterate(0,n->n+3); ArrayList arrayList = new ArrayList(); Spliterator spliterator = arrayList.spliterator(); Iterator iterator = arrayList.iterator(); boolean parallel = true; StreamSupport.stream(spliterator,parallel); StreamSupport.stream(Spliterators.spliterator(iterator,arrayList.size(),Spliterator.DISTINCT),parallel);
10、比較器工廠(Comparator)
Comparator介面中新增了若干用於生成比較器的實用方法
靜態方法Comparator.comparing()接收一個函式,返回一個Comparator。這種方式不僅簡潔了程式碼(無需匿名類),而且便於進行各種組合操作。
//比較器反序 people.sort(comparing(p -> p.getLastName()).reversed()); //多條件排序 Comparator<Person> c = Comparator.comparing(p -> p.getLastName()) .thenComparing(p -> p.getFirstName()); people.sort(c);