java8 Stream詳解
Stream組成
在傳統Java程式設計,或者說是類C語言程式設計中,我們如何操作一個數組資料呢?或者更泛化的講,我們如何操作一個“集合”(Collection)資料呢?在Java中我們利用java.util包裡的各種資料結構封裝,來很好的表示了陣列(Array)、集合(Set)、列表(List)和kv物件(Map)。但是抽象表示只是意味著儲存和定義操作方法,具體如何訪問中間的資料,其實還是比較原始的,或者換句話說,操作一個Collection的資料,我們使用的是Collection本身提供的API。就如我們訪問一個List裡的所有資料,需要一個for迴圈來get每個element。
Java 8引入了一個Stream物件,來重新封裝集合中的資料,就像集合根據其特定的資料結構儲存了資料,而Stream將其表示為一個數據流,一個類似List的有序的資料流。值得一提的是,Stream是不儲存資料的,它核心是要將不同的資料——流化。
Stream包含一個資料來源頭(Source),一組(0個或多個)中間操作和一個終止操作。其實很好理解,一個流一定需要一個數據源頭,畢竟要確定是哪些資料要流式處理。中間操作是一些類似map、filter之類的轉換操作,也就是說map和filter只是將一個流變為新的流,它們可以串起來(stream pipeline)。而終止操作顧名思義,終止操作會結束流,終止操作包括產出結果型和邊際效果型(side-effect),其中前者比如count之類的產出一個int值的,後者則是forEach之類的允許後續處理的。下面具體分開講解一下Stream的各個組成部分。
源頭(Source)
其中源頭來源於陣列、Collection、I/O資源和生成函式。
Arrays
通過一個數組生成一個流,是比較容易理解的。Java API也是通過Arrays.stream()方法來實現的:
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}
看其宣告就是將一個數組轉換為一個Stream物件。其委託StreamSupport來構造,而StreamSupport的stream方法宣告如下:
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
這裡一個很重要的引數是Spliterator,這是構造流的核心。ReferencePipeline.Head構造方法就是將一些屬性設定好。而ReferencePipeline是個重要的概念,它是pipeline裡對於中間操作和源頭實現的抽象類,從其類宣告中可以看出:
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {
}
兩個泛型表示了pipeline的輸入和輸出。中間操作的實現很多也是由其實現的,比如map和filter。回到剛才的分析,因為ReferencePipeline構造後沒有後續的方法鏈呼叫了。所以要理解的就是Spliterator這個東西是什麼。
Spliterator
Spliterator是java.util包下的一個介面,抽象的能力就是對於一個源頭的遍歷(traverse)和分割槽(partition)的能力。也就是說,通過Spliterator來遍歷資料流源頭的每個元素(或者一個bulk的批量),也通過它來分割槽資料將其parallel並行化。看它的名字嘛,split+iterator,就是這個意思。
Spliterator聲明瞭幾個方法,其中tryAdvance()方法是單個遍歷的能力抽象,forEachRemaining()方法是批量遍歷的抽象,而trySplit是分割槽的抽象。這時回去看剛才StreamSupport.stream的引數,其中的Spliterator方法在Arrays類的宣告如下:
public static <T> Spliterator<T> spliterator(T[] array, int startInclusive, int endExclusive) {
return Spliterators.spliterator(array, startInclusive, endExclusive,
Spliterator.ORDERED | Spliterator.IMMUTABLE);
}
其呼叫Spliterators靜態方法來構造Spliterator,裡面的一些ORDERED和IMMUTABLE的常量宣告,就是Spliterator的另一部分能力表達——流結構或者源頭資料的特徵,是有序的還是不可變的?是DISTINCT的還是不可空的?非常多的特性組合。具體到Arrays的呼叫依賴,Spliterators靜態類的實現如下:
public static <T> Spliterator<T> spliterator(Object[] array, int fromIndex, int toIndex,
int additionalCharacteristics) {
checkFromToBounds(Objects.requireNonNull(array).length, fromIndex, toIndex);
return new ArraySpliterator<>(array, fromIndex, toIndex, additionalCharacteristics);
}
可以看到,返回了一個ArraySpliterator,它實現了針對陣列型別資料來源頭的幾個剛才提到的相關方法(tryAdvace和trySplit等)。具體我就不貼程式碼了,到這個層面大家可以自行關注。
因為我們理解一個流,或者理解一個構造流的過程,其實需要的就是一個Spliterator(我個人很佩服設計者的這種抽象能力),像個schema。
Collection
通過Collection生成流,就能像陣列那樣靜態了,因為Collection是一個介面,標識了一大堆資料結構(Map、List、Set等)。Collection做的比較好的是,流的構建在介面層面完成設計,沒有沉到實際的類結構中(全靠Java 8的介面default能力啊)。我們具體看下Collection的構造。
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
其同樣委託了StreamSupport。通過上面Arrays的分析,我們已經知道了流的構造核心就是Spliterator(第二遍講了)。所以直接看Spliterator()方法。
default Spliterator<E> spliterator() {
return Spliterators.spliterator(this, 0);
}
也是利用Spliterators的靜態方法,只不過傳遞了Collection自己做引數。
public static <T> Spliterator<T> spliterator(Collection<? extends T> c, int characteristics) {
return new IteratorSpliterator<>(Objects.requireNonNull(c),
characteristics);
}
這裡就和Arrays不一樣了,返回了一個IteratorSpliterator。而具體區別就是Spliterator實現的幾個try*方法的不同了。
生成方法
我是這麼理解生成方法的,通過generator或iterator來生成。比如generator,主要是通過Stream預設的generate()方法。宣告如下:
/**
* Returns an infinite sequential unordered stream where each element is
* generated by the provided {@code Supplier}. This is suitable for
* generating constant streams, streams of random elements, etc.
*
* @param <T> the type of stream elements
* @param s the {@code Supplier} of generated elements
* @return a new infinite sequential unordered {@code Stream}
*/
public static<T> Stream<T> generate(Supplier<T> s) {
Objects.requireNonNull(s);
return StreamSupport.stream(
new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s), false);
}
這就是一個Supplier來生成流。單單看其中的OfRef()構造方法做的事情,其實就是定義了一個InfiniteSupplyingSpliterator,名曰無窮的供給Spliterator。它的核心tryAdvance方法,就是用tryAdvance的引數Consumer,來執行外層引數Supplier。具體實現如下:
@Override
public boolean tryAdvance(Consumer<? super T> action) {
Objects.requireNonNull(action);
action.accept(s.get());
return true;
}
這下都明白了。當然場景在註釋裡也提到了,適合常量流和隨機變數流,無窮無盡持續生成。
使用iterate方法構建的流,也是一個無窮的流,宣告如下:
/**
* Returns an infinite sequential ordered {@code Stream} produced by iterative
* application of a function {@code f} to an initial element {@code seed},
* producing a {@code Stream} consisting of {@code seed}, {@code f(seed)},
* {@code f(f(seed))}, etc.
*
* <p>The first element (position {@code 0}) in the {@code Stream} will be
* the provided {@code seed}. For {@code n > 0}, the element at position
* {@code n}, will be the result of applying the function {@code f} to the
* element at position {@code n - 1}.
*
* @param <T> the type of stream elements
* @param seed the initial element
* @param f a function to be applied to to the previous element to produce
* a new element
* @return a new sequential {@code Stream}
*/
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
Objects.requireNonNull(f);
final Iterator<T> iterator = new Iterator<T>() {
@SuppressWarnings("unchecked")
T t = (T) Streams.NONE;
@Override
public boolean hasNext() {
return true;
}
@Override
public T next() {
return t = (t == Streams.NONE) ? seed : f.apply(t);
}
};
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
iterator,
Spliterator.ORDERED | Spliterator.IMMUTABLE), false);
}
其通過構造一個Iterator,將遍歷方法next實現為呼叫一元操作運算元對初始種子值迭代計算來構造流。
流的處理
流的處理包含了中間操作和終止操作。後面通過兩個表格來解釋一下中間操作和終止操作的具體方法。
中間操作
方法 | 引數 | 用途 |
---|---|---|
concat | Stream< ? extends T> a, Stream< ? extends T> b | 將兩個流合起來形成新流 |
distinct | 將流裡的元素按照Ojbect.equal方法進行聚合去重,返回一個去重結果的新流 | |
empty | 返回一個空的流 | |
filter | Predicate< ? super T> predicate | 按照謂詞引數過濾,過濾後的元素形成新流返回 |
flatMap | Function< ? super T, ? extends Stream< ? extends R>> mapper | 將流裡的元素T,按照引數Function進行處理,處理結果是一個子流Stream< ? extends R>,後續將子流flat打平,形成元素R的新流。類似的有flatToDouble、flatToInt和flatToLong |
limit | long maxSize | 返回一個新流,只包含maxSize個元素,其他被truncate掉 |
map | Function< ? super T, ? extends R> mapper | 經典的map操作,對流裡的每個元素,通過引數mapper對映為一個新的元素,返回新元素的流。類似map有mapToDouble、mapToInt和mapToLong |
peek | Consumer< ? super T> action | 這個動作非常有趣,它並不改變流,而是對流裡的每個元素執行一個Consumer,對其進行一次處理。原始流不變繼續返回 |
skip | long n | 跳過n個元素,從第n+1個元素開始返回一個新的流 |
sorted | Comparator< ? super T> comparator | 根據引數排序器對流進行排序,返回新的流。如果引數為空,則按照自然序排序 |
中間操作因為不中斷流,所以比較好理解,最複雜的算是map,但是也是一個對映關係,因此這裡不做例子展示。
終止操作
方法 | 引數 | 用途 |
---|---|---|
allMatch | Predicate< ? super T> predicate | 根據謂詞函式判斷流裡的元素是否都滿足,返回對應的boolean值 |
anyMatch | Predicate< ? super T> predicate | 根據謂詞函式判斷流裡的元素是否存在一個或多個滿足,返回對應的boolean值 |
noneMatch | Predicate< ? super T> predicate | 根據謂詞函式判斷流裡的元素是否不存在任何一個元素滿足,返回對應的boolean值 |
count | 返回這個流裡元素的個數 | |
findAny | 返回一個Optional物件,這個等價於對於一個流執行一個select操作,返回一條記錄 | |
findFirst | 返回這個流裡的第一個元素的Optional,如果這個流不是有序的,則返回任意元素 | |
forEach | Consumer< ? super T> action | 對這個流的每個元素,執行引數Consumer |
forEachOrdered | Consumer action | 針對forEach在並行流裡對有序元素的輸出不足,這個方法確保並行流中按照原來順序處理 |
max | Comparator comparator | 返回一個Optional值,包含了流裡元素的max,條件是按照引數排序器排序 |
min | Comparator comparator | 返回一個Optional值,包含了流裡元素的min,條件是按照引數排序器排序 |
reduce | BinaryOperator< T> accumulator | 經典的reduce,就是根據一個二元操作運算元,將流中的元素逐個累計操作一遍,初始元素以foundAny結果為主 |
reduce | T identity, BinaryOperator< T> accumulator | 與上面的方法一致,只不過多了一個初始值,不需要findAny了 |
reduce | U identity,BiFunction< U, ? super T, U> accumulator,BinaryOperator< U> combiner | 最複雜的reduce,看到combiner會不會有聯想?它做的也是對於一個流裡的元素T,使用二元函式accumulator計算,計算的值累計到U上,因為之前的reduce要求流元素和結果元素型別一致,所以有限制。而該reduce函式,支援T和U型別不同,通過二元函式轉換,但是要求combiner會執行這個事情,要求“ combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)” |
collect | Supplier< R> supplier,BiConsumer< R, ? super T> accumulator,BiConsumer< R, R> combiner | 超級強大的方法。常規的reduce是返回不可變的值。而collect可以將reduce後的值升級為一個可變容器。具體這個方法就是對流裡每個元素T,將Supplier提供的值R作為初始值,用BiConsumer的accumulator進行累加計算。combiner的作用和要求和reduce是一樣的 |
collect | Collector< ? super T, A, R> collector | 和上面的collect一致,只不過Collector封裝了一組上面的引數,T是流裡的元素,A是累計中間結果,R是返回值的型別(collect的話就是容器了) |
這裡我們需要看幾個方法例子:
reduce:(word length sum)
Integer[] digits = {3, 1, 4};
Stream<Integer> values = Stream.of(digits);
//無初始值的
// Optional<Integer> sum = values.reduce((x, y) -> x + y);
// System.out.println(sum);// 會輸出8
//有初始值的
Integer sum2 = values.reduce(0, (x, y) -> x + y);
System.out.println(sum2);//也會輸出8
//增強reduce的
String[] wordArr = {"a", "b", "c"};
Stream<String> words = Stream.of(wordArr);//string 流
//計算每個元素的長度之和,0是初始值,第一次運算後,將其與第一個元素的長度1加和,得到1,把1和原始0進行combine,再不斷迭代。。。
int result = words.reduce(0, (length, t) -> t.length() + length, (length1, length2) -> length1 + length2);
System.out.println("reduce" + result);//這裡輸出reduce3
原始collect:(word count)
String[] wordArr = {"a", "b", "c", "a", "a", "b", "c", "d", "e"};
Arrays.stream(wordArr)
.collect(TreeMap::new, (map, str) -> {
Object val = map.get(str);
if (val != null) {
Integer v = Integer.valueOf(val.toString());
map.put(str, v + 1);
} else {
map.put(str, 1);
}
}, (map1, map2) -> {
map1.putAll(map2);
}).entrySet()
.forEach(System.out::println);
輸出結果是:
a=3
b=2
c=2
d=1
e=1
可以看到這樣去完成任務還是有點噁心的。這也是Collector想幫我們的地方:
String[] wordArr = {"a", "b", "c", "a", "a", "b", "c", "d", "e"};
Arrays.stream(wordArr)
.collect(groupingBy(Function.identity(), () -> new TreeMap<>(), counting())).entrySet()
.forEach(System.out::println);
這是collector版的wordcount,簡潔了好多。這裡的Collectors.groupingBy是個很複雜的函式,其實現如下:
/**
* Returns a {@code Collector} implementing a cascaded "group by" operation
* on input elements of type {@code T}, grouping elements according to a
* classification function, and then performing a reduction operation on
* the values associated with a given key using the specified downstream
* {@code Collector}. The {@code Map} produced by the Collector is created
* with the supplied factory function.
*
* <p>The classification function maps elements to some key type {@code K}.
* The downstream collector operates on elements of type {@code T} and
* produces a result of type {@code D}. The resulting collector produces a
* {@code Map<K, D>}.
*
* <p>For example, to compute the set of last names of people in each city,
* where the city names are sorted:
* <pre>{@code
* Map<City, Set<String>> namesByCity
* = people.stream().collect(groupingBy(Person::getCity, TreeMap::new,
* mapping(Person::getLastName, toSet())));
* }</pre>
*
* @implNote
* The returned {@code Collector} is not concurrent. For parallel stream
* pipelines, the {@code combiner} function operates by merging the keys
* from one map into another, which can be an expensive operation. If
* preservation of the order in which elements are presented to the downstream
* collector is not required, using {@link #groupingByConcurrent(Function, Supplier, Collector)}
* may offer better parallel performance.
*
* @param <T> the type of the input elements
* @param <K> the type of the keys
* @param <A> the intermediate accumulation type of the downstream collector
* @param <D> the result type of the downstream reduction
* @param <M> the type of the resulting {@code Map}
* @param classifier a classifier function mapping input elements to keys
* @param downstream a {@code Collector} implementing the downstream reduction
* @param mapFactory a function which, when called, produces a new empty
* {@code Map} of the desired type
* @return a {@code Collector} implementing the cascaded group-by operation
*
* @see #groupingBy(Function, Collector)
* @see #groupingBy(Function)
* @see #groupingByConcurrent(Function, Supplier, Collector)
*/
public static <T, K, D, A, M extends Map<K, D>>
Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T, A, D> downstream) {
Supplier<A> downstreamSupplier = downstream.supplier();
BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
downstreamAccumulator.accept(container, t);
};
BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
@SuppressWarnings("unchecked")
Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;
if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
}
else {
@SuppressWarnings("unchecked")
Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
Function<Map<K, A>, M> finisher = intermediate -> {
intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
@SuppressWarnings("unchecked")
M castResult = (M) intermediate;
return castResult;
};
return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
}
}
主要對比一下泛型引數吧,這裡的T就是stream元素String,K呢也是String,M是TreeMap,D是counting的結果Long,A也是counting的Supplier等於Long。
by the way,Collectors類還提供了很多靜態方法方便開發者將stream做collect操作,比如快捷的toSet/toMap/toList,groupingBy/counting/joining等等。
官方jdk註釋裡提供了一些簡單例子,我這裡copy一下:
// Accumulate names into a List
List<String> list = people.stream().map(Person::getName).collect(Collectors.toList());
// Accumulate names into a TreeSet
Set<String> set = people.stream().map(Person::getName).collect(Collectors.toCollection(TreeSet::new));
// Convert elements to strings and concatenate them, separated by commas
String joined = things.stream()
.map(Object::toString)
.collect(Collectors.joining(", "));
// Compute sum of salaries of employee
int total = employees.stream()
.collect(Collectors.summingInt(Employee::getSalary)));
// Group employees by department
Map<Department, List<Employee>> byDept
= employees.stream()
.collect(Collectors.groupingBy(Employee::getDepartment));
// Compute sum of salaries by department
Map<Department, Integer> totalByDept
= employees.stream()
.collect(Collectors.groupingBy(Employee::getDepartment,
Collectors.summingInt(Employee::getSalary)));
// Partition students into passing and failing
Map<Boolean, List<Student>> passingFailing =
students.stream()
.collect(Collectors.partitioningBy(s -> s.getGrade() >= PASS_THRESHOLD));
Stream究竟怎麼實現的
一個例子
我們拿一個簡單的例子來詳細分析一下:
String[] a = {"1.0", "2.0", "3.0", "4.0", "5.0"};
Optional optional = Stream.of(a).map((v) -> Double.valueOf(v)).filter((v) -> v > 2).sorted((v1, v2) -> v2.compareTo(v1)).limit(2).reduce((v1, v2) -> v1 + v2);
System.out.println(optional);
這個例子會輸出一個Optional值,答案是12,流的處理邏輯是:
1. 將陣列a構建為一個流:{"1.0", "2.0", "3.0", "4.0", "5.0"}
2. 把string型資料對映為double:{1.0, 2.0, 3.0, 4.0, 5.0}
3. 過濾其中大於2的元素:{3.0, 4.0, 5.0}
4. 倒序排列元素:{5.0, 4.0, 3.0}
5. 限制流大小隻取前兩個元素:{5.0, 4.0}
6. reduce求和:5.0+4.0=9.0
分析map階段
我們之前已經講過構造流源頭的分析,產生了一個ReferencePipeline的Head。那麼從後續的map開始分析:
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
仔細看,map返回的是一個StatelessOp,它繼承了ReferencePipeline。我們知道ReferencePipeline繼承了AbstractPipeline,而AbstractPipeline是真正流結構的持有者。AbstractPipeline裡定義了source、nextStage和previousStage,這個結構很明確的將流等同於一個連結串列,每個node都和前後有關聯,並且每個node都有一個到連結串列起始head的連線。如下圖所示:
nextsourceprevioussourcenextprevioussourceHeadop1op2
AbstractPipeline有個onWrapSink抽象方法,其核心就是將每個pipeline表達為一個Sink,而一個流就是由一個源頭Source,多個Sink串起來的一個連結串列結構。每個Sink結構上作為一個連結串列節點存在,功能上等價於一個Consumer(Sink介面繼承Consumer),通過accept方法來執行Sink操作。額外的多了begin和end方法做一些預處理和後處理的工作。
回到剛才的map方法,map返回一個Sink(具體是一個ChainedReference),其中accept做的工作就是呼叫mapper的apply方法,相當於執行mapper了。做一個拓撲圖,就是
nextsourceprevioussourceHeadSink1-mapper
分析filter階段
接下來構造filter,原始碼如下:
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
同理分析,它返回的也是個StatelessOp,其中同樣是一個ChainedReference的 Sink,accept方法就是會判斷一下謂詞函式predicate是否接受引數,如果接受呼叫下游的accept。拓撲變更為
nextsourceprevioussourcenextprevioussourceHeadSink1-mapperSink2-filter
分析排序階段
再接下來排序,排序返回的是一個StatefulOp,對比之前的map和filter,排序是一個有狀態的中間操作。實際的StatefulOp是一個它的子類OfRef,OfRef持有一個排序器
private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
/**
* Comparator used for sorting
*/
private final boolean isNaturalSort;
private final Comparator<? super T> comparator;
...
}
它的onWrapSink方法返回
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
// If the input is already naturally sorted and this operation
// also naturally sorted then this is a no-op
if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedRefSortingSink<>(sink, comparator);
else
return new RefSortingSink<>(sink, comparator);
}
也就是說,如果有序,直接pass,返回原sink,否則,根據其列表是否有界,來使用不同的Sink返回,有界的排序Sink內部是一個數組T[],而無界的排序Sink內部是一個ArrayList< T>。以無界排序Sink為例,其中的三個方法begin負責構造List,accept負責把流元素add進去,而end負責排序sort。看下實現一目瞭然:
private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
private ArrayList<T> list;
RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
super(sink, comparator);
}
@Override
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
}
@Override
public void end() {
list.sort(comparator);
downstream.begin(list.size());
if (!cancellationWasRequested) {
list.forEach(downstream::accept);
}
else {
for (T t : list) {
if (downstream.cancellationRequested()) break;
downstream.accept(t);
}
}
downstream.end();
list = null;
}
@Override
public void accept(T t) {
list.add(t);
}
}
最終構造的Stream拓撲結構變為
nextsourceprevioussourcenextprevioussourcenextprevioussourceHeadSink1-mapperSink2-filterSink3-sorted
分析limit階段
limit也是一個StatefulOp,其和skip複用了一個Sink,其中的onWrapSink方法具體會在accept階段控制一個limit的size:
public void accept(T t) {
if (n == 0) {
if (m > 0) {
m--;
downstream.accept(t);
}
}
else {
n--;
}
}
其他的邏輯和之前的sink類似,不贅述,拓撲結構更新如下:
nextsourceprevioussourcenextprevioussourcenextprevioussourcenextprevioussourceHeadSink1-mapperSink2-filterSink3-sortedSink4-limit
分析reduce階段
reduce就和之前的Sink不一樣了,reduce作為一個終止操作,不再是Sink,而是一個觸發流執行的動作,其呼叫了AbstractPipeline的evaluate方法來觸發整個流的執行。
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
return evaluate(ReduceOps.makeRef(accumulator));
}
evaluate方法宣告如下:
/**
* Evaluate the pipeline with a terminal operation to produce a result.
*
* @param <R> the type of result
* @param terminalOp the terminal operation to be applied to the pipeline.
* @return the result
*/
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
其入參是一個TerminalOp終止操作,而返回的就是流執行後的結果。乾的事情就是呼叫TerminalOp的evaluateSequential方法來執行整個流。具體到這個例子就是呼叫ReduceOp(它繼承TerminalOp)的evaluateSequential方法。
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
其中PipelineHelper的wrapAndCopyInto方法就是包裝sink,然後copyInto就是執行了。
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
其中wrapSink很重要,它負責呼叫之前拓撲結構的每個Sink的onWrapSink方法把這個鏈式結構建立起來。看程式碼會發現,鏈是自後向前的。
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
構造好後,執行copyInto:
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
具體核心就是驅動源頭的Spliterator開始forEachRemaining,也就是遍歷。對於每個元素進行tryAdvance(action),此處的action就是一個Sink了(剛才說過Sink繼承Consumer)。這樣一個鏈就開始從頭到尾(不一定,這取決於源頭結構和Spliterator型別)執行了。
最終的鏈式拓撲結構如下:
nextsourceprevioussourcenextprevioussourcenextprevioussourcenextprevioussource無真實關聯HeadSink1-mapperSink2-filterSink3-sortedSink4-limitTerminalOp-reduce
鏈式的執行依賴於TerminalOp究竟是什麼,因此一個Stream的執行是lazy的,當流構建好時,只是一個Sink的鏈式結構,最終的遍歷和執行需要一個終止操作來觸發。回到本節開始時的例子,前5步流是構造不執行的,直到第6步確定reduce求和,才觸發了流的遍歷執行。
總結
本文是Java8三個系列的最後一篇,筆者非常粗淺地分析了Java 8的Stream用法和簡單實現剖析。整個一個系列完成,個人以為Java 8的三大亮點也分別簡單總結整理完成。對於Java開發者,個人建議要在程式設計時廣泛使用流式Stream用法,其可以非常高效地幫助開發者完成一些常用結構體(array/list/map)的遍歷操作;使用函式式設計,可以極大的補充面向物件的程式設計思維,在OO模式抽象實體的同時,可以FP模式抽象操作和函式,這對於一些以操作為主的系統架構和設計來說,引入方法抽象和函式將極大地輔助系統設計。