1. 程式人生 > >Java8的流Stream與收集器Collector詳解

Java8的流Stream與收集器Collector詳解

流Stream

概述

Stream可以說是java8的一大亮點。java8中的Stream對集合功能進行了增強。在日常開發中,我們免不了要經常對集合物件進行處理,而在java8以前,對於集合的處理完全是由我們自己來操作,所以程式碼看起來相對繁雜。而有了Stream以後,對於集合的處理得到了大大的簡化。Stream提供了對集合物件的各種非常便利的、高效的聚合操作。

集合和Stream,表面看起來很相似,卻有著不同的目標。集合關注的是它當中元素元素有效的管理和訪問。與集合不同,流不會對它當中的元素提供一種直接訪問的方式,它關注的是計算。Stream關注的是它的源source的各種聚合的計算操作。

這也是集合和流的本質區別。

流操作的執行原理

一般來說Stream可分為三個部分:源source、中間操作Intermediate和終止操作Terminal。

流的源可以是一個數組、一個集合、一個生成器方法,一個I/O通道等等。

一個流可以有零個和或者多箇中間操作,每一箇中間操作都會返回一個新的流,供下一個操作使用一個流只會有一個終止操作。中間操作都是惰性的,也就是說僅僅呼叫流的中間操作,其實並沒有真正開始流的源的遍歷。

一個流只能有一個終止操作,它必定是流的最後一個操作。只有呼叫了流的終止操作,流的源的元素才會真正的開始遍歷,並且會生成一個結果返回或者產生一個副作用(side-effect)。

另外,每一個流只能被使用一次(即呼叫中間操作或者終止操作)。如果檢測到流被重用,會丟擲IllegalStateException異常。所以才使用流的時候,建議採用鏈式的寫法,如下:

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
int sum = list.stream().map(item -> item * 2).reduce(0, Integer::sum)

從表面上來看,好像流在執行了多箇中間操作和一個終止操作之後,對於每一個操作,流中的元素都會遍歷執行,也就是有幾個操作,流中的元素就會進行幾次遍歷。這種觀點是大錯特錯的。

流的實際執行流程是這樣的,在遇到中間操作的時候,其實只是構建了一個Pipeline物件,而該物件是一個雙向連結串列的資料結構,只有在遇到終止操作的時候,那些中間操作和終止操作會被封裝成連結串列的資料結構連結起來,而流中每一個元素只會按照順序連結的去執行這些操作,也就是說,流中的元素最終只會在遇到終止操作後遍歷一次,而每個元素會將所有操作按順序執行一遍。

Stream的常用API

1. map/flatMap 對映

map操作是講流中的元素對映成另外一種元素,接受一個Function型別的引數,是一箇中間操作。例子:

// 轉換大寫
List<String> output = wordList.stream()
                            .map(String::toUpperCase)
                            .collect(Collectors.toList());
// 求平方
List<Integer> nums = Arrays.asList(1, 2, 3, 4);
List<Integer> squareNums = nums.stream()
                            .map(n -> n * n)
                            .collect(Collectors.toList());

flatMap是一種打平的對映。可用於一堆多的操作。也是一箇中間操作例子:

Stream<List<Integer>> inputStream = Stream.of(Arrays.asList(1),
                    Arrays.asList(2, 3), Arrays.asList(4, 5, 6));
Stream<Integer> outputStream = inputStream.FlatMap((childList) -> childList.stream());

看到最後返回的是Stream<Integer>型別,原來Stream<List<Integer>>中的集合元素被打平了。

2. filter 過濾

filter是對流中的元素進行過濾操作,會接受一個Predicate型別的引數,是一箇中間操作。只要是不滿足這個predicate的,也就是說predicate.test()返回false的元素會被過濾掉。例子:

// 找出偶數
Integer[] sixNums = {1, 2, 3, 4, 5, 6};
Integer[] evens = Stream.of(sixNums)
                    .filter(n -> n%2 == 0)
                    .toArray(Integer[]::new);

3. distinct 去重

distinct操作是對流中的元素進行去重,是一箇中間操作。

4. sorted 排序

sorted操作是對流中的元素按照進行排序,是一箇中間操作。不帶引數的是按照自然順序進行排序。帶引數的會傳一個Comparator型別的引數,作為比較規則。

5. limit

