1. 程式人生 > 其它 >不用Spring如何正確使用Druid連線池資料來源

不用Spring如何正確使用Druid連線池資料來源

什麼是Stream

Java8 API添加了一個新的抽象稱為流Stream,可以以一種宣告的方式處理資料,給我們操作集合(Collection)提供了極大的便利。
Stream將要處理的元素集合看作一種流,在流的過程中,藉助Stream API對流中的元素進行操作,比如:篩選、排序、聚合等。

stream可以由陣列或集合建立,對流的操作分為兩種

  • 中間操作,每次返回一個新的流,可以有多個,類似MapReduce中的Map
  • 終端操作,每個流只能進行一次終端操作,終端操作結束後流無法再次使用。終端操作會產生一個新的集合或值。類似MapReduce中的Reduce

Stream API可以極大提高Java程式設計師的生產力,讓程式設計師寫出高效率、乾淨、簡潔的程式碼。

Stream特性

  1. stream不儲存資料,而是按照特定的規則對資料進行計算,一般會輸出結果。

  2. stream不會改變資料來源,通常情況下會產生一個新的集合或一個值。

  3. stream具有延遲執行特性,只有呼叫終端操作時,中間操作才會執行。

Stream的建立

  1. 通過java.utiil.Collection.stream()方法用集合建立流
List<String> list = Arrays.asList("a", "b", "c");
//建立一個順序流
Stream<String> stream = list.stream();
//建立一個並行流
Stream<String> parallelStream = list.parallelStream();
  1. 使用java.util.Arrays.stream(T[] array)方法用陣列建立流
String[] arr = {"a", "b", "c"};
Stream stream = Arrays.stream(arr);
  1. 使用Stream的靜態方法: of()、iterate()、generate()
Stream<String> stream1 = Stream.of("a", "b", "c");

Stream<String> stream2 = Stream.iterate("a", (x) -> x + "a").limit(4);
//stream2.forEach(System.out::println);

Stream<Double> stream3 = Stream.generate(Math::random).limit(3);
//stream3.forEach(System.out::println);

順序流&並行流

Stream是順序流,由主執行緒按順序對流執行操作,而ParallelStream是並行流,內部以多執行緒並行執行的方式對流進行操作,但前提是流中的資料處理沒有順序要求。
如果流中的資料量足夠大,並行流可以加快處理速度。
除了直接建立並行流,還可以通過parallel()把順序流轉換成並行流:

Optional<Integer> findFirst = list.stream().parallel().filter(x->x>6).findFirst();

Stream的使用

核心類Optional

Optional類是一個可以為null的容器物件,如果值存在則isPresent()方法會返回true,呼叫get()方法會返回該物件。

遍歷/匹配(foreach/find/match)

List<Integer> list = Arrays.asList(7,6,9,3,8,5,2,1,4);

//遍歷輸出符合條件的元素
list.stream().filter(x -> x > 6).forEach(System.out::println);

//匹配第一個
Optional<Integer> findFirst = list.stream().filter(x -> x > 6).findFirst();
System.out.println("匹配第一個值:" + findFirst.get());

//匹配任意(適用於並行流)
Optional<Integer> findAny = list.parallelStream().filter(x -> x > 6).findAny();
System.out.println("匹配任意一個值:" + findAny.get());

//是否包含符合特定條件的元素
boolean anyMatch = list.stream().anyMatch(x -> x > 6);
System.out.println("是否存在大於6的值:" + anyMatch);


結果輸出:
7
9
8
匹配第一個值:7
匹配任意一個值:8
是否存在大於6的值:true

篩選(filter)

List<Integer> list = Arrays.asList(7,6,9,3,8,5,2,1,4);

//遍歷輸出符合條件的元素
list.stream().filter(x -> x > 6).forEach(System.out::println);

結果輸出:
7
9
8

聚合(max/min/count)

List<String> list = Arrays.asList("admin", "winter", "test", "yfdyf", "supermarket");

