java stream 原理
阿新 • • 發佈:2018-04-03
eva repl 一段時間 imp pipeline array chain per static
java stream 原理
需求
從"Apple" "Bug" "ABC" "Dog"中選出以A開頭的名字,然後從中選出最長的一個,並輸出其長度
1. 最直白的實現
缺點
- 叠代次數過多
- 頻繁產生中間結果,性能無法接受
2. 平常寫法
int longest = 0;
for(String str : strings){
if(str.startsWith("A")){// 1. filter(), 保留以張開頭的字符串
int len = str.length();// 2. mapToInt(), 轉換成長度
longest = Math.max(len, longest);// 3. max(), 保留最長的長度
}
}
System.out.println(longest);
缺點
- 具體業務與算法混在一起,不利於代碼復用
- 耦合性太強,代碼不清晰
3. 責任鏈模式解耦
public interface Chain {
void proceed(Object object);
}
public class ForChain implements Chain {
private final Chain chain;
public ForChain(Chain chain){
this.chain = chain;
}
@Override
public void proceed(Object object) {
List<String> list = (List<String>) object;
for(String a : list){
if(a.startsWith("A"))
chain.proceed(a);
}
}
}
public class LengthChain implements Chain {
private final Chain chain;
public LengthChain(Chain chain){
this.chain = chain;
}
@Override
public void proceed(Object object) {
String string = (String)object;
chain.proceed(string.length());
}
}
public class ResultChain implements Chain {
private Integer result = 0;
@Override
public void proceed(Object object) {
Integer integer = (Integer) object;
result = Math.max(integer,result);
}
public Integer getResult() {
return result;
}
}
public class Client {
public static void main(String[] args) {
ResultChain resultChain = new ResultChain();
LengthChain lengthChain = new LengthChain(resultChain);
ForChain forChain = new ForChain(lengthChain);
List<String> list = Arrays.asList("Apple","Bug","ABC","Dog");
forChain.proceed(list);
System.out.println("result is "+ resultChain.getResult());
}
}
4. java stream 實現
OptionalInt max = Stream.of("Apple", "Bug", "ABC", "Dog").
filter(e -> e.startsWith("A")).
mapToInt(e -> e.length()).
max();
System.out.println("result is "+ max.getAsInt());
優點
- 開發者是需要關註具體的業務,頂層算法都封裝在框架中
- 代碼結構清晰,代碼量少,減少出錯的機會
5. Stream 的原理
5.1 stream與集合比較
盡管stream與集合框架在表現上非常相似,二者都是對數據進行處理,但事實上二者完全不同。集合是一種數據結構,主要關註在內存中組織數據,會在一段時間在內存中持續的存在,而流的主要關註在計算,不為數據提供任何存儲空間,只會通過管道提供計算結果。
5.2 stream 操作分類
中間操作:返回一個新的stream
- 有狀態:必須等上一步操作完,才能執行下一步操作
- 無狀態:該操作不受上一步操作的影響
終止操作:返回結果
- 短路:找到即返回
- 費短路:遍歷所有元素
以上操作決定了Stream一定是先構建完畢再執行的特點,也就是延遲執行,當需要結果(終端操作時)開始執行流水線。
5.3 stream 結構示意圖
5.4 操作如何記錄
- Head記錄起始操作
- StateLessOp記錄中間操作
- StatefulOp記錄有狀態的中間操作
這三個操作,在實例化的時候回指向前一個操作,和後一個操作,形成雙向鏈表,每一步操作都能得知上一步和下一步操作。
對於Head:
AbstractPipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;
this.sourceSpliterator = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
對於其他操作:
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this; // 構造雙向鏈表
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
例子:
data.stream()
.filter(x -> x.length() == 2)
.map(x -> x.replace(“三”,”五”))
.sorted()
.filter(x -> x.contains(“五”))
.forEach(System.out::println);
Stage
5.5 操作如何疊加
從終止操作依次構造Sink,如此Sink鏈構造完成
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
// 依次構造sink
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
sink
- 依次調用sink的begin方法,通知sink鏈數據已準備好
- 依次調用sink的accept方法,處理數據
依次調用sink的end方法,通知數據處理完畢
@Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } }
5.6 如何收集結果
對於forEach是不需要收集結果的,對於collect結果保存在最後一個sink中,這樣的操作都會提供一個get方法取出數據。終止操作都會實現Supplier的get方法
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
interface TerminalSink<T, R> extends Sink<T>, Supplier<R> { }
java stream 原理