limit獲取流中前n個元素返回。是一箇中間操作。另外這個是一個短路操作(short-circuiting)。也就是說流中的元素遍歷到了第n個過後,後面的元素就不在進行遍歷了。可以看一個例子:

public void testLimitAndSkip() {
    List<Person> persons = new ArrayList();
    for (int i = 1; i <= 10000; i++) {
        Person person = new Person(i, "name" + i);
        persons.add(person);
    }
    List<String> personList2 = persons.stream()
        .map(Person::getName)
        .limit(5)
        .collect(Collectors.toList());
}

最後返回的結果為:

name1
name2
name3
name4
name5

6. skip

skip操作是跳過流中的前n個元素。是一箇中間操作。

7. forEach

forEach操作是流中的元素遍歷並且執行一個action。這是一個終止操作。

8. toArray

將流轉換為一個數組。是一個終止操作。

9. reduce

reduce匯聚操作,是一個終止操作。這個方法的主要作用是把 Stream 元素組合起來。它提供一個起始值(種子),然後依照運算規則(BinaryOperator),和前面 Stream 的第一個、第二個、第 n 個元素組合。從這個意義上說,字串拼接、數值的 sum、min、max、average 都是特殊的 reduce。例如 Stream 的 sum 就相當於:

Integer sum = integers.reduce(0, (a, b) -> a+b); 

或者

Integer sum = integers.reduce(0, Integer::sum);

10. min、max

求最大值最小值,是一個終止操作。

11. count

計算流中元素的個數。是一個終止操作。

12.匹配操作anyMatch、 allMatch、 noneMatch、 findFirst、 findAny

這些操作都是終止操作,且都是短路操作。
allMatch:Stream 中全部元素符合傳入的 predicate,返回 true,只要有一個不滿足就返回false;
anyMatch:Stream 中只要有一個元素符合傳入的 predicate,返回 true。只要有一個滿足就返回true;
noneMatch:Stream 中沒有一個元素符合傳入的 predicate,返回 true。只要有一個滿足就返回false;
findFirst:找到第一個元素。找到了就直接返回,不在遍歷後面元素。
findAny:找到任何一個元素就會返回。

13. collect 收集

對流中元素執行一個可變匯聚操作。是一個終止操作。比如:將流中的元素放入到一個List集合當中,將流中的元素進行分組、分割槽,求和等等操作。接受一個收集器Collector物件。Collector收集器下面會詳細介紹。下面舉幾個例子:

// 字串拼接
System.out.println(Stream.of("a", "b", "c", "d").collect(Collectors.joining()));
// 根據task的型別進行分組
private static Map<TaskType, List<Task>> groupTasksByType(List<Task> tasks) { 
    return tasks.stream().collect(groupingBy(Task::getType));
}

收集器Collector

概述

Collector是一介面。通過上面的介紹我們知道Stream的collect方法會接受一個Collector型別的引數,用來進行匯聚操作。那麼是怎樣實現匯聚操作的呢?Collector這個介面又有什麼用呢?下面講逐一介紹。

Collector介面

通過讀Jdk的文件可以知道,Collector介面是用來定義一個可變的匯聚操作:講輸入元素累加到一個可變結果容器,當所有的輸入元素都被處理過後,選擇性的將累加結果轉換為一個最終會的表示。匯聚操作可以被序列和並行的執行。

以下是Collector介面的定義:

public interface Collector<T, A, R> {
    /**
     * 用來建立並且返回一個可變結果容器
     */
    Supplier<A> supplier();

    /**
     * 將一個值疊進一個可變結果容器
     */
    BiConsumer<A, T> accumulator();

    /**
     * 接受兩個部分結果並將它們合併。可能是把一個引數疊進另一個引數並且返回另一個引數,
     * 也有可能返回一個新的結果容器,多執行緒處理時會用到
     */
    BinaryOperator<A> combiner();

    /**
     * 將中間型別執行最終的轉換,轉換成最終結果型別
     * 如果屬性 IDENTITY_TRANSFORM 被設定,該方法會假定中間結果型別可以強制轉成最終結果
     * 型別
     */
    Function<A, R> finisher();

    /**
     * 收集器的屬性集合
     */
    Set<Characteristics> characteristics();

    public static<T, R> Collector<T, R, R> of(Supplier<R> supplier,
                                              BiConsumer<R, T> accumulator,
                                              BinaryOperator<R> combiner,
                                              Characteristics... characteristics) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(accumulator);
        Objects.requireNonNull(combiner);
        Objects.requireNonNull(characteristics);
        Set<Characteristics> cs = (characteristics.length == 0)
                                  ? Collectors.CH_ID
                                  : Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH,
                                                                           characteristics));
        return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, cs);
    }

    public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier,
                                                 BiConsumer<A, T> accumulator,
                                                 BinaryOperator<A> combiner,
                                                 Function<A, R> finisher,
                                                 Characteristics... characteristics) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(accumulator);
        Objects.requireNonNull(combiner);
        Objects.requireNonNull(finisher);
        Objects.requireNonNull(characteristics);
        Set<Characteristics> cs = Collectors.CH_NOID;
        if (characteristics.length > 0) {
            cs = EnumSet.noneOf(Characteristics.class);
            Collections.addAll(cs, characteristics);
            cs = Collections.unmodifiableSet(cs);
        }
        return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, finisher, cs);
    }

    enum Characteristics {

        CONCURRENT,


        UNORDERED,


        IDENTITY_FINISH
    }

可以看到介面中定義了5個抽象方法,各個方法的作用都給出了註釋。其實Stream的collect操作就是呼叫這介面中定義的方法來實現匯聚操作的。不同的匯聚操作這些方法需要有不同的實現。

Collectors: Collector的工廠

Collectors類中只有一個私有的無參構造方法,而且裡面提供了大量的靜態方法,這些方法最終都是返回一個Collector收集器,因此可以認為Collectors這個類是Collector收集器的一個工廠類。Collectors裡面定義了一個靜態內部類CollectorImpl,該類是Collector收集器的一個實現:

static class CollectorImpl<T, A, R> implements Collector<T, A, R> {
        private final Supplier<A> supplier;
        private final BiConsumer<A, T> accumulator;
        private final BinaryOperator<A> combiner;
        private final Function<A, R> finisher;
        private final Set<Characteristics> characteristics;

        CollectorImpl(Supplier<A> supplier,
                      BiConsumer<A, T> accumulator,
                      BinaryOperator<A> combiner,
                      Function<A,R> finisher,
                      Set<Characteristics> characteristics) {
            this.supplier = supplier;
            this.accumulator = accumulator;
            this.combiner = combiner;
            this.finisher = finisher;
            this.characteristics = characteristics;
        }

        CollectorImpl(Supplier<A> supplier,
                      BiConsumer<A, T> accumulator,
                      BinaryOperator<A> combiner,
                      Set<Characteristics> characteristics) {
            this(supplier, accumulator, combiner, castingIdentity(), characteristics);
        }

        @Override
        public BiConsumer<A, T> accumulator() {
            return accumulator;
        }

        @Override
        public Supplier<A> supplier() {
            return supplier;
        }

        @Override
        public BinaryOperator<A> combiner() {
            return combiner;
        }

        @Override
        public Function<A, R> finisher() {
            return finisher;
        }

        @Override
        public Set<Characteristics> characteristics() {
            return characteristics;
        }
    }

可以看到,它針對Collector中5和方法的返回型別定義了五個對應型別的成員變數,而抽象方法的實現是直接返回這5個成員變數。而這五個物件是在構造CollectorImpl的時候傳進來的,這些都是函式式介面型別。看原始碼還可以看到Collectors中的靜態方法其實很多就是new一個CollectorImpl物件返回,而Collector中抽象方法的實現直接以lambda的形式直接通過CollectorImpl構造方法的引數傳過去。

這裡有必要對Collector

Supplier<A> supplier();
BiConsumer<A, T> accumulator();
BinaryOperator<A> combiner();
Function<A, R> finisher();

這個四個抽象方法結合文件註釋還是很好理解的。

下面以Collectors的toList方法來做一個講解,以下是toList方法的實現:

public static <T>
    Collector<T, ?, List<T>> toList() {
        return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                                   (left, right) -> { left.addAll(right); return left; },
                                   CH_ID);
    }

可以看到,
supplier方法的實現為:ArrayList::new,建立一個ArrayList物件並返回。
accumulator方法的實現為:List::add,將流中的元素新增進上面建立的ArrayList物件。
combiner方法的實現為:(left, right) -> { left.addAll(right); return left; },對於兩個中間結果容器ArrayList,將一個的所有元素新增進另外一個,並返回另外一個ArrayList。
上面的程式碼跟進去,發現finisher方法的實現只是將ArrayList用List型別返回。
而characteristics方法的實現就是返回靜態常量CH_ID,它是一個包含了IDENTITY_FINISH的集合,標示中間結果是可以直接向最終結果進行強制型別轉換的。