Optional<String> max = list.stream().max(Comparator.comparing(String::length));
System.out.println("最長的字串: " + max.get());

Optional<String> min = list.stream().min(Comparator.comparing(String::length));
System.out.println("最短的字串: " + min.get());

long count = list.stream().filter(x -> x.length() > 5).count();
System.out.println("字串長度大於5的個數: " + count);

結果輸出:
最長的字串: supermarket
最短的字串: test
字串長度大於5的個數: 2

對映(map/flatMap)

對映,可以將一個流的元素按照一定的對映規則對映到另一個流中。分為map和flatMap

  • map:接收一個函式作為引數,該函式會被應用到每個元素上,並將其對映成一個新的元素。

  • flatMap:接收一個函式作為引數,將流中的每個值都換成另一個流,然後把所有流連線成一個流。

List<String> list = Arrays.asList("admin", "winter", "test", "yfdyf", "supermarket");
List<String> strList = list.stream().map(String::toUpperCase).collect(Collectors.toList());
System.out.println("每個元素大寫: " + strList);

List<String> list2 = Arrays.asList("a-d-m-i-n", "w-i-n-t-e-r", "t-e-s-t");
List<String> strList2 = list2.stream().flatMap(s -> {
    //將每個元素轉換成一個stream
    String[] split = s.split("-");
    Stream<String> s2 = Arrays.stream(split);
    return s2;
}).collect(Collectors.toList());

System.out.println("處理前集合: " + list2);
System.out.println("處理後集合:" +strList2);

結果輸出:
每個元素大寫: [ADMIN, WINTER, TEST, YFDYF, SUPERMARKET]
處理前集合: [a-d-m-i-n, w-i-n-t-e-r, t-e-s-t]
處理後集合:[a, d, m, i, n, w, i, n, t, e, r, t, e, s, t]

歸約(reduce)

歸約,也稱縮減,是把一個流縮減成一個值,能實現對集合求和,求乘積和求最值操作。

List<Integer> list = Arrays.asList(1, 3, 2, 8, 11, 4);
//求和方式1
Optional<Integer> sum = list.stream().reduce((x, y) -> x + y);
//求和方式2
Optional<Integer> sum2 = list.stream().reduce(Integer::sum);
//求和方式3
Integer sum3 = list.stream().reduce(0, Integer::sum);
System.out.println("list求和: " + sum.get() + "," + sum2.get() + "," + sum3);

//求乘積
Optional<Integer> product = list.stream().reduce((x, y) -> x * y);
System.out.println("list求積: " + product.get());

//求最大值方式1
Optional<Integer> max = list.stream().reduce((x, y) -> x > y ? x : y);
//求最大值方式2
Integer max2 = list.stream().reduce(1, Integer::max);
System.out.println("list求最大值: " + max.get() + "," + max2);

結果輸出:
list求和: 29,29,29
list求積: 2112
list求最大值: 11,11

收集(collect)

//demo 員工類
@Data
public class Person{
    private String name; //姓名
    private int salary; //薪資
    private int age; //年齡
    private String sex; //性別
    private String area; //地區

    public Person(String name, int salary, int age, String sex, String area) {
        this.name = name;
        this.salary = salary;
        this.age = age;
        this.sex = sex;
        this.area = area;
    }
}

歸集(toList/toSet/toMap)

List<Person> personList = new ArrayList<>();

personList.add(new Person("Spring", 9999, 28, "male", "Chang Sha"));
personList.add(new Person("Summer", 7777, 25, "female", "Hang Zhou"));
personList.add(new Person("Autumn", 6666, 23, "female", "Shang Hai"));
personList.add(new Person("Winter", 8888, 26, "male", "Chang Sha"));

List<String> list = personList.stream().filter(p -> p.getAge() >  24).map(Person::getArea).collect(Collectors.toList());
System.out.println("age > 24 return area toList: " + list);

