java8 Stream之原理
Stream
java8的Stream很重要,spring-reactor裡面用到reactor-core,而java8的stream與之很相似,搞懂了再看reactor-core必定事半功倍。
先看一下它的強大,這裡只是冰山一角:
從List<Student> 列表中取出name,將name組成一個List。
老程式碼
List<String> nameList = new ArrayList();
if(null != list){
for(Student stu : list){
nameList.add(stu.getName());
}
}
JAVA8
List<String> nameList = Optional.ofNullable(list).orElse(Collections.emptyList()).stream()
.map(Stu::getName).collect(Collectors.toList());
Stream.of 建立Stream
這裡給大家演示一下通過Stream.of建立Stream。
常見的集合通過stream()方法都可以建立Stream。
其實他們最終都是呼叫以下方法建立的。
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) { Objects.requireNonNull(spliterator); return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); }
Stream.of有兩種建立Stream的方法。
第一種
Stream.of("a1")
第二種
Stream.of("a1","a2"); //這種通過Arrays.stream 構建
這裡介紹兩個相關的類:
如果是單個元素,直接使用Spliterator進行構建。
如果是多個元素,會有一個優化,使用SpineBuffer構建。
如果是大陣列,使用SpineBuffer,小陣列是使用ArrayList。
如何使用SpineBuffer構建?
Stream.builder().add("a1").add("a2").build();
Stream 相關概念
stream的操作分為兩種:
一種是中間操作,就是不需要結果,只需要記錄這個過程,一般返回Stream物件都是屬於這種
一種是終極操作,就是立即需要返回結果,一般返回非Stream物件,都是屬於這種。
stream的狀態分為三種:
第一種:Head,第一次建立的時候就是這種
第二種:Stateless,無狀態,每個物件的操作是獨立的。
第三種:Stateful,有狀態,需要聯合多個象才能得出結果。
stream操作特性:
操作特性是指:該stream有固定大小,大小不固定,操作有序,資料有序等。
Stream.filter
顧名思義:對 Stream進行filter,然後返回新的Stream。
由前一節我們知道,stream的具體資料儲存在Spliterator中。而它本身可以理解為只是一個演算法。
filter只是一箇中間操作,我們只需要記錄這一個過程就OK了。然後返回新的Stream。如果再次呼叫fileter,會再次返回一個新Stream。
上面是一個流程圖,Sink是包裝運算元的一個類,比如呼叫filter,從Head裡面拿到物件,經過第一個Sink,再經過第二個Sink的運算,最終得到結果。
下面是Strea.filter的原始碼實現:
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
//如果通過當前filter,就進入下一個運算元
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
Stream.peek
這個方法可以理解為除錯方法,它不對結果產生任何影響,將資料原封不動的傳給下一個運算元
public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
Objects.requireNonNull(action);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
0) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void accept(P_OUT u) {
action.accept(u);
downstream.accept(u);
}
};
}
};
}
Stream.flatMap
運算元應該是通過一個物件映身成一個Stream,然後呼叫foreach,將每個元素傳遞到下一個運算元。
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper);
// We can do better than this, by polling cancellationRequested when stream is infinite
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
try (Stream<? extends R> result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstream);
}
}
};
}
};
}
Stream.map
與上面的類似,只是對映成另一個物件
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
Stream.limit
這是一個有狀態的操作,因為它返回一定資料的資料組成的Stream。 這裡只貼一段核心演算法:
Sink<T> opWrapSink(int flags, Sink<T> sink) {
return new Sink.ChainedReference<T, T>(sink) {
long n = skip;
long m = limit >= 0 ? limit : Long.MAX_VALUE;
@Override
public void begin(long size) {
downstream.begin(calcSize(size, skip, m));
}
@Override
public void accept(T t) {
if (n == 0) {
if (m > 0) {
m--;
downstream.accept(t);
}
}
else {
n--;
}
}
@Override
public boolean cancellationRequested() {
return m == 0 || downstream.cancellationRequested();
}
};
}
Stream.skip
這個與Stram.limit類似,兩個聯合起來就可以分面查詢了。
Stream.sorted
排序,如果沒傳比較器就用預設的。
如果有順序,就不用排序了,如果給定大小了就用一個固定大小的陣列來排序,否則用一個列來來排序。
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
// If the input is already naturally sorted and this operation
// also naturally sorted then this is a no-op
if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedRefSortingSink<>(sink, comparator);
else
return new RefSortingSink<>(sink, comparator);
}
通過排序,分頁,說明這個運算元需要支援開始,結束方法。還需要一個取消方法,為什麼了,比如第一個Stream有20個物件,但是後面只需要第一個,所以我第一個運算元給到你一個數據時,第一個運算元就需要終止了。
Stream.anyMatch
下面看一個anyMatch是怎麼實現的。
@Override
public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
}
第二步,主要是用當前stream,和原始的資料容器spliterator
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
第三步,最後一個運算元和原始容器
@Override
public <S> Boolean evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
}
第四步 包裝運算元
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
第五步 資料傳遞
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
//滿足要求後,是否需要停止計算
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
//需要停止計算
copyIntoWithCancel(wrappedSink, spliterator);
}
}
Stream.spliterator
只需要一個Sink,然後呼叫wrapSink,再copyInto就可以實現了
final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
Supplier<Spliterator<P_IN>> supplier,
boolean isParallel) {
return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParal