1. 程式人生 > >java stream 原理

java stream 原理

eva repl 一段時間 imp pipeline array chain per static

java stream 原理

需求
從"Apple" "Bug" "ABC" "Dog"中選出以A開頭的名字,然後從中選出最長的一個,並輸出其長度

1. 最直白的實現

技術分享圖片

缺點
  1. 叠代次數過多
  2. 頻繁產生中間結果,性能無法接受

2. 平常寫法

int longest = 0;
for(String str : strings){
    if(str.startsWith("A")){// 1. filter(), 保留以張開頭的字符串
        int len = str.length();// 2. mapToInt(), 轉換成長度
        longest = Math.max(len, longest);// 3. max(), 保留最長的長度
} } System.out.println(longest);
缺點
  1. 具體業務與算法混在一起,不利於代碼復用
  2. 耦合性太強,代碼不清晰

3. 責任鏈模式解耦

public interface Chain {
    void proceed(Object object);
}
public class ForChain implements Chain {

    private final Chain chain;

    public ForChain(Chain chain){
        this.chain = chain;
    }

    @Override
public void proceed(Object object) { List<String> list = (List<String>) object; for(String a : list){ if(a.startsWith("A")) chain.proceed(a); } } }
public class LengthChain implements Chain {
    private final Chain chain;

    public
LengthChain(Chain chain){ this.chain = chain; } @Override public void proceed(Object object) { String string = (String)object; chain.proceed(string.length()); } }
public class ResultChain implements Chain {

    private Integer result = 0;
    @Override
    public void proceed(Object object) {
        Integer integer = (Integer) object;
        result = Math.max(integer,result);
    }

    public Integer getResult() {
        return result;
    }
}
public class Client {

    public static void main(String[] args) {
        ResultChain resultChain = new ResultChain();
        LengthChain lengthChain = new LengthChain(resultChain);
        ForChain forChain = new ForChain(lengthChain);
        List<String> list = Arrays.asList("Apple","Bug","ABC","Dog");
        forChain.proceed(list);
        System.out.println("result is "+ resultChain.getResult());
    }
}

4. java stream 實現

OptionalInt max = Stream.of("Apple", "Bug", "ABC", "Dog").
     filter(e -> e.startsWith("A")).
     mapToInt(e -> e.length()).
     max();
System.out.println("result is "+ max.getAsInt());
優點
  1. 開發者是需要關註具體的業務,頂層算法都封裝在框架中
  2. 代碼結構清晰,代碼量少,減少出錯的機會

5. Stream 的原理

5.1 stream與集合比較

盡管stream與集合框架在表現上非常相似,二者都是對數據進行處理,但事實上二者完全不同。集合是一種數據結構,主要關註在內存中組織數據,會在一段時間在內存中持續的存在,而流的主要關註在計算,不為數據提供任何存儲空間,只會通過管道提供計算結果。

5.2 stream 操作分類

技術分享圖片

中間操作:返回一個新的stream

  • 有狀態:必須等上一步操作完,才能執行下一步操作
  • 無狀態:該操作不受上一步操作的影響

終止操作:返回結果

  • 短路:找到即返回
  • 費短路:遍歷所有元素

以上操作決定了Stream一定是先構建完畢再執行的特點,也就是延遲執行,當需要結果(終端操作時)開始執行流水線。

5.3 stream 結構示意圖

技術分享圖片

5.4 操作如何記錄
  • Head記錄起始操作
  • StateLessOp記錄中間操作
  • StatefulOp記錄有狀態的中間操作

這三個操作,在實例化的時候回指向前一個操作,和後一個操作,形成雙向鏈表,每一步操作都能得知上一步和下一步操作。

對於Head:

AbstractPipeline(Spliterator<?> source,
                 int sourceFlags, boolean parallel) {
    this.previousStage = null;
    this.sourceSpliterator = source;
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}

對於其他操作:

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this; // 構造雙向鏈表
    this.previousStage = previousStage;

    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}

例子:

data.stream()
.filter(x -> x.length() == 2)
.map(x -> x.replace(“三”,”五”))
.sorted()
.filter(x -> x.contains(“五”))
.forEach(System.out::println);

Stage
技術分享圖片

5.5 操作如何疊加

從終止操作依次構造Sink,如此Sink鏈構造完成

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
     Objects.requireNonNull(sink);

     // 依次構造sink
     for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
         sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
     }
     return (Sink<P_IN>) sink;
 }

sink
技術分享圖片

  1. 依次調用sink的begin方法,通知sink鏈數據已準備好
  2. 依次調用sink的accept方法,處理數據
  3. 依次調用sink的end方法,通知數據處理完畢

    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);
    
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
    }

技術分享圖片

5.6 如何收集結果

對於forEach是不需要收集結果的,對於collect結果保存在最後一個sink中,這樣的操作都會提供一個get方法取出數據。終止操作都會實現Supplier的get方法

@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<P_IN> spliterator) {
    return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
public interface Supplier<T> {

    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}
interface TerminalSink<T, R> extends Sink<T>, Supplier<R> { }

java stream 原理