Set<String> set = personList.stream().filter(p -> p.getAge() >  24).map(Person::getArea).collect(Collectors.toSet());
System.out.println("age > 24 return area toSet: " + set);

Map<?, Person> map = personList.stream().filter(p -> p.getSalary() > 8000).collect(Collectors.toMap(Person::getName, p -> p));
System.out.println("salary >8000 return personMap: " + map);

結果輸出:
age > 24 return area toList: [Chang Sha, Hang Zhou, Chang Sha]
age > 24 return area toSet: [Chang Sha, Hang Zhou]
salary >8000 return personMap: {Winter=Person(name=Winter, salary=8888, age=26, sex=male, area=Chang Sha), Spring=Person(name=Spring, salary=9999, age=28, sex=male, area=Chang Sha)}

統計(count/averaging)

Collectors提供了一系列用於資料統計的靜態方法

  • 計數:count

  • 平均值:averagingInt、averagingLong、averagingDouble

  • 最值:maxBy、minBy

  • 求和:summingInt、summingLong、summingDouble

  • 統計以上所有:summarizingInt、summarizingLong、summarizingDouble

List<Person> personList = new ArrayList<>();

personList.add(new Person("Spring", 9999, 28, "male", "Chang Sha"));
personList.add(new Person("Summer", 7777, 25, "female", "Hang Zhou"));
personList.add(new Person("Autumn", 6666, 23, "female", "Shang Hai"));
personList.add(new Person("Winter", 8888, 26, "male", "Chang Sha"));

//求總數
Long count = personList.stream().collect(Collectors.counting());
//求平均工資
Double average = personList.stream().collect(Collectors.averagingDouble(Person::getSalary));
//求最高工資
Optional<Integer> max = personList.stream().map(Person::getSalary).collect(Collectors.maxBy(Integer::compare));
//求工資之和
Integer sum = personList.stream().collect(Collectors.summingInt(Person::getSalary));
// 一次性統計所有資訊
DoubleSummaryStatistics collect = personList.stream().collect(Collectors.summarizingDouble(Person::getSalary));

System.out.println("員工總數: " + count);
System.out.println("員工平均工資: " + average);
System.out.println("員工最高工資: " + max.get());
System.out.println("員工工資總和: " + sum);
System.out.println("員工工資所有統計: " + collect);

結果輸出:
員工總數: 4
員工平均工資: 8332.5
員工最高工資: 9999
員工工資總和: 33330
員工工資所有統計: DoubleSummaryStatistics{count=4, sum=33330.000000, min=6666.000000, average=8332.500000, max=9999.000000}

分組(partitioningBy/groupingBy)

List<Person> personList = new ArrayList<>();

personList.add(new Person("Spring", 9999, 28, "male", "Chang Sha"));
personList.add(new Person("Summer", 7777, 25, "female", "Hang Zhou"));
personList.add(new Person("Autumn", 6666, 23, "female", "Shang Hai"));
personList.add(new Person("Winter", 8888, 26, "male", "Chang Sha"));

//將員工按薪資是否高於8000分組
Map<Boolean, List<Person>> part = personList.stream().collect(Collectors.partitioningBy(x -> x.getSalary() > 8000));
System.out.println("員工按薪資是否大於8000 分組情況:" + part);

//將員工按性別分組
Map<String, List<Person>> group = personList.stream().collect(Collectors.groupingBy(Person::getArea));
System.out.println("員工按性別 分組情況:" + group);

// 將員工先按性別分組,再按地區分組
Map<String, Map<String, List<Person>>> group2 = personList.stream().collect(Collectors.groupingBy(Person::getSex, Collectors.groupingBy(Person::getArea)));
System.out.println("員工按性別、地區 分組情況: "+ group2);

