Learning Java 8: Stream Internals, Part 3 (Study Notes)

Streams in Depth (Part 3)

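Reflection: in the earlier parts I studied streams and dug into their internals. Almost none of that is needed in day-to-day development. The point is to understand the machinery underneath streams, so that I can use them with more confidence and know that what I write is correct.

Focus on technology: study in a purely technical setting like this one,

rather than ending up saying: I focused on business development for five years and my technical skills never grew.

The way the instructor, Zhang Long, teaches this course is itself the process of learning a new technology. If this approach works well for you, you can use it to learn other new technologies too.

Lambda expressions and anonymous inner classes are completely different

Although we have already studied streams, something is still missing: we have not yet traced the execution of a single stream from start to finish.

That is the goal of what follows.

Let's start with a program.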

public class LambdaTest {
    // How are inner classes and lambda expressions actually related?
    Runnable r1 = () -> System.out.println(this);

    // Anonymous inner class: this creates an instance of Runnable, and it is a class of its own
    Runnable r2 = new Runnable() {
        @Override
        public void run() {
            System.out.println(this);
        }
    };

    public static void main(String[] args) {
        LambdaTest lambdaTest = new LambdaTest();
        Thread t1 = new Thread(lambdaTest.r1);
        t1.start();

        System.out.println("-------------");
        Thread t2 = new Thread(lambdaTest.r2);
        t2.start();
        // Question: are the two outputs the same?
    }
}

Output:

-------------
com.dawa.jdk8.LambdaTest@59a30351   (printed by the lambda expression)
com.dawa.jdk8.LambdaTest$1@2831008d  (printed by the anonymous inner class)

Process finished with exit code 0

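In LambdaTest$1, the 1 is the name the compiler gave to the anonymous inner class: the enclosing class name, a $, and a number.

The comparison shows that although a lambda can look like just another way of writing an anonymous inner class, the two are completely different. They work on different principles.

Conclusions:

  1. An anonymous inner class opens a new scope, so this inside it refers to the anonymous instance.
  2. A lambda does not open a new scope, so this inside it refers to the enclosing LambdaTest instance.

This point is covered here because later, when debugging, you will notice that anonymous inner classes and lambda expressions show up under different class names.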
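The scope difference can be checked directly rather than read off the printed class names. The following is a minimal sketch; the class name ScopeDemo and its two fields are made up for illustration and do not come from the course material.

```java
import java.util.function.Supplier;

public class ScopeDemo {
    // In a lambda, `this` is the enclosing ScopeDemo instance (no new scope).
    final Supplier<Object> fromLambda = () -> this;

    // In an anonymous inner class, `this` is the anonymous instance itself.
    final Supplier<Object> fromAnonymous = new Supplier<Object>() {
        @Override
        public Object get() {
            return this;
        }
    };

    public static void main(String[] args) {
        ScopeDemo demo = new ScopeDemo();
        System.out.println(demo.fromLambda.get() == demo);                  // true
        System.out.println(demo.fromAnonymous.get() == demo);               // false
        System.out.println(demo.fromAnonymous.get() == demo.fromAnonymous); // true
    }
}
```

Comparing references avoids depending on the generated lambda class name, which is an implementation detail of the JVM.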


Walking through the stream execution flow systematically

import java.util.Arrays;
import java.util.List;

public class StreamTest3 {
    public static void main(String[] args) {
        List<String> list = Arrays.asList("hello", "world", "welcome");
        list.stream().map(item->item+"_abc").forEach(System.out::println);
    }
}

The map() implementation

It returns a StatelessOp

@Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }
// Definition and constructor of the StatelessOp class
    /**
     * Base class for a stateless intermediate stage of a Stream.
     *
     * @param <E_IN> type of elements in the upstream source
     * @param <E_OUT> type of elements in produced by this stage
     * @since 1.8
     */
abstract static class StatelessOp<E_IN, E_OUT>
            extends ReferencePipeline<E_IN, E_OUT> {
        /**
         * Construct a new Stream by appending a stateless intermediate
         * operation to an existing stream.
         *
         * @param upstream The upstream pipeline stage
         * @param inputShape The stream shape for the upstream pipeline stage
         * @param opFlags Operation flags for the new stage
         */
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

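StatelessOp extends ReferencePipeline, and ReferencePipeline implements Stream.

So when map() returns new StatelessOp<P_OUT, R>, it is returning a Stream.

What is returned is an instance of an anonymous subclass of StatelessOp. It links the upstream stage to the downstream stage.

The chain of ReferencePipeline stages is essentially a doubly linked list.

Operation wrapping: the ChainedReference created by opWrapSink() inside map() is what wraps the operations. It wraps the remaining downstream operations together with this one.

A single element then passes through all of the remaining operations in one go.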

@Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
 The Sink class
 * <p>A sink may be in one of two states: an initial state and an active state.
 * It starts out in the initial state; the {@code begin()} method transitions
 * it to the active state, and the {@code end()} method transitions it back into
 * the initial state, where it can be re-used.  Data-accepting methods (such as
 * {@code accept()} are only valid in the active state.
 *

ChainedReference: the chained reference sink

    /**
     * Abstract {@code Sink} implementation for creating chains of
     * sinks.  The {@code begin}, {@code end}, and
     * {@code cancellationRequested} methods are wired to chain to the
     * downstream {@code Sink}.  This implementation takes a downstream
     * {@code Sink} of unknown input shape and produces a {@code Sink<T>}.  The
     * implementation of the {@code accept()} method must call the correct
     * {@code accept()} method on the downstream {@code Sink}.
     */
    static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
        protected final Sink<? super E_OUT> downstream;

        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }

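The begin() and end() methods of Sink switch between two states: 1. the initial state 2. the active state

Before any accept() call, Sink's begin() must be called to enter the active state. After all elements have been pushed, end() is called to return to the initial state.

Design pattern involved: the template method pattern.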
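The begin/accept/end lifecycle and the way a chained sink forwards calls downstream can be sketched with a much smaller model. MiniSink, Chained and mapping below are hypothetical stand-ins written for this note. They are not the JDK's Sink API, which is package-private and has more methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class MiniSinkDemo {
    /** Hypothetical, simplified stand-in for java.util.stream.Sink. */
    interface MiniSink<T> {
        void begin(long size);
        void accept(T t);
        void end();
    }

    /** Hypothetical counterpart of Sink.ChainedReference: begin/end are forwarded downstream. */
    abstract static class Chained<T, R> implements MiniSink<T> {
        protected final MiniSink<R> downstream;

        Chained(MiniSink<R> downstream) {
            this.downstream = downstream;
        }

        @Override public void begin(long size) { downstream.begin(size); }
        @Override public void end() { downstream.end(); }
    }

    /** Wraps the next sink with a mapping step, in the spirit of map()'s opWrapSink(). */
    static <T, R> MiniSink<T> mapping(Function<T, R> mapper, MiniSink<R> next) {
        return new Chained<T, R>(next) {
            @Override
            public void accept(T t) {
                downstream.accept(mapper.apply(t));
            }
        };
    }

    /** Drives one full lifecycle and records every call the terminal sink receives. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSink<String> terminal = new MiniSink<String>() {
            @Override public void begin(long size) { log.add("begin"); }
            @Override public void accept(String s) { log.add(s); }
            @Override public void end() { log.add("end"); }
        };
        MiniSink<String> wrapped = mapping((String s) -> s + "_abc", terminal);
        wrapped.begin(2);        // initial state -> active state
        wrapped.accept("hello");
        wrapped.accept("world");
        wrapped.end();           // active state -> initial state
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [begin, hello_abc, world_abc, end]
    }
}
```

The abstract base class fixes the begin/end forwarding and leaves only accept() to the subclass, which is the template-method flavour mentioned above.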

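The parent declaration of opWrapSink():

It accepts a Sink that will receive the results of this operation, and returns a Sink that accepts elements of this operation's input type, performs the operation, and passes the results to the provided Sink. (The Sink passed in as the parameter is the one that receives the results.)
It is exactly this design that lets sinks be wrapped around one another.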

    /**
     * Accepts a {@code Sink} which will receive the results of this operation,
     * and return a {@code Sink} which accepts elements of the input type of
     * this operation and which performs the operation, passing the results to
     * the provided {@code Sink}.
     * (Note: the Sink passed in is the one that receives the results, and the
     * returned Sink performs the operation. This is what makes wrapping possible.)
     *
     * @apiNote
     * The implementation may use the {@code flags} parameter to optimize the
     * sink wrapping.  For example, if the input is already {@code DISTINCT},
     * the implementation for the {@code Stream#distinct()} method could just
     * return the sink it was passed.
     *
     * @param flags The combined stream and operation flags up to, but not
     *        including, this operation
     
     * @param sink sink to which elements should be sent after processing
     * @return a sink which accepts elements, perform the operation upon
     *         each element, and passes the results (if any) to the provided
     *         {@code Sink}.
     * (Note: the parameter itself receives the results; they are not handed back
     * through the return value.)
     */
    abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);

    // Override in ReferencePipeline.Head (the source stage), which has no operation to wrap:
        @Override
        final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
            throw new UnsupportedOperationException();
        }

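A key property of streams: lazy (deferred) evaluation.

map(), like peek(), filter() and the other stateless intermediate operations, does nothing more than return a StatelessOp object. (Stateful operations such as sorted() and distinct() return a StatefulOp instead.)

So an intermediate operation only returns a stage that a terminal operation may later execute. Without a terminal operation, the stream is never processed.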
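Laziness is easy to observe from the outside. The sketch below reuses the list from the example above and counts how often the mapper runs before and after a terminal operation. The class name LazyDemo and the helper method are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class LazyDemo {
    /** Returns {mapper calls before the terminal op, mapper calls after it}. */
    static int[] mapperCallsBeforeAndAfter() {
        List<String> seen = new ArrayList<>();
        List<String> list = Arrays.asList("hello", "world", "welcome");

        // Only builds a new pipeline stage; the lambda is not invoked yet.
        Stream<String> pending = list.stream().map(item -> {
            seen.add(item);
            return item + "_abc";
        });
        int before = seen.size();

        // The terminal operation triggers the traversal.
        pending.forEach(item -> { });
        int after = seen.size();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = mapperCallsBeforeAndAfter();
        System.out.println(counts[0]); // 0
        System.out.println(counts[1]); // 3
    }
}
```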

So the terminal operation is what we need to trace next.


Start tracing from the forEach() call in the example code

    // Terminal operations from Stream

    @Override
    public void forEach(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }

This calls makeRef(), which is defined in the ForEachOps class.

First, the Javadoc of the ForEachOps class:

/**
 * Factory for creating instances of {@code TerminalOp} that perform an
 * action for every element of a stream.  Supported variants include unordered
 * traversal (elements are provided to the {@code Consumer} as soon as they are
 * available), and ordered traversal (elements are provided to the
 * {@code Consumer} in encounter order.)
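 * (Note: this is a factory for TerminalOp objects, i.e. terminal operations,
 * that run an action on every element. Supported variants: unordered traversal
 * and ordered traversal, which follows the encounter order.)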
 *
 * <p>Elements are provided to the {@code Consumer} on whatever thread and
 * whatever order they become available.  For ordered traversals, it is
 * guaranteed that processing an element <em>happens-before</em> processing
 * subsequent elements in the encounter order.
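 * (Note: elements are handed to the Consumer on whatever thread and in whatever
 * order they become available. For ordered traversal, happens-before means that
 * an element encountered earlier is processed before one encountered later.)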
 
 *
 * <p>Exceptions occurring as a result of sending an element to the
 * {@code Consumer} will be relayed to the caller and traversal will be
 * prematurely terminated.
 * (Note: the class provides a number of static factory methods.)
 *
 * @since 1.8
 */
final class ForEachOps {
    
}

For example, makeRef():

   /**
     * Constructs a {@code TerminalOp} that perform an action for every element
     * of a stream.
     *
     * @param action the {@code Consumer} that receives all elements of a
     *        stream
     * @param ordered whether an ordered traversal is requested
     * @param <T> the type of the stream elements
     * @return the {@code TerminalOp} instance
     */
    public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                                  boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
    }
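The ordered parameter is visible through the public API: forEach() passes false, as shown earlier, and, as far as I know, forEachOrdered() passes true in the JDK 8 source. A minimal sketch of the observable difference on a parallel stream follows. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class OrderedDemo {
    /** Collects elements of a parallel stream in encounter order via forEachOrdered. */
    static List<String> collectOrdered(List<String> source) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        source.parallelStream().forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("hello", "world", "welcome");

        // Encounter order is guaranteed even though the stream is parallel.
        System.out.println(collectOrdered(list)); // [hello, world, welcome]

        // With plain forEach on a parallel stream, the order is not guaranteed
        // and may differ from run to run.
        list.parallelStream().forEach(System.out::println);
    }
}
```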

About TerminalOp

By default it runs sequentially: the default evaluateParallel() simply delegates to evaluateSequential().

/**
 * An operation in a stream pipeline that takes a stream as input and produces
 * a result or side-effect.  A {@code TerminalOp} has an input type and stream
 * shape, and a result type.  A {@code TerminalOp} also has a set of
 * <em>operation flags</em> that describes how the operation processes elements
 * of the stream (such as short-circuiting or respecting encounter order; see
 * {@link StreamOpFlag}).
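 * (Note: an operation in a stream pipeline. It takes a stream as input and
 * produces a result or a side effect; a side effect means, for example,
 * modifying an object through a reference that was passed in.
 * A TerminalOp has an input type, a stream shape, and a result type.
 * It also has flags describing how it processes the elements of the stream.
 * It must provide both a sequential and a parallel implementation.)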
 
 *
 * <p>A {@code TerminalOp} must provide a sequential and parallel implementation
 * of the operation relative to a given stream source and set of intermediate
 * operations.
 *
 * @param <E_IN> the type of input elements
 * @param <R>    the type of the result
 * @since 1.8
 */
interface TerminalOp<E_IN, R> {
        /**
     * Gets the shape of the input type of this operation.
     *
     * @implSpec The default returns {@code StreamShape.REFERENCE}.
     *
     * @return StreamShape of the input type of this operation
     */
    default StreamShape inputShape() { return StreamShape.REFERENCE; }

    /**
     * Gets the stream flags of the operation.  Terminal operations may set a
     * limited subset of the stream flags defined in {@link StreamOpFlag}, and
     * these flags are combined with the previously combined stream and
     * intermediate operation flags for the pipeline.
     *
     * @implSpec The default implementation returns zero.
     *
     * @return the stream flags for this operation
     * @see StreamOpFlag
     */
    default int getOpFlags() { return 0; }

    /**
     * Performs a parallel evaluation of the operation using the specified
     * {@code PipelineHelper}, which describes the upstream intermediate
     * operations.
     *
     * @implSpec The default performs a sequential evaluation of the operation
     * using the specified {@code PipelineHelper}.
     *
     * @param helper the pipeline helper
     * @param spliterator the source spliterator
     * @return the result of the evaluation
     */
    default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
                                      Spliterator<P_IN> spliterator) {
        if (Tripwire.ENABLED)
            Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
        return evaluateSequential(helper, spliterator);
    }

    /**
     * Performs a sequential evaluation of the operation using the specified
     * {@code PipelineHelper}, which describes the upstream intermediate
     * operations.
     *
     * @param helper the pipeline helper
     * @param spliterator the source spliterator
     * @return the result of the evaluation
     */
    <P_IN> R evaluateSequential(PipelineHelper<E_IN> helper,
                                Spliterator<P_IN> spliterator);
}

Terminal operations are implemented in just four families:

1. find

2. match

3. forEach (traversal)

4. reduce
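For orientation, here is one public Stream method from each of the four families. The mapping to the internal factory classes (FindOps, MatchOps, ForEachOps, ReduceOps) is my reading of the JDK 8 source and should be treated as an assumption. The class and helper names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class TerminalFamiliesDemo {
    /** reduce family: sums the lengths of all strings. */
    static int totalLength(List<String> list) {
        return list.stream().map(String::length).reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("hello", "world", "welcome");

        Optional<String> first = list.stream().findFirst();             // find
        boolean anyW = list.stream().anyMatch(s -> s.startsWith("w"));  // match
        list.stream().forEach(System.out::println);                     // forEach
        int total = totalLength(list);                                  // reduce

        System.out.println(first.get()); // hello
        System.out.println(anyW);        // true
        System.out.println(total);       // 17
    }
}
```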

Back to forEach(): the makeRef() call simply creates a terminal operation object.

Then the evaluate() method executes that terminal operation object.


    // Terminal evaluation methods

    /**
     * Evaluate the pipeline with a terminal operation to produce a result.
     *
     * @param <R> the type of result
     * @param terminalOp the terminal operation to be applied to the pipeline.
     * @return the result
     */
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
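The linkedOrConsumed check in evaluate() is why a stream can be consumed only once. A minimal sketch of that behaviour through the public API follows. The class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.stream.Stream;

public class ReuseDemo {
    /** Returns true if running a second terminal operation throws IllegalStateException. */
    static boolean secondUseFails() {
        Stream<String> stream = Arrays.asList("hello", "world").stream();
        stream.forEach(s -> { });       // first terminal operation marks the stream as consumed
        try {
            stream.forEach(s -> { });   // second terminal operation on the same stream
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails()); // true
    }
}
```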

The PipelineHelper class

It describes the various pieces of information about a stream pipeline.

/**
 * Helper class for executing <a href="package-summary.html#StreamOps">
 * stream pipelines</a>, capturing all of the information about a stream
 * pipeline (output shape, intermediate operations, stream flags, parallelism,
 * etc) in one place.
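 * (Note: a helper class for executing stream pipelines. It holds all the
 * information about the pipeline: source, output shape, operations, stream
 * flags, parallelism, and so on.)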
 
 *
 * <p>
 * A {@code PipelineHelper} describes the initial segment of a stream pipeline,
 * including its source, intermediate operations, and may additionally
 * incorporate information about the terminal (or stateful) operation which
 * follows the last intermediate operation described by this
 * {@code PipelineHelper}. The {@code PipelineHelper} is passed to the
 * {@link TerminalOp#evaluateParallel(PipelineHelper, java.util.Spliterator)},
 * {@link TerminalOp#evaluateSequential(PipelineHelper, java.util.Spliterator)},
 * and {@link AbstractPipeline#opEvaluateParallel(PipelineHelper, java.util.Spliterator,
 * java.util.function.IntFunction)}, methods, which can use the
 * {@code PipelineHelper} to access information about the pipeline such as
 * head shape, stream flags, and size, and use the helper methods
 * such as {@link #wrapAndCopyInto(Sink, Spliterator)},
 * {@link #copyInto(Sink, Spliterator)}, and {@link #wrapSink(Sink)} to execute
 * pipeline operations..
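 * (Note: it describes the initial segment of a stream pipeline: the source,
 * the intermediate operations, and possibly the operation that follows them.

 * The PipelineHelper is passed to ... methods, which can then use it to access
 * the various pieces of information about the pipeline.)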
 
 *
 * @param <P_OUT> type of output elements from the pipeline
 * @since 1.8
 */
abstract class PipelineHelper<P_OUT> {
    ...
}

A method of PipelineHelper: wrapAndCopyInto()

   /**
     * Applies the pipeline stages described by this {@code PipelineHelper} to
     * the provided {@code Spliterator} and send the results to the provided
     * {@code Sink}.
     * (Note: applies the pipeline stages described by this PipelineHelper to the
     * Spliterator and sends the results to the Sink.)
     *
     * @implSpec
     * The implementation behaves as if:
     * <pre>{@code
     *     intoWrapped(wrapSink(sink), spliterator);
     * }</pre>
     *
     * @param sink the {@code Sink} to receive the results
     * @param spliterator the spliterator describing the source input to process
     */
    abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);

The concrete implementation of wrapAndCopyInto:

    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

The wrapSink() method in PipelineHelper

    /**
     * Takes a {@code Sink} that accepts elements of the output type of the
     * {@code PipelineHelper}, and wrap it with a {@code Sink} that accepts
     * elements of the input type and implements all the intermediate operations
     * described by this {@code PipelineHelper}, delivering the result into the
     * provided {@code Sink}.
     * (Note: takes a Sink that accepts elements of the PipelineHelper's output type.)
     
     *
     * @param sink the {@code Sink} to receive the results
     * @return a {@code Sink} that implements the pipeline stages and sends
     *         results to the provided {@code Sink}
     */
    abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);

The concrete implementation of wrapSink() (this is what chains the multiple stream operations together):

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        // depth tells whether any intermediate operation remains; walk from the last stage back toward the source.
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }
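The backwards walk in wrapSink() can be reproduced with a toy model. Stage below is a hypothetical class written for this note, with only the previousStage and depth fields that the loop relies on. It uses plain Consumer objects instead of the JDK's Sink and is not the AbstractPipeline implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class WrapSinkSketch {
    /** Hypothetical pipeline stage: knows its predecessor and its depth. */
    static class Stage {
        final Stage previousStage;          // null for the source stage
        final int depth;                    // 0 for the source stage
        final Function<Object, Object> op;  // per-element operation (null at the source)

        Stage(Stage previousStage, Function<Object, Object> op) {
            this.previousStage = previousStage;
            this.depth = previousStage == null ? 0 : previousStage.depth + 1;
            this.op = op;
        }

        /** Appends a new stage after this one, like an intermediate operation. */
        Stage map(Function<Object, Object> next) {
            return new Stage(this, next);
        }

        /** Counterpart of opWrapSink(): apply this stage's op, then forward downstream. */
        Consumer<Object> opWrapSink(Consumer<Object> downstream) {
            return element -> downstream.accept(op.apply(element));
        }

        /** Counterpart of wrapSink(): walk from this stage back to the source, wrapping as we go. */
        Consumer<Object> wrapSink(Consumer<Object> sink) {
            for (Stage p = this; p.depth > 0; p = p.previousStage) {
                sink = p.opWrapSink(sink);
            }
            return sink;
        }
    }

    /** Builds source -> append "_abc" -> upper-case, wraps a collecting sink, pushes one element. */
    static List<Object> run() {
        List<Object> out = new ArrayList<>();
        Stage source = new Stage(null, null);
        Stage last = source.map(o -> o + "_abc").map(o -> ((String) o).toUpperCase());
        Consumer<Object> wrapped = last.wrapSink(out::add);
        wrapped.accept("hello");
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [HELLO_ABC]
    }
}
```

Because the loop starts at the last stage, the sink that ends up outermost belongs to the first intermediate operation, so elements still flow through the operations in the order they were written.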


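My own summary: the stream execution flow.

source data - intermediate operation - intermediate operation - terminal operation

1. Chain all the operations together (the intermediate operations and the terminal operation).

2. Push the elements of the stream, one by one, through all of those operations.

The most important call is inside copyInto(): spliterator.forEachRemaining(wrappedSink); // the single most important step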

 @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink); // the single most important step
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

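wrappedSink: all the intermediate operations, wrapped around the terminal sink, are packaged in this one sink object

spliterator: the source data. forEachRemaining traverses it and, for every element, runs the operations wrapped in the sink object.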
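The non-short-circuit branch of copyInto() boils down to begin, forEachRemaining, end. The sketch below mirrors those three steps using only the public Spliterator API. A plain Consumer stands in for the wrapped sink, and the begin/end calls are recorded as log entries because the real Sink interface is not public. The class name is made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;

public class CopyIntoSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Spliterator<String> spliterator =
                Arrays.asList("hello", "world", "welcome").spliterator();

        // Stand-in for the wrapped sink: the map step followed by the terminal action.
        Consumer<String> wrappedSink = item -> log.add(item + "_abc");

        // Step 1: begin(size), recorded here as a log entry.
        log.add("begin(" + spliterator.getExactSizeIfKnown() + ")");
        // Step 2: push every remaining source element through the wrapped sink.
        spliterator.forEachRemaining(wrappedSink);
        // Step 3: end().
        log.add("end");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [begin(3), hello_abc, world_abc, welcome_abc, end]
    }
}
```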

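The above is static analysis (reading the source code).

Next, do the dynamic analysis yourself (by debugging the program).


Step through the code once with the debugger.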

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class StreamTest3 {
    public static void main(String[] args) {
        List<String> list = Arrays.asList("hello", "world", "welcome");
//        list.stream().map(item->item+"_abc").forEach(System.out::println);

        Stream<String> stream = list.stream(); 
        System.out.println("1"); // breakpoint
        Stream<String> stream1 = stream.map(item -> item + "_abc");
        System.out.println("2"); // breakpoint
        stream1.forEach(System.out::println);
    }
}