
Java 8 Stream in Detail



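Java 8 introduced the Stream object as a new way to wrap the data held in a collection. A collection stores data according to its particular data structure; a Stream presents that data as a flow of elements, a sequence much like a List (whether the sequence has a defined order depends on the source). Notably, a Stream does not store data. Its core purpose is to turn different kinds of data into a stream.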

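A Stream consists of a data source, a chain of zero or more intermediate operations, and one terminal operation. This is easy to grasp: a stream needs a source, because something has to determine which data will be processed. Intermediate operations are transformations such as map and filter; each one only turns a stream into a new stream, so they can be chained into a stream pipeline. A terminal operation, as the name implies, ends the stream. Terminal operations come in two kinds: those that produce a result, such as count, which returns a long, and those that exist for their side effects, such as forEach, which runs an action on every element. The sections below go through each part of a Stream in turn.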
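As a rough illustration of the three parts (source, intermediate operations, terminal operation), here is a minimal sketch. The class name PipelineAnatomy, the method countLongNames, and the sample names are made up for this example.

```java
import java.util.Arrays;
import java.util.List;

public class PipelineAnatomy {
    // Counts the distinct names (ignoring case) that are longer than three characters.
    static long countLongNames(List<String> names) {
        return names.stream()                  // source: the list
                .map(String::toLowerCase)      // intermediate: transform each element
                .filter(n -> n.length() > 3)   // intermediate: keep matching elements
                .distinct()                    // intermediate: drop duplicates
                .count();                      // terminal: produces a long
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Anna", "Bob", "ANNA", "Carol");
        System.out.println(countLongNames(names));   // prints 2
        // forEach is a terminal operation used for its side effect (printing).
        names.stream().forEach(System.out::println);
    }
}
```

Nothing runs until the terminal operation is called; the intermediate calls only describe the pipeline.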




Creating a stream from an array is the easiest case to understand. The Java API does it through the Arrays.stream() method:

    // java.util.Arrays
    public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
        return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
    }


    // java.util.stream.StreamSupport
    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }


    // java.util.stream.ReferencePipeline
    abstract class ReferencePipeline<P_IN, P_OUT>
            extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
            implements Stream<P_OUT> {
    }





    // java.util.Arrays
    public static <T> Spliterator<T> spliterator(T[] array, int startInclusive, int endExclusive) {
        return Spliterators.spliterator(array, startInclusive, endExclusive,
                                        Spliterator.ORDERED | Spliterator.IMMUTABLE);
    }


    // java.util.Spliterators
    public static <T> Spliterator<T> spliterator(Object[] array, int fromIndex, int toIndex,
                                                 int additionalCharacteristics) {
        checkFromToBounds(Objects.requireNonNull(array).length, fromIndex, toIndex);
        return new ArraySpliterator<>(array, fromIndex, toIndex, additionalCharacteristics);
    }
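From the caller's side, the array path above can be exercised as in the following sketch. ArrayStreamDemo and its helper methods are hypothetical names chosen for this example; only Arrays.stream and Collectors.toList come from the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ArrayStreamDemo {
    // Streams the half-open range [from, to) of the array and collects it into a list.
    static List<String> slice(String[] array, int from, int to) {
        return Arrays.stream(array, from, to).collect(Collectors.toList());
    }

    // A primitive int[] yields an IntStream, which offers extras such as sum().
    static int sum(int[] values) {
        return Arrays.stream(values).sum();
    }

    public static void main(String[] args) {
        String[] letters = {"a", "b", "c", "d", "e"};
        System.out.println(slice(letters, 1, 4));        // prints [b, c, d]
        System.out.println(sum(new int[] {3, 1, 4}));    // prints 8
    }
}
```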




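Creating a stream from a Collection cannot be done with a static helper the way it is for arrays, because Collection is an interface that stands for a whole family of data structures (List, Set, Queue, and so on; note that Map is not a Collection, so a map is streamed through its entrySet(), keySet(), or values() views). What Collection gets right is that stream construction is designed at the interface level rather than pushed down into each concrete class, which is only possible thanks to Java 8's default methods on interfaces. Let us look at how Collection builds its stream.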

    // java.util.Collection
    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }


    // java.util.Collection
    default Spliterator<E> spliterator() {
        return Spliterators.spliterator(this, 0);
    }


    // java.util.Spliterators
    public static <T> Spliterator<T> spliterator(Collection<? extends T> c, int characteristics) {
        return new IteratorSpliterator<>(Objects.requireNonNull(c),
                                         characteristics);
    }
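A short sketch of what this means for callers: any Collection can be streamed through the same stream() call, and a Map is streamed through one of its collection views. CollectionStreamDemo, its methods, and the sample data are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;

public class CollectionStreamDemo {
    // Works for any Collection (List, Set, ...) because stream() is declared on the interface.
    static int totalLength(Collection<String> items) {
        return items.stream().mapToInt(String::length).sum();
    }

    // A Map has no stream() of its own; its values() view is a Collection and can be streamed.
    static long countValuesAbove(Map<String, Integer> map, int threshold) {
        return map.values().stream().filter(v -> v > threshold).count();
    }

    public static void main(String[] args) {
        System.out.println(totalLength(Arrays.asList("ab", "cde")));                  // prints 5
        System.out.println(totalLength(new HashSet<>(Arrays.asList("ab", "cde"))));   // prints 5
        Map<String, Integer> stock = new LinkedHashMap<>();
        stock.put("apple", 3);
        stock.put("pear", 10);
        System.out.println(countValuesAbove(stock, 5));                               // prints 1
    }
}
```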




    /**
     * Returns an infinite sequential unordered stream where each element is
     * generated by the provided {@code Supplier}. This is suitable for
     * generating constant streams, streams of random elements, etc.
     *
     * @param <T> the type of stream elements
     * @param s the {@code Supplier} of generated elements
     * @return a new infinite sequential unordered {@code Stream}
     */
    public static<T> Stream<T> generate(Supplier<T> s) {
        Objects.requireNonNull(s);
        return StreamSupport.stream(
                new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s), false);
    }


    // StreamSpliterators.InfiniteSupplyingSpliterator.OfRef: every call pulls one more element from the Supplier
    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        action.accept(s.get());
        return true;
    }
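Because tryAdvance always returns true, a generated stream never ends by itself and normally needs a short-circuiting operation such as limit. A minimal sketch, with GenerateDemo and repeat as made-up names:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GenerateDemo {
    // Builds a list containing the same value a fixed number of times.
    static List<String> repeat(String value, int times) {
        return Stream.generate(() -> value)   // infinite stream of the same value
                .limit(times)                 // without limit the pipeline would never finish
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(repeat("x", 3));   // prints [x, x, x]
    }
}
```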



    /**
     * Returns an infinite sequential ordered {@code Stream} produced by iterative
     * application of a function {@code f} to an initial element {@code seed},
     * producing a {@code Stream} consisting of {@code seed}, {@code f(seed)},
     * {@code f(f(seed))}, etc.
     *
     * <p>The first element (position {@code 0}) in the {@code Stream} will be
     * the provided {@code seed}. For {@code n > 0}, the element at position
     * {@code n}, will be the result of applying the function {@code f} to the
     * element at position {@code n - 1}.
     *
     * @param <T> the type of stream elements
     * @param seed the initial element
     * @param f a function to be applied to the previous element to produce
     *          a new element
     * @return a new sequential {@code Stream}
     */
    public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
        Objects.requireNonNull(f);
        final Iterator<T> iterator = new Iterator<T>() {
            @SuppressWarnings("unchecked")
            T t = (T) Streams.NONE;

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public T next() {
                return t = (t == Streams.NONE) ? seed : f.apply(t);
            }
        };
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
                iterator,
                Spliterator.ORDERED | Spliterator.IMMUTABLE), false);
    }
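The iterator above yields seed, f(seed), f(f(seed)), and so on without end, so callers usually bound it with limit. A small sketch under that assumption; IterateDemo and powersOfTwo are illustrative names only.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class IterateDemo {
    // Returns the first n powers of two, starting from 1.
    static List<Integer> powersOfTwo(int n) {
        return Stream.iterate(1, x -> x * 2)   // 1, 2, 4, 8, ...
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(powersOfTwo(5));    // prints [1, 2, 4, 8, 16]
    }
}
```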





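Method | Parameters | Purpose
--- | --- | ---
concat | Stream<? extends T> a, Stream<? extends T> b | Static factory: joins two streams into a new stream
distinct | (none) | Removes duplicate elements, compared with Object.equals, and returns a new stream of the distinct elements
empty | (none) | Static factory: returns an empty stream
filter | Predicate<? super T> predicate | Keeps the elements that satisfy the predicate and returns them as a new stream
flatMap | Function<? super T, ? extends Stream<? extends R>> mapper | Applies the Function to each element T to obtain a sub-stream Stream<? extends R>, then flattens the sub-streams into one new stream of R. Related methods: flatMapToDouble, flatMapToInt, flatMapToLong
limit | long maxSize | Returns a new stream with at most maxSize elements; the rest are truncated
map | Function<? super T, ? extends R> mapper | The classic map operation: maps each element to a new element through mapper and returns a stream of the new elements. Related methods: mapToDouble, mapToInt, mapToLong
peek | Consumer<? super T> action | An interesting one: it does not change the elements but runs a Consumer on each element as it passes through, and returns a stream of the same elements
skip | long n | Skips the first n elements and returns a new stream that starts at element n+1
sorted | Comparator<? super T> comparator | Sorts the stream with the given comparator and returns a new stream. The no-argument overload sorts by natural order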
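Several of the operations in the table can be chained in one pipeline. The following sketch is only an illustration; IntermediateOpsDemo, pickWords, and the sample sentences are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class IntermediateOpsDemo {
    // Splits sentences into words, removes duplicates, sorts them,
    // drops the first word and keeps the next two.
    static List<String> pickWords(List<String> sentences) {
        return sentences.stream()
                .flatMap(s -> Arrays.stream(s.split(" ")))  // one sub-stream per sentence, flattened
                .distinct()                                 // uses equals() to drop duplicates
                .sorted()                                   // natural order
                .skip(1)                                    // drop the first element
                .limit(2)                                   // keep at most two elements
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // words: b a c a d -> distinct: b a c d -> sorted: a b c d -> skip 1: b c d -> limit 2: b c
        System.out.println(pickWords(Arrays.asList("b a", "c a d")));   // prints [b, c]
    }
}
```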



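Method | Parameters | Purpose
--- | --- | ---
allMatch | Predicate<? super T> predicate | Returns true if every element of the stream satisfies the predicate
anyMatch | Predicate<? super T> predicate | Returns true if at least one element of the stream satisfies the predicate
noneMatch | Predicate<? super T> predicate | Returns true if no element of the stream satisfies the predicate
count | (none) | Returns the number of elements in the stream as a long
findAny | (none) | Returns an Optional holding some element of the stream, comparable to a select that returns a single row
findFirst | (none) | Returns an Optional holding the first element of the stream; if the stream has no encounter order, any element may be returned
forEach | Consumer<? super T> action | Runs the Consumer on each element of the stream
forEachOrdered | Consumer<? super T> action | forEach does not guarantee order on a parallel stream; this method processes the elements in their original encounter order even in parallel
max | Comparator<? super T> comparator | Returns an Optional holding the maximum element according to the comparator
min | Comparator<? super T> comparator | Returns an Optional holding the minimum element according to the comparator
reduce | BinaryOperator<T> accumulator | The classic reduce: combines the elements one by one with a binary operator. The first element serves as the starting value, so the result is an Optional that is empty for an empty stream
reduce | T identity, BinaryOperator<T> accumulator | Same as above, but starts from the given identity value, so it returns a T instead of an Optional
reduce | U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner | The most general reduce. Each element T is folded into an accumulated value of type U by the accumulator. The two forms above require the element type and the result type to be the same; this form allows T and U to differ. The combiner merges partial results and must be compatible with the accumulator: combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
collect | Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner | A very powerful method. An ordinary reduce returns an immutable value, whereas collect accumulates into a mutable container. The Supplier provides the initial container R, and the accumulator folds each element T into it. The combiner has the same role and requirement as in reduce
collect | Collector<? super T, A, R> collector | Same as the collect above, except that a Collector bundles those arguments. T is the element type, A the intermediate accumulation type, and R the result type (for collect, usually a container)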
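The matching, finding, and min/max operations from the table can be tried out as in the sketch below. TerminalOpsDemo and its helper methods are names invented for this example.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class TerminalOpsDemo {
    static boolean allPositive(List<Integer> xs) {
        return xs.stream().allMatch(x -> x > 0);
    }

    // First element (in encounter order) greater than the threshold, if any.
    static Optional<Integer> firstAbove(List<Integer> xs, int threshold) {
        return xs.stream().filter(x -> x > threshold).findFirst();
    }

    static Optional<Integer> largest(List<Integer> xs) {
        return xs.stream().max(Comparator.naturalOrder());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5);
        System.out.println(allPositive(numbers));                         // prints true
        System.out.println(numbers.stream().anyMatch(x -> x > 4));        // prints true
        System.out.println(numbers.stream().noneMatch(x -> x > 5));       // prints true
        System.out.println(firstAbove(numbers, 3));                       // prints Optional[4]
        System.out.println(firstAbove(numbers, 9));                       // prints Optional.empty
        System.out.println(largest(numbers));                             // prints Optional[5]
        System.out.println(numbers.stream().count());                     // prints 5
    }
}
```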

reduce (sum of word lengths):

    Integer[] digits = {3, 1, 4};
    // Without an identity value the result is an Optional, because the stream may be empty.
    // A stream can be consumed only once, so each reduction below builds its own stream.
    Optional<Integer> sum = Stream.of(digits).reduce((x, y) -> x + y);
    System.out.println(sum); // prints Optional[8]
    // With an identity value
    Integer sum2 = Stream.of(digits).reduce(0, (x, y) -> x + y);
    System.out.println(sum2); // prints 8
    // The three-argument reduce
    String[] wordArr = {"a", "b", "c"};
    Stream<String> words = Stream.of(wordArr); // a stream of strings
    // Sum the lengths of all elements. 0 is the identity; the accumulator adds each word's length
    // to the running total; the combiner merges partial totals when the stream runs in parallel.
    int result = words.reduce(0, (length, t) -> t.length() + length, (length1, length2) -> length1 + length2);
    System.out.println("reduce" + result); // prints reduce3

Plain collect (word count):

    String[] wordArr = {"a", "b", "c", "a", "a", "b", "c", "d", "e"};
    Arrays.stream(wordArr)
            .collect(TreeMap<String, Integer>::new,
                    // accumulator: add 1 to the count stored for this word
                    (map, str) -> map.merge(str, 1, Integer::sum),
                    // combiner: add up the counts of two partial maps (putAll would overwrite them)
                    (map1, map2) -> map2.forEach((k, v) -> map1.merge(k, v, Integer::sum)))
            .entrySet()
            .forEach(System.out::println);




    // Assumes: import static java.util.stream.Collectors.*;
    String[] wordArr = {"a", "b", "c", "a", "a", "b", "c", "d", "e"};
    Arrays.stream(wordArr)
            .collect(groupingBy(Function.identity(), () -> new TreeMap<>(), counting())).entrySet()
            .forEach(System.out::println);


    /**
     * Returns a {@code Collector} implementing a cascaded "group by" operation
     * on input elements of type {@code T}, grouping elements according to a
     * classification function, and then performing a reduction operation on
     * the values associated with a given key using the specified downstream
     * {@code Collector}. The {@code Map} produced by the Collector is created
     * with the supplied factory function.
     *
     * <p>The classification function maps elements to some key type {@code K}.
     * The downstream collector operates on elements of type {@code T} and
     * produces a result of type {@code D}. The resulting collector produces a
     * {@code Map<K, D>}.
     *
     * <p>For example, to compute the set of last names of people in each city,
     * where the city names are sorted:
     * <pre>{@code
     *     Map<City, Set<String>> namesByCity
     *         = people.stream().collect(groupingBy(Person::getCity, TreeMap::new,
     *                                              mapping(Person::getLastName, toSet())));
     * }</pre>
     *
     * @implNote
     * The returned {@code Collector} is not concurrent. For parallel stream
     * pipelines, the {@code combiner} function operates by merging the keys
     * from one map into another, which can be an expensive operation. If
     * preservation of the order in which elements are presented to the downstream
     * collector is not required, using {@link #groupingByConcurrent(Function, Supplier, Collector)}
     * may offer better parallel performance.
     *
     * @param <T> the type of the input elements
     * @param <K> the type of the keys
     * @param <A> the intermediate accumulation type of the downstream collector
     * @param <D> the result type of the downstream reduction
     * @param <M> the type of the resulting {@code Map}
     * @param classifier a classifier function mapping input elements to keys
     * @param downstream a {@code Collector} implementing the downstream reduction
     * @param mapFactory a function which, when called, produces a new empty
     *                   {@code Map} of the desired type
     * @return a {@code Collector} implementing the cascaded group-by operation
     *
     * @see #groupingBy(Function, Collector)
     * @see #groupingBy(Function)
     * @see #groupingByConcurrent(Function, Supplier, Collector)
     */
    public static <T, K, D, A, M extends Map<K, D>>
    Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                  Supplier<M> mapFactory,
                                  Collector<? super T, A, D> downstream) {
        Supplier<A> downstreamSupplier = downstream.supplier();
        BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
            A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
            downstreamAccumulator.accept(container, t);
        };
        BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
        @SuppressWarnings("unchecked")
        Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;
        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
            return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
        }
        else {
            @SuppressWarnings("unchecked")
            Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
            Function<Map<K, A>, M> finisher = intermediate -> {
                intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                @SuppressWarnings("unchecked")
                M castResult = (M) intermediate;
                return castResult;
            };
            return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
        }
    }


By the way, the Collectors class also provides many static methods that make collect convenient to use, such as the shortcuts toSet/toMap/toList and groupingBy/counting/joining.


    // Accumulate names into a List
    List<String> list = people.stream().map(Person::getName).collect(Collectors.toList());
    // Accumulate names into a TreeSet
    Set<String> set = people.stream().map(Person::getName).collect(Collectors.toCollection(TreeSet::new));
    // Convert elements to strings and concatenate them, separated by commas
    String joined = things.stream()
                          .map(Object::toString)
                          .collect(Collectors.joining(", "));
    // Compute sum of salaries of employee
    int total = employees.stream()
                         .collect(Collectors.summingInt(Employee::getSalary));
    // Group employees by department
    Map<Department, List<Employee>> byDept
            = employees.stream()
                       .collect(Collectors.groupingBy(Employee::getDepartment));
    // Compute sum of salaries by department
    Map<Department, Integer> totalByDept
            = employees.stream()
                       .collect(Collectors.groupingBy(Employee::getDepartment,
                                                      Collectors.summingInt(Employee::getSalary)));
    // Partition students into passing and failing
    Map<Boolean, List<Student>> passingFailing =
            students.stream()
                    .collect(Collectors.partitioningBy(s -> s.getGrade() >= PASS_THRESHOLD));
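The snippets above rely on Person, Employee, and Student classes that are not shown. For a version that can be run as is, here is a sketch on plain strings; CollectorsDemo and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class CollectorsDemo {
    // Groups words by length; TreeMap keeps the keys sorted.
    static Map<Integer, List<String>> byLength(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(String::length, TreeMap::new, Collectors.toList()));
    }

    // Concatenates the words, separated by ", ".
    static String joined(List<String> words) {
        return words.stream().collect(Collectors.joining(", "));
    }

    // Counts the words shorter than two characters (key true) and all others (key false).
    static Map<Boolean, Long> countByShort(List<String> words) {
        return words.stream()
                .collect(Collectors.partitioningBy(w -> w.length() < 2, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "bb", "cc", "ddd");
        System.out.println(byLength(words));       // prints {1=[a], 2=[bb, cc], 3=[ddd]}
        System.out.println(joined(words));         // prints a, bb, cc, ddd
        System.out.println(countByShort(words));   // prints {false=3, true=1}
    }
}
```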




    String[] a = {"1.0", "2.0", "3.0", "4.0", "5.0"};
    Optional<Double> optional = Stream.of(a)
            .map((v) -> Double.valueOf(v))
            .filter((v) -> v > 2)
            .sorted((v1, v2) -> v2.compareTo(v1))
            .limit(2)
            .reduce((v1, v2) -> v1 + v2);
    System.out.println(optional); // prints Optional[9.0]

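1. Build a stream from the array a: {"1.0", "2.0", "3.0", "4.0", "5.0"}
2. Map the String values to Double: {1.0, 2.0, 3.0, 4.0, 5.0}
3. Keep only the elements greater than 2: {3.0, 4.0, 5.0}
4. Sort the elements in descending order: {5.0, 4.0, 3.0}
5. Limit the stream to its first two elements: {5.0, 4.0}
6. Reduce to a sum: 5.0 + 4.0 = 9.0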
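To check steps 1 to 6 by hand, the pipeline can be cut just before the reduce so the intermediate result is visible. This is a sketch using the same data; StepByStepDemo and its method names are made up for illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StepByStepDemo {
    static final String[] A = {"1.0", "2.0", "3.0", "4.0", "5.0"};

    // Steps 1 to 5: build, map, filter, sort descending, limit.
    static List<Double> topTwo() {
        return Stream.of(A)
                .map(Double::valueOf)
                .filter(v -> v > 2)
                .sorted(Comparator.reverseOrder())
                .limit(2)
                .collect(Collectors.toList());
    }

    // Step 6: reduce the two remaining elements to their sum.
    static Optional<Double> sumOfTopTwo() {
        return topTwo().stream().reduce(Double::sum);
    }

    public static void main(String[] args) {
        System.out.println(topTwo());        // prints [5.0, 4.0]
        System.out.println(sumOfTopTwo());   // prints Optional[9.0]
    }
}
```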



    // java.util.stream.ReferencePipeline
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }








    // java.util.stream.ReferencePipeline
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                             StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

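By the same analysis, filter also returns a StatelessOp whose Sink is again a ChainedReference. Its accept method tests the element against the predicate and, if the element passes, calls accept on the downstream sink. This adds one more stage to the pipeline topology.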
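The chaining idea behind map and filter can be imitated with plain java.util.function.Consumer objects. The sketch below is a simplified stand-in, not the JDK's Sink class: SinkChainSketch, mapSink, and filterSink are hypothetical names, and begin/end handling is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class SinkChainSketch {
    // Like map's ChainedReference: transform the element, then hand it to the downstream sink.
    static <T, R> Consumer<T> mapSink(Function<T, R> mapper, Consumer<R> downstream) {
        return t -> downstream.accept(mapper.apply(t));
    }

    // Like filter's ChainedReference: forward the element only if the predicate accepts it.
    static <T> Consumer<T> filterSink(Predicate<T> predicate, Consumer<T> downstream) {
        return t -> {
            if (predicate.test(t)) {
                downstream.accept(t);
            }
        };
    }

    // Builds the chain from the last stage back to the first, then pushes every element through it.
    static List<Double> run(List<String> input) {
        List<Double> out = new ArrayList<>();
        Function<String, Double> parse = Double::valueOf;
        Predicate<Double> greaterThanTwo = v -> v > 2;
        Consumer<Double> terminal = out::add;
        Consumer<Double> filterStage = filterSink(greaterThanTwo, terminal);
        Consumer<String> head = mapSink(parse, filterStage);
        input.forEach(head);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("1.0", "3.0", "5.0")));   // prints [3.0, 5.0]
    }
}
```

Each element travels through the whole chain before the next one is read, which is why stateless operations need no intermediate buffers.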




    // java.util.stream.SortedOps
    private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
        /**
         * Comparator used for sorting
         */
        private final boolean isNaturalSort;
        private final Comparator<? super T> comparator;
        ...
    }


    public Sink<T> opWrapSink(int flags, Sink<T> sink) {
        Objects.requireNonNull(sink);
        // If the input is already naturally sorted and this operation
        // also naturally sorted then this is a no-op
        if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
            return sink;
        else if (StreamOpFlag.SIZED.isKnown(flags))
            return new SizedRefSortingSink<>(sink, comparator);
        else
            return new RefSortingSink<>(sink, comparator);
    }

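In other words, if the input is already naturally sorted and this sort also uses natural order, the operation is a no-op and returns the original sink. Otherwise, a different Sink is returned depending on whether the size is known: the sized sorting Sink is backed by an array T[], while the unsized one is backed by an ArrayList<T>. Taking the unsized sorting Sink as an example, begin creates the list, accept adds each stream element to it, and end sorts the list and pushes the sorted elements downstream. The implementation makes this clear: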

    private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
        private ArrayList<T> list;

        RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
            super(sink, comparator);
        }

        @Override
        public void begin(long size) {
            if (size >= Nodes.MAX_ARRAY_SIZE)
                throw new IllegalArgumentException(Nodes.BAD_SIZE);
            list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
        }

        @Override
        public void end() {
            list.sort(comparator);
            downstream.begin(list.size());
            if (!cancellationWasRequested) {
                list.forEach(downstream::accept);
            }
            else {
                for (T t : list) {
                    if (downstream.cancellationRequested()) break;
                    downstream.accept(t);
                }
            }
            downstream.end();
            list = null;
        }

        @Override
        public void accept(T t) {
            list.add(t);
        }
    }
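Because the sorting sink buffers everything in accept and only emits in end, sorted acts as a barrier in the pipeline. This can be observed from the outside with peek, as in the following sketch (SortedBarrierDemo and trace are illustrative names, and the stream is sequential):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class SortedBarrierDemo {
    // Records the order in which elements pass the stages before and after sorted().
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Stream.of(3, 1, 2)
                .peek(x -> log.add("before " + x))   // runs as elements enter the sorting sink
                .sorted()
                .peek(x -> log.add("after " + x))    // runs only once the sort has finished
                .forEach(x -> { });
        return log;
    }

    public static void main(String[] args) {
        // prints [before 3, before 1, before 2, after 1, after 2, after 3]
        System.out.println(trace());
    }
}
```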





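    // Sink created in java.util.stream.SliceOps (shared by skip and limit):
    // n is the number of elements still to skip, m the number still allowed through
    public void accept(T t) {
        if (n == 0) {
            if (m > 0) {
                m--;
                downstream.accept(t);
            }
        }
        else {
            n--;
        }
    }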
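The two counters in this accept method can be reproduced in a few lines. The sketch below assumes n starts at the skip count and m at the limit; SliceSketch and slice are hypothetical names, and the loop stands in for the upstream stage calling accept.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SliceSketch {
    // Mimics the counter logic: first skip n elements, then let at most m elements through.
    static <T> List<T> slice(List<T> input, long skip, long limit) {
        List<T> out = new ArrayList<>();
        long n = skip;
        long m = limit;
        for (T t : input) {
            if (n == 0) {
                if (m > 0) {
                    m--;
                    out.add(t);      // corresponds to downstream.accept(t)
                }
            } else {
                n--;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(slice(data, 1, 2));                                               // prints [2, 3]
        System.out.println(data.stream().skip(1).limit(2).collect(Collectors.toList()));     // prints [2, 3]
    }
}
```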





    // java.util.stream.ReferencePipeline
    public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
        return evaluate(ReduceOps.makeRef(accumulator));
    }


    /**
     * Evaluate the pipeline with a terminal operation to produce a result.
     *
     * @param <R> the type of result
     * @param terminalOp the terminal operation to be applied to the pipeline.
     * @return the result
     */
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }


    // java.util.stream.ReduceOps.ReduceOp
    public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                       Spliterator<P_IN> spliterator) {
        return helper.wrapAndCopyInto(makeSink(), spliterator).get();
    }


    // java.util.stream.AbstractPipeline
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }


    // java.util.stream.AbstractPipeline: walks from the last stage back to the source, wrapping the sink at each stage
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }


    // java.util.stream.AbstractPipeline: pushes every source element into the wrapped sink chain
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }
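Taken together, wrapSink and copyInto amount to "wrap the terminal sink stage by stage, from back to front, then push the source elements in". The following deliberately untyped miniature shows only that control flow. The Stage interface, WrapSinkSketch, and its methods are assumptions made for this sketch; the real classes also call begin and end and support short-circuiting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class WrapSinkSketch {
    /** Hypothetical miniature of a pipeline stage: it wraps the sink of the stage after it. */
    interface Stage {
        Consumer<Object> wrap(Consumer<Object> downstream);
    }

    static List<Object> evaluate(List<Object> source, List<Stage> stages) {
        List<Object> result = new ArrayList<>();
        Consumer<Object> sink = result::add;               // the terminal sink
        for (int i = stages.size() - 1; i >= 0; i--) {     // last stage first, like wrapSink
            sink = stages.get(i).wrap(sink);
        }
        source.forEach(sink);                              // like forEachRemaining in copyInto
        return result;
    }

    static List<Object> demo() {
        Stage parse = down -> x -> down.accept(Double.valueOf((String) x));
        Stage keepBig = down -> x -> {
            if ((Double) x > 2) {
                down.accept(x);
            }
        };
        return evaluate(Arrays.asList("1.0", "3.0", "5.0"), Arrays.asList(parse, keepBig));
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [3.0, 5.0]
    }
}
```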






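This is the last article in a three-part series on Java 8, in which I have taken a brief look at how Stream is used and how it is implemented. With the series complete, I have now summarized what I consider the three highlights of Java 8. For Java developers, my advice is to use the Stream style widely in everyday code: it is a very effective way to handle traversal of common structures such as arrays, lists, and maps. Functional design also complements object-oriented thinking: while the OO model abstracts entities, the FP model abstracts operations and functions. For systems whose architecture and design revolve around operations, introducing method abstraction and functions helps the design a great deal.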