結果輸出:
員工按薪資是否大於8000 分組情況:{false=[Person(name=Summer, salary=7777, age=25, sex=female, area=Hang Zhou), Person(name=Autumn, salary=6666, age=23, sex=female, area=Shang Hai)], true=[Person(name=Spring, salary=9999, age=28, sex=male, area=Chang Sha), Person(name=Winter, salary=8888, age=26, sex=male, area=Chang Sha)]}
員工按性別 分組情況:{Chang Sha=[Person(name=Spring, salary=9999, age=28, sex=male, area=Chang Sha), Person(name=Winter, salary=8888, age=26, sex=male, area=Chang Sha)], Shang Hai=[Person(name=Autumn, salary=6666, age=23, sex=female, area=Shang Hai)], Hang Zhou=[Person(name=Summer, salary=7777, age=25, sex=female, area=Hang Zhou)]}
員工按性別、地區 分組情況: {female={Shang Hai=[Person(name=Autumn, salary=6666, age=23, sex=female, area=Shang Hai)], Hang Zhou=[Person(name=Summer, salary=7777, age=25, sex=female, area=Hang Zhou)]}, male={Chang Sha=[Person(name=Spring, salary=9999, age=28, sex=male, area=Chang Sha), Person(name=Winter, salary=8888, age=26, sex=male, area=Chang Sha)]}}

連線(joining)

List<String> list = Arrays.asList("A", "B", "C");
String string = list.stream().collect(Collectors.joining("-"));
System.out.println("拼接後的字串:" + string);

結果輸出:
拼接後的字串:A-B-C

排序(sorted)

  • sorted():自然排序,流中的元素需實現Comparable介面

  • sorted(Comparator com):Comparator排序器自定義排序

List<Person> personList = new ArrayList<>();

personList.add(new Person("Spring", 9999, 28, "male", "Chang Sha"));
personList.add(new Person("Summer", 7777, 25, "female", "Hang Zhou"));
personList.add(new Person("Autumn", 6666, 23, "female", "Shang Hai"));
personList.add(new Person("Winter", 8888, 26, "male", "Chang Sha"));

//按工資升序(自然排序)
List<String> newList = personList.stream().sorted(Comparator.comparing(Person::getSalary)).map(Person::getName).collect(Collectors.toList());
System.out.println("按工資升序排序: " + newList);

//按工資降序
List<String> newList2 = personList.stream().sorted(Comparator.comparing(Person::getSalary).reversed()).map(Person::getName).collect(Collectors.toList());
System.out.println("按工資降序排序: " + newList2);

//先按工資再按年齡升序排序
List<String> newList3 = personList.stream().sorted(Comparator.comparing(Person::getSalary).thenComparing(Person::getAge)).map(Person::getName).collect(Collectors.toList());
System.out.println("先按工資再按年齡升序排序: " + newList3);

//先按工資再按年齡降序排序(自定義排序)
List<String> newList4 = personList.stream().sorted((p1, p2) -> {
    if(p1.getSalary() == p2.getSalary()) {
        return p2.getAge() - p1.getAge();
    } else {
        return p2.getSalary() - p1.getSalary();
    }
}).map(Person::getName).collect(Collectors.toList());
System.out.println("先按工資再按年齡降序排序: " + newList4);

結果輸出:
按工資升序排序: [Autumn, Summer, Winter, Spring]
按工資降序排序: [Spring, Winter, Summer, Autumn]
先按工資再按年齡升序排序: [Autumn, Summer, Winter, Spring]
先按工資再按年齡降序排序: [Spring, Winter, Summer, Autumn]

提取/組合(concat/distinct/limit/skip)

String[] arr1 = {"a", "b", "c", "d"};
String[] arr2 = {"d", "e", "f", "g"};

Stream<String> stream1 = Stream.of(arr1);
Stream<String> stream2 = Stream.of(arr2);

//合併兩個流並去重
List<String> newList = Stream.concat(stream1, stream2).distinct().collect(Collectors.toList());
System.out.println("流合併: " + newList);

//限制從流中獲取前5個數據
List<String> collect = newList.stream().limit(5).collect(Collectors.toList());
System.out.println("從合併的流中取出前5個數據: " + collect);

