Stream 源碼分析(串行流)
阿新 • • 發佈:2018-12-21
消費 扁平化 next prev too inter maker operation input
Stream
支持順序和並行聚合操作的一組元素序列。 1)operations:支持在單個元素上執行的操作,流操作分為中間操作和終止操作 1-1)中間操作: 1-1-1)無狀態:unordered()、filter()、map()、mapToInt()、mapToLong()、mapToDouble、 flatMap()、flatMapToInt()、flatMapToLong()、flatMapToDouble()、 peek() 1-1-2)有狀態:distinct()、sorted()、limit()、skip() 1-2)終止操作: 1-2-1)非短路操作:forEach()、forEachOrdered()、toArray()、min()、max()、count()、 collect()、reduce() 1-2-2)短路操作: findFirst()、findAny()、anyMatch()、noneMatch()、allMatch() 2)stream pipeline:將多個流操作串聯的流管道 流是延遲處理的,直到遇到一個終止操作時,才會觸發流管道計算。 已經執行終止操作的流不能再次觸發計算。
流管道
- 流管道的創建【以 ArrayList 為數據源】:.stream()、parallelStream()
Collection# /** * 返回一個順序流,集合中的元素就是數據源 */ default Stream<E> stream() { return StreamSupport.stream(spliterator(), false); } /** * 返回一個並行流,集合中的元素就是數據源 */ default Stream<E> parallelStream() { return StreamSupport.stream(spliterator(), true); } ArrayList# /** * 創建一個延時綁定和快速失敗的分割叠代器 */ @Override public Spliterator<E> spliterator() { return new ArrayListSpliterator(0, -1, 0); } /** 基於索引實現二分、延遲初始化的 Spliterator */ final class ArrayListSpliterator implements Spliterator<E> { /** * 當前索引,在調用 advance/split 時修改 */ private int index; /** * 初始狀態是 -1,使用之後是索引的上界 */ private int fence; /** * 快速失敗計數器 */ private int expectedModCount; /** 創建一個覆蓋給定索引範圍的新 ArrayListSpliterator*/ ArrayListSpliterator(int origin, int fence, int expectedModCount) { this.index = origin; this.fence = fence; this.expectedModCount = expectedModCount; } private int getFence() { int hi; // 第一次使用時初始化為元素個數 if ((hi = fence) < 0) { expectedModCount = modCount; hi = fence = size; } return hi; } /** * 對此 Spliterator 進行拆分,一分為二 */ @Override public ArrayListSpliterator trySplit() { /** * hi:high 索引上界,不包括 * lo:low 索引下界,包括 * mid:middle 二分索引 */ final int hi = getFence(), lo = index, mid = lo + hi >>> 1; // 將範圍分成兩半,直到無法分割為止【高低索引相鄰】 return lo >= mid ? null : // divide range in half unless too small new ArrayListSpliterator(lo, index = mid, expectedModCount); } /** * 如果此 Spliterator 中還有元素可用,則將低索引位的元素傳遞給 action 進行消費 * 同時遞增 index【一次消費一個元素】 */ @Override public boolean tryAdvance(Consumer<? super E> action) { if (action == null) { throw new NullPointerException(); } final int hi = getFence(), i = index; if (i < hi) { index = i + 1; @SuppressWarnings("unchecked") // 讀取元素 final E e = (E)elementData[i]; // 執行消費過程 action.accept(e); if (modCount != expectedModCount) { throw new ConcurrentModificationException(); } return true; } return false; } /** * 一次性消費此 Spliterator 中的所有元素 */ @Override public void forEachRemaining(Consumer<? super E> action) { int i, hi, mc; // hoist accesses and checks from loop Object[] a; if (action == null) { throw new NullPointerException(); } if ((a = elementData) != null) { if ((hi = fence) < 0) { mc = modCount; hi = size; } else { mc = expectedModCount; } // 讀取並更新 index if ((i = index) >= 0 && (index = hi) <= a.length) { // 順序消費 Spliterator 中的所有元素 for (; i < hi; ++i) { @SuppressWarnings("unchecked") final E e = (E) a[i]; action.accept(e); } if (modCount == mc) { return; } } } throw new ConcurrentModificationException(); } /** * 獲取此分割叠代器的估計可用元素數【ArrayListSpliterator 是精確的】 */ @Override public long estimateSize() { return getFence() - index; } /** * 此分割叠代器的特性 */ @Override public int characteristics() { return Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED; } } StreamSupport# /** * 基於一個 Spliterator【分割叠代器】創建一個順序或並行的流 */ public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) { Objects.requireNonNull(spliterator); return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); } StreamOpFlag# /** * 將 Spliterator 的特征值轉換為流的標誌位 */ static int fromCharacteristics(Spliterator<?> spliterator) { // 讀取特征值 final int characteristics = spliterator.characteristics(); if ((characteristics & Spliterator.SORTED) != 0 && spliterator.getComparator() != null) { // Do not propagate the SORTED characteristic if it does not correspond to a natural sort order return characteristics & SPLITERATOR_CHARACTERISTICS_MASK & ~Spliterator.SORTED; } else { // 轉換為流標識 return characteristics & SPLITERATOR_CHARACTERISTICS_MASK; } } ReferencePipeline#Head static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { /** * 創建流水線的管道頭 */ Head(Spliterator<?> source, int sourceFlags, boolean parallel) { super(source, sourceFlags, parallel); } } ReferencePipeline# /** * 創建流水線的管道頭 */ ReferencePipeline(Spliterator<?> source, int sourceFlags, boolean parallel) { super(source, sourceFlags, parallel); } AbstractPipeline# abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> { private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed"; private static final String MSG_CONSUMED = "source already consumed or closed"; /** * 流水線的源階段【即第一個流管道】 */ @SuppressWarnings("rawtypes") private final AbstractPipeline sourceStage; /** * 當前流管道的上一階段,如果是源流,則為 null */ @SuppressWarnings("rawtypes") private final AbstractPipeline previousStage; /** * 此階段操作的操作標識 */ protected final int sourceOrOpFlags; /** * 此流管道的下一階段 */ @SuppressWarnings("rawtypes") private AbstractPipeline nextStage; /** * 順序流:當前階段和源流之間存在的中間階段的個數 * 並行流:上一階段的狀態 */ private int depth; /** * 組合了源流和所有中間階段的流標識和操作標識 */ private int combinedFlags; /** * 源流的分割叠代器,用於產生元素 */ private Spliterator<?> sourceSpliterator; /** * 源流的分割叠代器生成器,如果 sourceSpliterator == null */ private Supplier<? extends Spliterator<?>> sourceSupplier; /** * 此流管道已經被鏈接或消費 */ private boolean linkedOrConsumed; /** * 流水線中存在有狀態的流管道 */ private boolean sourceAnyStateful; /** * 此流管道關閉時的後置操作 */ private Runnable sourceCloseAction; /** * 此流管道是否是並行的 */ private boolean parallel; /** * 流水線頭部管道的構造函數 */ AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) { this.previousStage = null; this.sourceSpliterator = source; this.sourceStage = this; this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK; // The following is an optimization of: // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE); this.combinedFlags = ~(sourceOrOpFlags << 1) & StreamOpFlag.INITIAL_OPS_VALUE; // 源階段的 depth=0 this.depth = 0; this.parallel = parallel; }
- 無狀態流管道的鏈接【以 map 為例】
ReferencePipeline# /** * 基於 mapper 創建一個無狀態的流管道,並將其鏈接到此流管道之後 */ @Override @SuppressWarnings("unchecked") public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper); /** * 新流管道的操作為 mapper * 新流管道的操作標識為 NOT_SORTED、NOT_DISTINCT * 下一階段的操作為 sink【反向鏈接】 */ return new StatelessOp<>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<>(sink) { @Override public void accept(P_OUT u) { /** * 接收上遊階段發送的數據 u,並進行當前階段的處理, * 並將結果發送給下遊階段處理 */ downstream.accept(mapper.apply(u)); } }; } }; } /** * 一個無狀態的流管道 */ abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { /** * 將此流管道追加到上遊管道 upstream 之後 */ StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) { super(upstream, opFlags); assert upstream.getOutputShape() == inputShape; } /** * 此管道的操作是無狀態的 */ @Override final boolean opIsStateful() { return false; } } /** * 將此流管道追加到上遊管道 upstream 之後 */ ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) { super(upstream, opFlags); } /** * 將此流管道追加到 previousStage 之後 */ AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { // 前一階段已經被鏈接或消費,則不允許重復消費 if (previousStage.linkedOrConsumed) { throw new IllegalStateException(MSG_STREAM_LINKED); } // 前一階段已經被鏈接 previousStage.linkedOrConsumed = true; // 設置前一階段的後置階段為當前階段 previousStage.nextStage = this; // 寫入前置階段 this.previousStage = previousStage; // 寫入此階段的操作標識 this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; // 合並前一階段和此階段的流操作標識 this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); // 寫入管道頭 this.sourceStage = previousStage.sourceStage; // 此階段是否是有狀態操作 if (opIsStateful()) { sourceStage.sourceAnyStateful = true; } // 寫入中間操作計數值(上一階段 + 1) this.depth = previousStage.depth + 1; }
- 有狀態流管道的鏈接【sorted()】
ReferencePipeline#
/**
* 將一個排序的流管道追加到此流管道之後
*/
@Override
public final Stream<P_OUT> sorted() {
return SortedOps.makeRef(this);
}
SortedOps#
/**
* 將一個排序管道追加到 upstream 之後
*/
static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
return new OfRef<>(upstream);
}
ReferencePipeline#
/**
* 有狀態的流管道
*/
abstract static class StatefulOp<E_IN, E_OUT>
extends ReferencePipeline<E_IN, E_OUT> {
/**
* 將一個有狀態的流管道追加到 upstream 之後
*/
StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape,
int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
/**
* 此流管道是有狀態的
*/
@Override
final boolean opIsStateful() {
return true;
}
@Override
abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
Spliterator<P_IN> spliterator,
IntFunction<E_OUT[]> generator);
}
/**
* 用於對引用流進行排序的管道
*/
private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
/**
* 是否是自然順序
*/
private final boolean isNaturalSort;
/**
* 排序使用的比較器
*/
private final Comparator<? super T> comparator;
/**
* 使用自然順序排序
*/
OfRef(AbstractPipeline<?, T, ?> upstream) {
super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
this.isNaturalSort = true;
final Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
this.comparator = comp;
}
/**
* 使用指定的比較器排序
*/
OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
this.isNaturalSort = false;
this.comparator = Objects.requireNonNull(comparator);
}
@Override
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
// 1)如果上遊管道是已排序的,並且是按照自然順序排序的,則此流管道可以忽略
if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) {
return sink;
// 2)如果上遊管道是已知大小的
} else if (StreamOpFlag.SIZED.isKnown(flags)) {
return new SizedRefSortingSink<>(sink, comparator);
// 3)如果上遊管道是未知大小的
} else {
return new RefSortingSink<>(sink, comparator);
}
}
}
SortedOps#
private abstract static class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {
/**
* 排序元素的比較器
*/
protected final Comparator<? super T> comparator;
// 是否取消接收上遊的元素
protected boolean cancellationRequestedCalled;
AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
super(downstream);
this.comparator = comparator;
}
/**
* 在排序元素被發送到下遊時,能夠保存短路行為【流水線中存在短路操作】
*/
@Override
public final boolean cancellationRequested() {
cancellationRequestedCalled = true;
return false;
}
}
private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
// 暫存元素的數組
private T[] array;
// 當前元素偏移
private int offset;
SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
super(sink, comparator);
}
@Override
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE) {
throw new IllegalArgumentException(Nodes.BAD_SIZE);
}
// 創建固定大小的對象數組,用於接收上遊發送的元素
array = (T[]) new Object[(int) size];
}
/**
* 上遊元素發送完畢,開始執行排序操作,並將排序後的元素發送到下遊
*/
@Override
public void end() {
// 執行元素排序
Arrays.sort(array, 0, offset, comparator);
// 發送通知給下遊管道,準備接收數據
downstream.begin(offset);
// 1)當前管道的下遊不存在短路操作
if (!cancellationRequestedCalled) {
// 順序發送所有元素
for (int i = 0; i < offset; i++) {
downstream.accept(array[i]);
}
// 2)當前管道的下遊存在短路操作
} else {
// 先發送一個元素,之後每次發送前都詢問下遊是否繼續接收,下遊拒絕接收元素則退出循環
for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) {
downstream.accept(array[i]);
}
}
// 通知下遊管道,數據發送完畢
downstream.end();
// 回收數組
array = null;
}
/**
* 接收上遊發送的單個元素
*/
@Override
public void accept(T t) {
array[offset++] = t;
}
}
/**
* 能夠消費上遊管道發送的元素,同時存儲狀態的 Sink
*/
interface Sink<T> extends Consumer<T> {
/**
* 通知下遊管道,重置狀態以接收新的數據集
*/
default void begin(long size) {}
/**
* 通知下遊管道,數據已經推送完畢,可以執行聚合處理
*/
default void end() {}
/**
* 詢問下遊管道是否還需要繼續推送數據,適用於短路操作
*/
default boolean cancellationRequested() {
return false;
}
}
Sink#ChainedReference
/**
* 鏈式引用 sink
*/
abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
// 下遊 sink
protected final Sink<? super E_OUT> downstream;
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) {
downstream.begin(size);
}
@Override
public void end() {
downstream.end();
}
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
- 鏈接終端操作執行計算【forEach()】
ReferencePipeline#
@Override
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
ForEachOps#
/**
* 創建一個 TerminalOp,遍歷並處理流中的每個引用對象
*/
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfRef<>(action, ordered);
}
abstract static class ForEachOp<T>
implements TerminalOp<T, Void>, TerminalSink<T, Void> {
// 遍歷是否是有序的
private final boolean ordered;
protected ForEachOp(boolean ordered) {
this.ordered = ordered;
}
// 獲取此操作的操作標識
@Override
public int getOpFlags() {
return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
}
/**
* 將此終端操作鏈接到流管道尾部,並將 spliterator 中的每個元素都發送到 sink 中
*/
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
/**
* 並行評估 spliterator 中的元素
*/
@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
if (ordered) {
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
} else {
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
}
return null;
}
static final class OfRef<T> extends ForEachOp<T> {
/**
* 實際消費者
*/
final Consumer<? super T> consumer;
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
/**
* 處理上遊發送的單個元素
*/
@Override
public void accept(T t) {
consumer.accept(t);
}
}
}
AbstractPipeline#
/**
* 使用終端操作 terminalOp 對此流管道進行處理,處理過程中會從後往前鏈接形成流水線
*/
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
// 此階段的輸出類型==終端操作的輸入類型
assert getOutputShape() == terminalOp.inputShape();
// 不允許重復消費
if (linkedOrConsumed) {
throw new IllegalStateException(MSG_STREAM_LINKED);
}
// 設置已消費標識
linkedOrConsumed = true;
// 使用終端操作並行或串行處理此流管道
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
/**
* 獲取此階段的源分割叠代器【數據源】
*
* created by ZXD at 19 Dec 2018 T 22:32:09
* @param terminalFlags 終端操作標識
* @return
*/
@SuppressWarnings("unchecked")
private Spliterator<?> sourceSpliterator(int terminalFlags) {
Spliterator<?> spliterator = null;
// 1)源分割叠代器不為 null
if (sourceStage.sourceSpliterator != null) {
// 讀取
spliterator = sourceStage.sourceSpliterator;
// 使用後置空
sourceStage.sourceSpliterator = null;
}
// 2)分割叠代器通過 sourceSupplier 進行生成
else if (sourceStage.sourceSupplier != null) {
spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
}
else {
throw new IllegalStateException(MSG_CONSUMED);
}
// 此流是並行的 && 流管道中存在有狀態操作
if (isParallel() && sourceStage.sourceAnyStateful) {
// Adapt the source spliterator, evaluating each stateful op in the pipeline up to and including this pipeline stage.
// The depth and flags of each pipeline stage are adjusted accordingly.
int depth = 1;
/**
* 從源階段開始處理,一直處理到當前階段為止
*/
for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
u != e;
u = p, p = p.nextStage) {
int thisOpFlags = p.sourceOrOpFlags;
// 當前處理階段是有狀態操作
if (p.opIsStateful()) {
depth = 0;
// 當前操作是短路操作
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
}
spliterator = p.opEvaluateParallelLazy(u, spliterator);
// Inject or clear SIZED on the source pipeline stage based on the stage‘s spliterator
thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
? thisOpFlags & ~StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SIZED
: thisOpFlags & ~StreamOpFlag.IS_SIZED | StreamOpFlag.NOT_SIZED;
}
p.depth = depth++;
p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
}
}
// 終端操作帶有標識位
if (terminalFlags != 0) {
// 將終端操作的標誌位合並到最後一階段中
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}
return spliterator;
}
/**
* @param sink 下遊管道操作,中間操作或終端操作
* @param spliterator 分割叠代器
* @return
*/
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
/**
* 從當前流管道開始,向前構建流水線直到第一個流管道為止,流水線後置操作為 sink
*/
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
/**
* AbstractPipeline.this:當前流管道
* p.depth:當前流管道距離管道頭的距離
* p.previousStage:前置流管道
*/
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
// 返回鏈接後的 sink
return (Sink<P_IN>) sink;
}
/**
* 將分割叠代器中的元素順序發送到流水線中處理
*
* @param wrappedSink 鏈接後的流水線
* @param spliterator 數據源
*/
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
// 1)流水線中不存在短路操作
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
// 通知 wrappedSink 處理元素的個數
wrappedSink.begin(spliterator.getExactSizeIfKnown());
// 使用 wrappedSink 依次處理 spliterator 中的所有元素
spliterator.forEachRemaining(wrappedSink);
// 通知 wrappedSink 元素發送完畢,可以執行後置操作
wrappedSink.end();
}
// 2)流水線中存在短路操作
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
- Spliterator 及其特征值
/**
* 用於劃分和遍歷數據源的對象,數據源可以是 array、Collection、IO channel、生成器等。
*/
public interface Spliterator<T> {
/**
* 嘗試使用 action 處理 Spliterator 中的一個元素
*/
boolean tryAdvance(Consumer<? super T> action);
/**
* 嘗試使用 action 一次性處理 Spliterator 中的所有元素
*/
default void forEachRemaining(Consumer<? super T> action) {
do { } while (tryAdvance(action));
}
/**
* 對此 Spliterator 進行拆分
*/
Spliterator<T> trySplit();
/**
* 獲取此 Spliterator 的估計元素數,如果數據源是無限的,則返回 -1
*/
long estimateSize();
/**
* 嘗試獲取此 Spliterator 的精確元素個數
*/
default long getExactSizeIfKnown() {
return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
}
/**
* 讀取此 Spliterator 的特征值
*/
int characteristics();
/**
* Spliterator 中的元素是順序處理的
*/
public static final int ORDERED = 0x00000010;
/**
* Spliterator 中的元素是唯一的
*/
public static final int DISTINCT = 0x00000001;
/**
* Spliterator 中的元素根據自然順序或比較器進行過排序
*/
public static final int SORTED = 0x00000004;
/**
* Spliterator 中的元素個數是有限的
*/
public static final int SIZED = 0x00000040;
/**
* Spliterator 中的元素是非 null 的
*/
public static final int NONNULL = 0x00000100;
/**
* Spliterator 關聯的數據源是不可變的,不支持增加、替換、刪除等
*/
public static final int IMMUTABLE = 0x00000400;
/**
* Spliterator 關聯的數據源支持並發修改
*/
public static final int CONCURRENT = 0x00001000;
/**
* 此 Spliterator 通過 trySplit() 方法生成的子 Spliterator 是有限大小的
*/
public static final int SUBSIZED = 0x00004000;
}
- 流管道和操作標識
StreamOpFlag#
/**
* 流管道中的元素是唯一的
*/
// 0, 0x00000001
// Matches Spliterator.DISTINCT
DISTINCT(0,
set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),
/**
* 流管道中的元素是排過序的【有狀態操作】
*/
// 1, 0x00000004
// Matches Spliterator.SORTED
SORTED(1,
set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),
/**
* 流管道中的元素是順序處理的
*/
// 2, 0x00000010
// Matches Spliterator.ORDERED
ORDERED(2,
set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP).clear(Type.TERMINAL_OP)
.clear(Type.UPSTREAM_TERMINAL_OP)),
/**
* 流管道的大小是有限的【非無限流】
*/
// 3, 0x00000040
// Matches Spliterator.SIZED
SIZED(3,
set(Type.SPLITERATOR).set(Type.STREAM).clear(Type.OP)),
/**
* 中間操作或終端操作是短路的
*/
// 12, 0x01000000
SHORT_CIRCUIT(12,
set(Type.OP).set(Type.TERMINAL_OP));
- 流管道中的元素類型
enum StreamShape {
/**
* 流元素是對象引用
*/
REFERENCE,
/**
* 流元素是 int 值
*/
INT_VALUE,
/**
* 流元素是 long 值
*/
LONG_VALUE,
/**
* 流元素是 double 值
*/
DOUBLE_VALUE
}
無狀態中間操作
- filter:使用指定的函數式斷言過濾流中的元素
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
/**
* 根據目標 predicate 對上遊管道發送的數據進行過濾,
* 只將滿足斷言的對象發送到下遊
*/
if (predicate.test(u)) {
downstream.accept(u);
}
}
};
}
};
}
- map:將上遊管道發送的數據進行映射處理後,再發送到下遊
/**
* 基於 mapper 創建一個無狀態的流管道,並將其鏈接到此流管道之後
*/
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
/**
* 新流管道的操作為 mapper
* 新流管道的操作標識為 NOT_SORTED、NOT_DISTINCT
* 下一階段的操作為 sink【反向鏈接】
*/
return new StatelessOp<>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<>(sink) {
@Override
public void accept(P_OUT u) {
/**
* 接收上遊階段發送的數據 u,並進行當前階段的處理,
* 並將結果發送給下遊階段處理
*/
downstream.accept(mapper.apply(u));
}
};
}
};
}
- flatMap:流的扁平化
@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
/**
* 通過 mapper 函數將上遊元素映射成一個新的流,並將流中的元素順序發送到下遊
*/
try (Stream<? extends R> result = mapper.apply(u)) {
// 映射結果不為 null 時,將新流中的元素發送到下遊
if (result != null) {
// 1)下遊操作是非短路的
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstream);
}
// 2)下遊操作是短路操作,則每次發送元素前都先詢問下遊是否需要繼續接收
else {
final var s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
}
}
}
}
@Override
public boolean cancellationRequested() {
// If this method is called then an operation within the stream pipeline is short-circuiting (see AbstractPipeline.copyInto).
// Note that we cannot differentiate between an upstream or downstream operation
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
}
};
}
};
}
- peek:查看上遊發送的元素
@Override
public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
Objects.requireNonNull(action);
return new StatelessOp<>(this, StreamShape.REFERENCE,
0) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<>(sink) {
@Override
public void accept(P_OUT u) {
/**
* 先調用目標接口進行消費,之後再將該元素發送到下遊,
* 可以查看流的具體處理過程,主要用於調試
*/
action.accept(u);
downstream.accept(u);
}
};
}
};
}
有狀態的中間操作
- distinct:將流中的元素去重
@Override
public final Stream<P_OUT> distinct() {
return DistinctOps.makeRef(this);
}
DistinctOps#
static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
// StreamOpFlag.IS_DISTINCT 新管道產生的元素是唯一的
return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
// 1)如果上遊管道已經是 distinct 則此階段無序任何處理。
if (StreamOpFlag.DISTINCT.isKnown(flags)) {
return sink;
// 2)上遊管道是已排序的
} else if (StreamOpFlag.SORTED.isKnown(flags)) {
return new Sink.ChainedReference<T, T>(sink) {
boolean seenNull;
// 最近發送的元素
T lastSeen;
@Override
public void begin(long size) {
seenNull = false;
lastSeen = null;
downstream.begin(-1);
}
@Override
public void end() {
seenNull = false;
lastSeen = null;
downstream.end();
}
@Override
public void accept(T t) {
// 1)上遊發送的元素為 null
if (t == null) {
if (!seenNull) {
seenNull = true;
downstream.accept(lastSeen = null);
}
/**
* 2)上遊發送的元素不為 null
* lastSeen == null,當前元素是第一個元素
* !t.equals(lastSeen),上次發送的元素和當前元素不一致
*/
} else if (lastSeen == null || !t.equals(lastSeen)) {
downstream.accept(lastSeen = t);
}
}
};
// 3)上遊管道是未排序的
} else {
return new Sink.ChainedReference<T, T>(sink) {
// 存放上遊發送的唯一元素
Set<T> seen;
@Override
public void begin(long size) {
seen = new HashSet<>();
downstream.begin(-1);
}
@Override
public void end() {
seen = null;
downstream.end();
}
@Override
public void accept(T t) {
// 已接受元素中不存在此元素 t
if (!seen.contains(t)) {
// 將其加入已發送唯一元素集合
seen.add(t);
// 將此元素發送到下遊
downstream.accept(t);
}
}
};
}
}
};
}
- sorted:新管道產生的元素是已排序的
/**
* 將一個排序的流管道追加到此流管道之後
*/
@Override
public final Stream<P_OUT> sorted() {
return SortedOps.makeRef(this);
}
SortedOps#
/**
* 將一個排序管道追加到 upstream 之後
*/
static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
return new OfRef<>(upstream);
}
/**
* 用於對引用流進行排序的管道
*/
private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
/**
* 是否是自然順序
*/
private final boolean isNaturalSort;
/**
* 排序使用的比較器
*/
private final Comparator<? super T> comparator;
/**
* 使用自然順序排序
*/
OfRef(AbstractPipeline<?, T, ?> upstream) {
super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
this.isNaturalSort = true;
final Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
this.comparator = comp;
}
/**
* 使用指定的比較器排序
*/
OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
this.isNaturalSort = false;
this.comparator = Objects.requireNonNull(comparator);
}
@Override
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
// 1)如果上遊管道是已排序的,並且是按照自然順序排序的,則此流管道可以忽略
if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) {
return sink;
// 2)如果上遊管道是已知大小的
} else if (StreamOpFlag.SIZED.isKnown(flags)) {
return new SizedRefSortingSink<>(sink, comparator);
// 3)如果上遊管道是未知大小的
} else {
return new RefSortingSink<>(sink, comparator);
}
}
}
- skip:忽略上遊管道發送的前 n 個元素
@Override
public final Stream<P_OUT> skip(long n) {
if (n < 0) {
throw new IllegalArgumentException(Long.toString(n));
}
if (n == 0) {
return this;
} else {
return SliceOps.makeRef(this, n, -1);
}
}
SliceOps#
/**
* @param upstream 上遊管道
* @param skip 需要跳過的元素個數
* @param limit 限制接受的元素個數
*/
public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
long skip, long limit) {
if (skip < 0) {
throw new IllegalArgumentException("Skip must be non-negative: " + skip);
}
return new ReferencePipeline.StatefulOp<>(upstream, StreamShape.REFERENCE,
flags(limit)) {
@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) {
return new Sink.ChainedReference<>(sink) {
// 需要跳過的前 n 個元素
long n = skip;
// 只需要獲取 m 個元素
long m = limit >= 0 ? limit : Long.MAX_VALUE;
@Override
public void begin(long size) {
downstream.begin(calcSize(size, skip, m));
}
@Override
public void accept(T t) {
// 已經不需要跳過元素
if (n == 0) {
// 下遊需要接受的元素個數 > 0
if (m > 0) {
// 遞減接收個數
m--;
// 將當前元素發送給下遊管道
downstream.accept(t);
}
}
// 跳過當前元素,並遞減跳過數
else {
n--;
}
}
@Override
public boolean cancellationRequested() {
// m == 0 表示此管道將不會發送元素到下遊 || 下遊拒絕接收元素
return m == 0 || downstream.cancellationRequested();
}
};
}
};
}
- limit:只接受上遊管道發送的前 maxSize 個元素
@Override
public final Stream<P_OUT> limit(long maxSize) {
if (maxSize < 0) {
throw new IllegalArgumentException(Long.toString(maxSize));
}
return SliceOps.makeRef(this, 0, maxSize);
}
非短路的終端操作
- forEach:使用函數式接口 action 消費流水線生產的所有元素
@Override
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
ForEachOps#
/**
* 創建一個 TerminalOp,遍歷並處理流中的每個引用對象
*/
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfRef<>(action, ordered);
}
abstract static class ForEachOp<T>
implements TerminalOp<T, Void>, TerminalSink<T, Void> {
// 遍歷是否是有序的
private final boolean ordered;
protected ForEachOp(boolean ordered) {
this.ordered = ordered;
}
// 獲取此操作的操作標識
@Override
public int getOpFlags() {
return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
}
/**
* 將此終端操作鏈接到流管道尾部,並將 spliterator 中的每個元素都發送到 sink 中
*/
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
static final class OfRef<T> extends ForEachOp<T> {
/**
* 實際消費者
*/
final Consumer<? super T> consumer;
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
/**
* 處理上遊發送的單個元素
*/
@Override
public void accept(T t) {
consumer.accept(t);
}
}
}
- forEachOrdered:使用函數式接口 action 順序消費流水線生產的所有元素
@Override
public void forEachOrdered(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, true));
}
短路的終端操作
- anyMatch:上遊管道發送的元素中至少有一個滿足函數式斷言 predicate 時返回 true
@Override
public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
}
MatchOps#
enum MatchKind {
/** 是否所有的元素都滿足指定的斷言 */
ANY(true, true),
/** 是否至少有一個元素滿足指定的斷言 */
ALL(false, false),
/** 是否所有的元素都不滿足指定的斷言 */
NONE(true, false);
/**
* 是否需要在滿足斷言時停止接收上遊元素
*/
private final boolean stopOnPredicateMatches;
/**
* 操作被短路時的返回結果
*/
private final boolean shortCircuitResult;
private MatchKind(boolean stopOnPredicateMatches,
boolean shortCircuitResult) {
this.stopOnPredicateMatches = stopOnPredicateMatches;
this.shortCircuitResult = shortCircuitResult;
}
}
public static <T> TerminalOp<T, Boolean> makeRef(Predicate<? super T> predicate,
MatchKind matchKind) {
Objects.requireNonNull(predicate);
Objects.requireNonNull(matchKind);
class MatchSink extends BooleanTerminalSink<T> {
MatchSink() {
super(matchKind);
}
@Override
public void accept(T t) {
/**
* 當前管道還能繼續接收元素 && 當前元素匹配停止條件
*/
if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
// 停止接收元素
stop = true;
// 寫入結果值
value = matchKind.shortCircuitResult;
}
}
}
return new MatchOp<>(StreamShape.REFERENCE, matchKind, MatchSink::new);
}
private static final class MatchOp<T> implements TerminalOp<T, Boolean> {
/**
* 上遊發送的元素類型
*/
private final StreamShape inputShape;
/**
* 匹配類型
*/
final MatchKind matchKind;
/**
* sink 生成器
*/
final Supplier<BooleanTerminalSink<T>> sinkSupplier;
MatchOp(StreamShape shape,
MatchKind matchKind,
Supplier<BooleanTerminalSink<T>> sinkSupplier) {
this.inputShape = shape;
this.matchKind = matchKind;
this.sinkSupplier = sinkSupplier;
}
@Override
public int getOpFlags() {
// 當前管道是短路的 && 未排序的
return StreamOpFlag.IS_SHORT_CIRCUIT | StreamOpFlag.NOT_ORDERED;
}
@Override
public StreamShape inputShape() {
return inputShape;
}
@Override
public <S> Boolean evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
}
}
/**
* 避免返回值的裝箱而定義的 BooleanTerminalSink
*/
private abstract static class BooleanTerminalSink<T> implements Sink<T> {
/**
* 是否停止接收
*/
boolean stop;
/**
* 返回結果值
*/
boolean value;
BooleanTerminalSink(MatchKind matchKind) {
value = !matchKind.shortCircuitResult;
}
/**
* 情況狀態並返回結果值
*/
public boolean getAndClearState() {
return value;
}
/**
* 是否停止接收上遊元素
*/
@Override
public boolean cancellationRequested() {
return stop;
}
}
- allMatch:上遊管道發送的所有元素都滿足函數式斷言 predicate 時返回 true
@Override
public final boolean allMatch(Predicate<? super P_OUT> predicate) {
return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
}
- noneMatch:上遊管道發送的元素沒有一個滿足函數式斷言 predicate 時返回 true
@Override
public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
}
- findFirst:獲取上遊管道發送的第一個元素
@Override
public final Optional<P_OUT> findFirst() {
return evaluate(FindOps.makeRef(true));
}
FindOps#
/**
* @param mustFindFirst 是否必須是第一個元素
*/
@SuppressWarnings("unchecked")
public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) {
return (TerminalOp<T, Optional<T>>)
(mustFindFirst ? FindSink.OfRef.OP_FIND_FIRST : FindSink.OfRef.OP_FIND_ANY);
}
private abstract static class FindSink<T, O> implements TerminalSink<T, O> {
/**
* 是否已經找到值了
*/
boolean hasValue;
/**
* 結果值
*/
T value;
FindSink() {} // Avoid creation of special accessor
@Override
public void accept(T value) {
// 當前元素是第一個元素
if (!hasValue) {
// 已經找到值
hasValue = true;
// 寫入目標值
this.value = value;
}
}
/**
* 一旦找到值,就拒絕接收上遊元素
*/
@Override
public boolean cancellationRequested() {
return hasValue;
}
static final class OfRef<T> extends FindSink<T, Optional<T>> {
/**
* 獲取結果值
*/
@Override
public Optional<T> get() {
return hasValue ? Optional.of(value) : null;
}
static final TerminalOp<?, ?> OP_FIND_FIRST = new FindOp<>(true,
StreamShape.REFERENCE, Optional.empty(),
Optional::isPresent, FindSink.OfRef::new);
static final TerminalOp<?, ?> OP_FIND_ANY = new FindOp<>(false,
StreamShape.REFERENCE, Optional.empty(),
Optional::isPresent, FindSink.OfRef::new);
}
}
private static final class FindOp<T, O> implements TerminalOp<T, O> {
/**
* 上遊發送的元素類型
*/
private final StreamShape shape;
/**
* 此操作的標識
*/
final int opFlags;
/**
* 未找到值時的返回值
*/
final O emptyValue;
/**
* 查找斷言
*/
final Predicate<O> presentPredicate;
/**
* sink 生成器
*/
final Supplier<TerminalSink<T, O>> sinkSupplier;
FindOp(boolean mustFindFirst,
StreamShape shape,
O emptyValue,
Predicate<O> presentPredicate,
Supplier<TerminalSink<T, O>> sinkSupplier) {
this.opFlags = StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);
this.shape = shape;
this.emptyValue = emptyValue;
this.presentPredicate = presentPredicate;
this.sinkSupplier = sinkSupplier;
}
@Override
public int getOpFlags() {
return opFlags;
}
@Override
public StreamShape inputShape() {
return shape;
}
@Override
public <S> O evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
// 使用 sink 順序評估流水線產生的元素,並返回查找結果
final O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
// 找到目標值,則返回它;否則返回 emptyValue
return result != null ? result : emptyValue;
}
}
- findAny:獲取上遊管道發送的任意一個元素【串行流取的是第一個元素】
@Override
public final Optional<P_OUT> findAny() {
return evaluate(FindOps.makeRef(false));
}
Stream 源碼分析(串行流)