1. 程式人生 > >JAVA8 stream 中Spliterator的使用(一)

JAVA8 stream 中Spliterator的使用(一)

java8 stream大家用的比較多,但是發現,其實stream的底層建構函式中,還需要傳入Spliterator。查了一下,竟然發現網上對這個類講的幾乎沒有。唯一一篇文章寫得Spliterator使用有問題的,其實他的並行流是沒有用到的。因為

for (int pos = currentSize/2 + currentSize; pos < str.length(); pos++){
.....

這段邏輯沒有執行 , pos < str.length() 為false
因此,這裡面的Spliterator返回的是null,返回null說明不用進行分割,因此原文中的程式碼也就是單執行緒的,並沒有用到多執行緒。

以此問中的例子,給出兩種寫法,兩種寫法,都可以使用到多執行緒。

寫stream的程式碼越複雜,對技術的要求其實是越高的。需要對遞迴,分治有一定的理解,不然無法有效的進行stream程式碼的debug。

來看下Spliterator介面方法,需要實現以下一些方法:

  • boolean tryAdvance(Consumer action); 該方法會處理每個元素,如果沒有元素處理,則應該返回false,否則返回true。
  • default void forEachRemaining(Consumer action) 該方法有預設實現,功能後面會介紹。
  • Spliterator trySplit();
    將一個Spliterator分割成多個Spliterator。分割的Spliterator被用於每個子執行緒進行處理,從而達到併發處理的效果。
  • long estimateSize(); 該方法返回值並不會對程式碼正確性產生影響,但是會影響程式碼的執行執行緒數,後續會介紹一下
  • int characteristics(); 給出stream流具有的特性,不同的特性,不僅是會對流的計算有優化作用,更可能對計算結果會產生影響,後續會稍作介紹。
  • default Comparator getComparator() 對sorted的流,給出比較器。後續給出研究程式碼。

1.先來看下forEachRemaining的實現。

    default void forEachRemaining(Consumer&amp;amp;lt;? super T&amp;amp;gt; action) {
        do { } while (tryAdvance(action));
    }

該方法迴圈遍歷呼叫tryAdvance方法,直到返回false。因為tryAdvance是必須實現的方法,因此重寫forEachRemaining
只有對優化程式碼有作用,無法做到不寫tryAdvance方法實現。
2.estimateSize的使用場景場景:
在estimateSize處打斷點,跟蹤執行緒棧資訊,可以看出estimateSize在這裡用到:

java.util.stream.AbstractTask&amp;amp;lt;P_IN, P_OUT, R, K&amp;amp;gt;的
    public void compute() {
        Spliterator&amp;amp;lt;P_IN&amp;amp;gt; rs = spliterator, ls; // right, left spliterators
        long sizeEstimate = rs.estimateSize();
        long sizeThreshold = getTargetSize(sizeEstimate);
        boolean forkRight = false;
        @SuppressWarnings(&amp;amp;quot;unchecked&amp;amp;quot;) K task = (K) this;
        while (sizeEstimate &amp;lt; sizeThreshold &amp;amp; (ls = rs.trySplit()) != null) {
            K leftChild, rightChild, taskToFork;
            task.leftChild  = leftChild = task.makeChild(ls);
            task.rightChild = rightChild = task.makeChild(rs);
            task.setPendingCount(1);
            if (forkRight) {

如果sizeEstimate < sizeThreshold, 則執行緒是不會再呼叫trySplit()方法,則就不會再細分子執行緒了。
可以將estimateSize返回結果固定為1,將只會用到主執行緒在跑任務,沒有子執行緒。

當Spliterator的trySplit返回null的時候,說明當前這段分割不能再進行分割了,就會呼叫到
forEachRemaining方法。

仿照oricle原始碼示例,即可寫出示例程式碼。該示例程式碼較好,原因是通過f/j的程式碼,大致闡述了
stream底層使用Spliterator的方式,是如何使用Spliterator中各個介面的。

這裡面需要說明的是,tryAdvance方法中的Consumer.accept呼叫,最終將呼叫到reduce 操作的ccumulate方法。
也就是說,我們看到Consumer.accept返回一個void,其實就是對變數T做一個操作。這個操作將直接影響到stream的內部狀態,但是不會有返回值。

附上分別兩種方式實現的原始碼:

公用類:
public class NumCounter {
    private int num;
    private int sum;
    // 是否當前是個完整的數字
    private boolean isWholeNum;

    public NumCounter(int num, int sum, boolean isWholeNum) {
        this.num = num;
        this.sum = sum;
        this.isWholeNum = isWholeNum;
    }

    public NumCounter accumulate(Character c){
        System.out.println(Thread.currentThread().getName());
        if (Character.isDigit(c)){
            return isWholeNum ? new NumCounter(Integer.parseInt("" + c), sum, false) : new NumCounter(Integer.parseInt("" + num + c), sum, false);
        }else {
            return new NumCounter(0, sum + num, true);
        }
    }

    public NumCounter combine(NumCounter numCounter){
        return new NumCounter(0, this.getSum() + numCounter.getSum(), numCounter.isWholeNum);
    }

    public int getSum() {
        return sum + num;
    }
}

方法1:

NumCounterSpliterator

public class NumCounterSpliterator implements Spliterator&amp;lt;Character&amp;gt; {

    private String str;
    private int currentChar = 0;
    private boolean canSplit = true;

    public NumCounterSpliterator(int currentChar,String str,boolean canSplit) {
        this.str = str;
        this.currentChar = currentChar;
        this.canSplit = canSplit;
    }


    public void forEachRemaining(Consumer&amp;lt;? super Character&amp;gt; action) {
        do {
        } while (tryAdvance(action));
    }

    @Override
    public boolean tryAdvance(Consumer&amp;lt;? super Character&amp;gt; action) {
        if(str.equals(&amp;amp;quot;&amp;amp;quot;)){
            return false;
        }
        action.accept(str.charAt(currentChar++));
        return currentChar &amp;amp;lt; str.length();
    }

    @Override
    public Spliterator&amp;lt;Character&amp;gt; trySplit() {
        int i = currentChar;
        for(;canSplit &amp;amp;amp;&amp;amp;amp; i &amp;amp;lt; str.length(); ++i){

//第一個不是數字的pos,進行分割
            if(!Character.isDigit(str.charAt(i))){
                String str1 = str;
                this.str = str1.substring(currentChar, i);
                canSplit = false;
                if(i + 1 &amp;amp;lt; str1.length()){
                    return new NumCounterSpliterator(0,str1.substring(i+1, str1.length()),true);
                }else{
                    return null;
                }
            }
        }

        canSplit = false;
        return null;
    }

    @Override
    public long estimateSize() {
        return str.length() - currentChar;
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE;
    }
}

public class NumCounterTest {
    public static void main(String[] args) {
        String arr = &amp;amp;quot;12%3 21sdas s34d dfsdz45   R3 jo34 sjkf8 3$1P 213ikflsd fdg55 kfd&amp;amp;quot;;
        Spliterator&lt;Character&gt; stream = IntStream.range(0, arr.length()).mapToObj(arr::charAt);
        System.out.println("ordered total: " + countNum(stream));

        Spliterator&lt;Character&gt; spliterator = new NumCounterSpliterator(0,arr,true);
        // 傳入true表示是並行流
        Stream&lt;Character&gt; parallelStream = StreamSupport.stream(spliterator, true);
        System.out.println("parallel total: " + countNum(parallelStream));
    }

    private static int countNum(Stream&lt;Character&gt; stream){
        NumCounter numCounter = stream.reduce(new NumCounter(0, 0, false), NumCounter::accumulate, NumCounter::combine);
        return numCounter.getSum();
    }
}

該方法使用的是string,string在不同子執行緒間傳遞時候,採用了substring方法,效率不高。
方法二,改為char陣列:

public class NumCounterSpliterator2 implements Spliterator&lt;Character&gt; {

    private char[] str;
    private int currentChar = 0;
    private int end = Integer.MAX_VALUE;
    private boolean canSplit = true;

    public NumCounterSpliterator2(int currentChar,int end,char[] str,boolean canSplit) {
        this.str = str;
        this.currentChar = currentChar;
        this.canSplit = canSplit;
        this.end = end;
    }

    @Override
    public boolean tryAdvance(Consumer&amp;amp;lt;? super Character&amp;amp;gt; action) {
        action.accept( str[currentChar++] );
        return currentChar &amp;amp;lt; end;
    }

    @Override
    public Spliterator<Character> trySplit() {
        int i = currentChar;
        for(;canSplit  ; i < end; ++i){
            if(!Character.isDigit(str[i])){
                int splitBeforeEnd = end;
                end = i ;
                canSplit = false;
                if(i + 1 &amp;amp;lt; splitBeforeEnd){
                    return new NumCounterSpliterator2(i+1,splitBeforeEnd,str,true);
                }else{
                    return null;
                }
            }
        }

        canSplit = false;
        return null;
    }

    @Override
    public long estimateSize() {
        return end - currentChar;
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE;
    }
}

public class NumCounterTest2 {
    public static void main(String[] args) {
        String arr = &amp;amp;quot;12%3 21sdas s34d dfsdz45   R3 jo34 sjkf8 3$1P 213ikflsd fdg55 kfd&amp;amp;quot;;

        Spliterator&amp;amp;lt;Character&amp;amp;gt; spliterator = new NumCounterSpliterator2(0,arr.length(),arr.toCharArray(),true);
        // 傳入true表示是並行流
        Stream&amp;amp;lt;Character&amp;amp;gt; parallelStream = StreamSupport.stream(spliterator, true);
        System.out.println(&amp;amp;quot;parallel total: &amp;amp;quot; + countNum(parallelStream));
    }

    private static int countNum(Stream&amp;amp;lt;Character&amp;amp;gt; stream){
        NumCounter numCounter = stream.reduce(new NumCounter(0, 0, false), NumCounter::accumulate, NumCounter::combine);
        return numCounter.getSum();
    }
}

感謝您看到這裡,但是不幸的是該程式碼並不是最合適的程式碼,會有一些問題。請移步到java8 stream 中Spliterator的使用(二) 更深入的討論。