//跳過前5個數據
List<String> collect2 = newList.stream().skip(5).collect(Collectors.toList());
System.out.println("從合併的流中跳過前5個數據:" + collect2);

結果輸出:
流合併: [a, b, c, d, e, f, g]
從合併的流中取出前5個數據: [a, b, c, d, e]
從合併的流中跳過前5個數據:[f, g]

Stream原始碼解析

1. 基本介紹

Stream中的操作可以分為兩大類:中間操作(Intermediate operations)與結束操作(Terminal operations),中間操作只是對操作進行了記錄,只有結束操作才會觸發實際的計算(即惰性求值),這也是Stream在迭代大集合時高效的原因之一。中間操作又可以分為無狀態(Stateless)操作與有狀態(Stateful)操作,前者是指元素的處理不受之前元素的影響;後者是指該操作只有拿到所有元素之後才能繼續下去。結束操作又可以分為短路(short-circuiting)與非短路操作,前者是指遇到某些符合條件的元素就可以得到最終結果;而後者是指必須處理所有元素才能得到最終結果。

之所以要進行如此精細的劃分,是因為底層對每一種情況的處理方式不同。

BaseStream:定義了流的迭代、並行、序列等基本特性

Stream:定義了map、filter、flatmap等使用者關注的常用操作

PipelineHelper用於執行管道流中的操作以及捕獲輸出型別、並行度等資訊

Head、StatelessOp、StatefulOp為ReferencePipeline中的內部子類,用於描述流的操作階段

2. Stream()

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    //返回了一個由Head實現的Stream,三個引數分別代表流的資料來源、特性組合、是否並行
    return new ReferencePipeline.Head<>(spliterator,                     StreamOpFlag.fromCharacteristics(spliterator),parallel);
}

Head(Spliterator<?> source,
     int sourceFlags, boolean parallel) {
    super(source, sourceFlags, parallel);
}

ReferencePipeline(Spliterator<?> source,
                  int sourceFlags, boolean parallel) {
    super(source, sourceFlags, parallel);
}

ReferencePipeline.Head的構造方法為呼叫父類ReferencePipeline的構造方法,ReferencePipeline的構造方法又呼叫了父類AbstractPipeline的構造方法
AbstractPipeline

AbstractPipeline(Spliterator<?> source,
                 int sourceFlags, boolean parallel) {
    this.previousStage = null;//上一個stage指向null
    this.sourceSpliterator = source;
    this.sourceStage = this;//源頭stage指向自己
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}

此處建構函式,構造出了一個前一個節點為空,頭節點指向自己,後一個節點暫未指定的雙端連結串列。
即,stream函式返回了一個由類實現的管道流,且該管道流為一個雙端連結串列,的頭節點。

3. 無狀態的中間操作(filter、map、flatmap等)

以filter為例

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    //入參不能為空
    Objects.requireNonNull(predicate);
    //構建了一個StatelessOp物件,即無狀態的中間操作
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        @Override
        //覆寫了父類的opWrapSink方法
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

StatelessOp最終呼叫的構造方法和ReferencePipeline.Head呼叫的構造方法一致,都是呼叫的AbstractPipeline的構造方法,不過第一個引數傳入的是this,也就是將上一步建立的物件傳入,作為該構造物件的previousStage。

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    //previousStage的指標指向該建立物件
    previousStage.nextStage = this;
    //上一個stage指向上一步建立的物件
    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;//源頭stage與previousStage保持一致
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}

再來看看map操作

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

可以看到與filter方法一樣,都是建立了一個StagellessOp物件,重寫了opWrapSink方法
呼叫一系列中間操作後會形成如下所示的雙鏈表結構:

4. 終結操作(collect等)

以collect為例

public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    //並行模式
    if (isParallel()
            && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
            && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    //序列模式
    else {
        container = evaluate(ReduceOps.makeRef(collector));
    }
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
           ? (R) container
           : collector.finisher().apply(container);
}