以上就是toList的匯聚操作。其他的匯聚操作也是類似的,大家可自行分析原始碼。

Collector的序列和並行實現

  • 序列實現:Collector的匯聚操作的序列實現(即單執行緒)將會使用supplier方法建立唯一的一個結果容器,並且每一個輸入元素會呼叫一次accumulaor方法。
  • 並行實現:Collector的匯聚操作的並行實現(即多執行緒)將會對輸入元素進行分割槽,並且對每一個分割槽會使用supplier方法建立一個結果容器,然後將各個分割槽的每一個元素都呼叫accumulator方法將分割槽的內容計算出一個子結果,最後通過combiner方法將這些子結果合併成一個最終結果。

    以上是jdk文件對於Collector序列和並行實現的一個介紹。

Collector的一些約束

JDK規定,為了確保序列與並行結果的等價性,Collector函式需要滿足兩個約束條件:identity(同一性)與associativity(結合性)。
- identity(同一性)約束:對於任何的部分累積結果,跟一個空的結果容器合併將會產生一個等價的結果。也就是說,對於一個任何一條分割槽線上呼叫accumulator和combiner方法產生的部分結果a,必須等價於combiner.apply(a, supplier.get());即:a == combiner.apply(a, supplier.get());
- associativity(結合性)約束:進行分割的計算和未分割的計算必須要產生一個等價的結果。也就是說,對於任何的輸入元素t1和t2,通過序列操作產生的最終結果r1和用過平行計算產生的最終結果r2,必須是等價的。以下是序列和平行計算的程式碼:

    A a1 = supplier.get();
    accumulator.accept(a1, t1);
    accumulator.accept(a1, t2);
    R r1 = finisher.apply(a1);  // result without splitting

    A a2 = supplier.get();
    accumulator.accept(a2, t1);
    A a3 = supplier.get();
    accumulator.accept(a3, t2);
    R r2 = finisher.apply(combiner.apply(a2, a3));  // result with splitting

JDK還規定,基於Collector實現匯聚操作的庫,例如Stream.collect(Collector),必須滿足以下約束:

  1. 第一個傳遞給accumulator方法的引數,傳遞給combiner方法的兩個引數以及傳遞給finisher方法的引數,必須是上一次呼叫supplier, accumulator, 或者 combiner方法所產生的結果。

  2. 匯聚操作的實現不應該對supplier, accumulator, 或者combiner產生的結果做任何處理,除了將他們作為引數再一次傳遞給accumulator, combiner, 或者finisher方法或者作為結果返回給匯聚操作的呼叫者。

  3. 如果一個結果被作為引數傳遞給了combiner或者finisher方法,但是方法的呼叫並沒有返回跟傳進來的結果相同的物件,那麼這個結果物件再也不會被使用了。傳進來的引數沒有被返回,說明生成了新的結果物件作為返回物件,所以原來傳進來的引數在執行邏輯產生了新的結果物件後就沒有用了,不會再被其他地方用到。

  4. 一旦一個結果被傳遞給了combiner或者finisher方法,那麼它不會再被傳遞給accumulator方法。也就是說方法執行順序是不可逆的。收集器方法的執行順序肯定是這樣的:首先用supplier方法產生一個結果容器,然後不斷利用accumulator將元素或者其轉換過後的結果往結果容器累加,再然後結果容器再被作為引數傳遞給combiner或者finisher方法。

  5. 對於非併發的收集器而言,supplier, accumulator, 或者combiner方法產生的結果必須是跟當前執行緒繫結的。這也使得並行收集無需實現額外的同步操作。匯聚的實現必須要對輸入的元素進行劃分處理,每一個分割槽的元素必須被隔離的處理,並且combining必須發生在accumulation完成之後。

  6. 對於併發的蒐集器而言,併發匯聚操作的實現是很自由的(甚至不是必須的)。併發的匯聚操作指的是 accumulator在多個執行緒上被併發呼叫時,使用的是同一個併發可修改的(執行緒安全的)結果容器,而不是在accumulation的時候結果被完全隔離。併發的匯聚操作只有在一個收集器包含Characteristics.UNORDERED屬性或者資料本身是無需的時候才能被使用。