以序列模式為例,往下走

public static <T, I> TerminalOp<T, I>
makeRef(Collector<? super T, I, ?> collector) {
    Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
    BiConsumer<I, ? super T> accumulator = collector.accumulator();
    BinaryOperator<I> combiner = collector.combiner();
    class ReducingSink extends Box<I>
            implements AccumulatingSink<T, I, ReducingSink> {
        @Override
        public void begin(long size) {
            state = supplier.get();
        }

        @Override
        public void accept(T t) {
            accumulator.accept(state, t);
        }

        @Override
        public void combine(ReducingSink other) {
            state = combiner.apply(state, other.state);
        }
    }
    return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }

        @Override
        public int getOpFlags() {
            return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                   ? StreamOpFlag.NOT_ORDERED
                   : 0;
        }
    };
}

ReduceOps.makeRef(collector)會構造一個TerminalOp物件,傳入evaluate方法。
以序列模式為例,evaluate方法會呼叫TerminalOp.evaluateSequential方法,再呼叫
PipelineHelper.wrapAndCopyInto方法,最終呼叫AbstarctPipeline中的copyInto方法,最終實現流水線的啟動。

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);
    //無短路操作
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());//通知開始遍歷
        spliterator.forEachRemaining(wrappedSink);//依次處理每個元素
        wrappedSink.end();//通知結束遍歷
    }
    //有短路操作
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

該方法從資料來源Spliterator中獲取元素,推入Sink進行處理,如果有短路操作,在每個元素處理後會通過Sink.cancellationRequested()判斷是否立即返回。

總結:
前面的中間操作只是做了一系列的準備工作,並沒有真正執行,真正的迭代是由結束操作來觸發的。

5. Sink

Stream中使用Stage的概念來描述一個完整的操作,將具有先後順序的各個Stage連到一起,就構成了整個流水線。

很多Stream操作會需要一個回撥函式(Lambda表示式),因此一個完整的操作是<資料來源,操作,回撥函式>構成的三元組。

stage只是解決了操作記錄的問題,要想讓流水線起到應有的作用我們需要一種將所有操作疊加到一起的方案。你可能會覺得這很簡單,只需要從流水線的head開始依次執行每一步的操作(包括回撥函式)就行了。這聽起來似乎是可行的,但是你忽略了前面的Stage並不知道後面Stage到底執行了哪種操作,以及回撥函式是哪種形式。換句話說,只有當前Stage本身才知道該如何執行自己包含的動作。這就需要有某種協議來協調相鄰Stage之間的呼叫關係。
而通過上文的collect原始碼,可以推測,Sink將在Stream中扮演該角色。

interface Sink<T> extends Consumer<T> {

//開始遍歷元素之前呼叫該方法,通知Sink做好準備,size代表要處理的元素總數,如果傳入-1代表總數未知或者無限
default void begin(long size) {}

//所有元素遍歷完成之後呼叫,通知Sink沒有更多的元素了。
default void end() {}

//如果返回true,代表這個Sink不再接收任何資料
default boolean cancellationRequested() {
return false;
}

//還有一個繼承自Consumer的方法,用於接收管道流中的資料
//void accept(T t);

...
}

注意上文collect原始碼中,collect操作在呼叫copyInto方法時,傳入了一個名為wrappedSink的引數,就是一個Sink物件,由AbstractPipeline.wrapSink方法構造而來。

@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);

for (@SuppressWarnings("rawtypes")
AbstractPipeline p = AbstractPipeline.this; p.depth > 0; p = p.previousStage) {
// 自本身stage開始,不斷呼叫前一個stage的opWrapSink,直到頭節點
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}

onWrapSink()方法的作用是將當前操作與下游Sink結合成新的Sink,只要從流水線的最後一個Stage開始,不斷呼叫上一個Stage的onWrapSink()方法直到頭節點,就可以得到一個代表了流水線上所有操作的Sink。
而onWrapSink()方法,正是在上文中間操作中,重寫的方法。

每個Stage都會將自己的操作封裝到一個Sink裡,前一個Stage只需呼叫後一個Stage的accept()方法即可,並不需要知道其內部是如何處理的。當然對於有狀態的操作,Sink的begin()和end()方法也是必須實現的。比如Stream.sorted()
是一個有狀態的中間操作,其對應的Sink.begin()方法可能會建立一個盛放結果的容器,而accept()方法負責將元素新增到該容器,最後end()負責對容器進行排序。對於短路操作,Sink.cancellationRequested()也是必須實現的,比如Stream.findFirst()是短路操作,只要找到一個元素,cancellationRequested()就應該返回true,以便呼叫者儘快結束查詢。Sink的四個介面方法常常相互協作,共同完成計算任務。實際上Stream API內部實現的的本質,就是如何過載Sink的這四個介面方法。

有了Sink對操作的包裝,Stage之間的呼叫問題就解決了,執行時只需要從流水線的head開始對資料來源依次呼叫每個Stage對應的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。

以sorted方法為例,sorted一種可能封裝的Sink程式碼如下:

// Stream.sort()方法用到的Sink實現
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
private ArrayList<T> list;// 存放用於排序的元素

RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
super(downstream, comparator);
}

@Override
public void begin(long size) {
...
// 建立一個存放排序元素的列表
list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
}

@Override
public void end() {
list.sort(comparator);// 只有元素全部接收之後才能開始排序
downstream.begin(list.size());
if (!cancellationWasRequested) {// 下游Sink不包含短路操作
list.forEach(downstream::accept);// 2. 將處理結果傳遞給流水線下游的Sink
} else {// 下游Sink包含短路操作
for (T t : list) {// 每次都呼叫cancellationRequested()詢問是否可以結束處理。
if (downstream.cancellationRequested())
break;
downstream.accept(t);// 2. 將處理結果傳遞給流水線下游的Sink
}
}
downstream.end();
list = null;
}

@Override
public void accept(T t) {
list.add(t);// 1. 使用當前Sink包裝動作處理t,只是簡單的將元素新增到中間列表當中
}
}

上述程式碼完美的展現了Sink的四個介面方法是如何協同工作的:
begin():告訴Sink參與排序的元素個數,方便確定中間結果容器的大小

accept():將元素新增到中間結果當中,最終執行時呼叫者會不斷呼叫該方法,直到遍歷所有元素。

end():告訴Sink所有元素遍歷完畢,啟動排序步驟,排序完成後將結果傳遞給下游的Sink

如果下游Sink是短路操作,將結果傳遞給下游時不斷詢問下游cancellationRequested()是否可以結束處理。

6. 結果收集

流水線上所有操作都執行後,使用者所需要的結果(如果有)在哪裡?
首先要說明的是不是所有的Stream結束操作都需要返回結果,有些操作只是為了使用其副作用(Side-effects),比如使用Stream.forEach()方法將結果打印出來就是常見的使用副作用的場景(事實上,除了列印之外其他場景都應避免使用副作用),對於真正需要返回結果的結束操作結果存在哪裡呢?這種需要分情況討論:
對於返回boolean或者Optional的操作的操作,由於值返回一個值,只需要在對應的Sink中記錄這個值,等到執行結束時返回就可以了。
對於歸約操作,最終結果放在使用者呼叫時指定的容器中(容器型別通過收集器指定)。collect(),reduce(),max(),min()都是歸約操作,雖然max()和min()也是返回一個Optional,但事實上底層是通過呼叫reduce()方法實現的。
對於返回是陣列的情況,在最終返回陣列之前,結果其實是儲存在一種叫做Node的資料結構中的。Node是一種多叉樹結構,元素儲存在樹的葉子當中,並且一個葉子節點可以存放多個元素。這樣做是為了並行執行方便。

7. 並行流

由上文可知,可通過parallel()方法,將順序流轉換成並行流。parallel()方法的實現很簡單,只是將源stage的並行標記值設為true。在結束操作通過evaluate方法啟動管道流時,會根據並行標記來判斷。如果並行標記為true則會通過ReduceTask來執行併發任務。

public <P_IN> R evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}

ReduceTask是ForkJoinTask的子類,其實Stream的並行處理都是基於Fork/Join框架的,相關類與介面的結構如下圖所示:

fork/join框架是jdk1.7引入的,可以以遞迴方式將並行的任務拆分成更小的任務,然後將每個子任務的結果合併起來生成整體結果。它是ExecutorService介面的一個實現,它把子任務分配執行緒池(ForkJoinPool)中的工作執行緒。要把任務提交到這個執行緒池,必須建立RecursiveTask的一個子類,如果任務不返回結果則是RecursiveAction的子類。(本文不過多贅述fork/join框架)
對於ReduceTask來說,任務分解的實現定義在其父類AbstractTask的compute()方法當中:

public void compute() {
    Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
    long sizeEstimate = rs.estimateSize();
    long sizeThreshold = getTargetSize(sizeEstimate);
    boolean forkRight = false;
    @SuppressWarnings("unchecked") K task = (K) this;
    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
        K leftChild, rightChild, taskToFork;
        task.leftChild  = leftChild = task.makeChild(ls);
        task.rightChild = rightChild = task.makeChild(rs);
        task.setPendingCount(1);
        if (forkRight) {
            forkRight = false;
            rs = ls;
            task = leftChild;
            taskToFork = rightChild;
        }
        else {
            forkRight = true;
            task = rightChild;
            taskToFork = leftChild;
        }
        taskToFork.fork();
        sizeEstimate = rs.estimateSize();
    }
    task.setLocalResult(task.doLeaf());
    task.tryComplete();
}

該方法先呼叫當前splititerator 方法的estimateSize 方法,預估這個分片中的資料量,根據預估的資料量獲取最小處理單元的閾值,即當資料量已經小於這個閾值的時候進行計算,否則進行fork 將任務劃分成更小的資料塊,進行求解。

這裡面有個很重要的引數LEAF_TARGET,用來判斷是否需要繼續分割成更小的子任務,預設為parallelism*4(ForkJoinPool.getCommonPoolParallelism() << 2),parallelism是併發度的意思,預設值為cpu 數 – 1,可以通過java.util.concurrent.ForkJoinPool.common.parallelism設定, 如果當前分片大小仍然大於處理資料單元的閾值,且分片繼續嘗試切分成功,那麼就繼續切分,分別將左右分片的任務建立為新的Task,並且將當前的任務關聯為兩個新任務的父級任務(邏輯在makeChild 裡面)。

先後對左右子節點的任務進行fork,對另外的分割槽進行分解。同時設定pending 為1,這代表一個task 實際上只會有一個等待的子節點(被fork)。當任務已經分解到足夠小的時候退出迴圈,嘗試進行結束。呼叫子類實現的doLeaf方法,完成最小計算單元的計算任務,並設定到當前任務的localResult中。

然後呼叫tryComplete方法進行最終任務的掃尾工作,如果該任務pending值不等於0,則原子的減1,如果已經等於0,說明任務都已經完成,則呼叫onCompletion回撥,如果該任務是葉子任務,則直接銷燬中間資料結束;如果是中間節點會將左右子節點的結果進行合併。

最後檢查這個任務是否還有父級任務了,如果沒有則將該任務置為正常結束,如果還有則嘗試遞迴的去呼叫父級節點的onCompletion回撥,逐級進行任務的合併。

並行流的實現本質上就是在ForkJoin上進行了一層封裝,將Stream 不斷嘗試分解成更小的split,然後使用fork/join 框架分而治之。

參考資料

Java8 Stream:2萬字20個例項,玩轉集合的篩選、歸約、分組、聚合
好文推薦:JAVA進階之Stream實現原理