JAVA8 stream 中Spliterator的使用(一)
java8 stream大家用的比較多,但是發現,其實stream的底層建構函式中,還需要傳入Spliterator。查了一下,竟然發現網上對這個類講的幾乎沒有。唯一一篇文章寫得Spliterator使用有問題的,其實他的並行流是沒有用到的。因為
for (int pos = currentSize/2 + currentSize; pos < str.length(); pos++){ .....
這段邏輯沒有執行 , pos < str.length() 為false
因此,這裡面的Spliterator返回的是null,返回null說明不用進行分割,因此原文中的程式碼也就是單執行緒的,並沒有用到多執行緒。
以此問中的例子,給出兩種寫法,兩種寫法,都可以使用到多執行緒。
寫stream的程式碼越複雜,對技術的要求其實是越高的。需要對遞迴,分治有一定的理解,不然無法有效的進行stream程式碼的debug。
來看下Spliterator介面方法,需要實現以下一些方法:
boolean tryAdvance(Consumer action);
該方法會處理每個元素,如果沒有元素處理,則應該返回false,否則返回true。default void forEachRemaining(Consumer action)
該方法有預設實現,功能後面會介紹。Spliterator trySplit();
long estimateSize();
該方法返回值並不會對程式碼正確性產生影響,但是會影響程式碼的執行執行緒數,後續會介紹一下int characteristics();
給出stream流具有的特性,不同的特性,不僅是會對流的計算有優化作用,更可能對計算結果會產生影響,後續會稍作介紹。default Comparator getComparator()
對sorted的流,給出比較器。後續給出研究程式碼。
1.先來看下forEachRemaining的實現。
default void forEachRemaining(Consumer&amp;lt;? super T&amp;gt; action) { do { } while (tryAdvance(action)); }
該方法迴圈遍歷呼叫tryAdvance方法,直到返回false。因為tryAdvance是必須實現的方法,因此重寫forEachRemaining
只有對優化程式碼有作用,無法做到不寫tryAdvance方法實現。
2.estimateSize的使用場景場景:
在estimateSize處打斷點,跟蹤執行緒棧資訊,可以看出estimateSize在這裡用到:
java.util.stream.AbstractTask&amp;lt;P_IN, P_OUT, R, K&amp;gt;的 public void compute() { Spliterator&amp;lt;P_IN&amp;gt; rs = spliterator, ls; // right, left spliterators long sizeEstimate = rs.estimateSize(); long sizeThreshold = getTargetSize(sizeEstimate); boolean forkRight = false; @SuppressWarnings(&amp;quot;unchecked&amp;quot;) K task = (K) this; while (sizeEstimate &lt; sizeThreshold &amp; (ls = rs.trySplit()) != null) { K leftChild, rightChild, taskToFork; task.leftChild = leftChild = task.makeChild(ls); task.rightChild = rightChild = task.makeChild(rs); task.setPendingCount(1); if (forkRight) {
如果sizeEstimate < sizeThreshold, 則執行緒是不會再呼叫trySplit()方法,則就不會再細分子執行緒了。
可以將estimateSize返回結果固定為1,將只會用到主執行緒在跑任務,沒有子執行緒。
當Spliterator的trySplit返回null的時候,說明當前這段分割不能再進行分割了,就會呼叫到
forEachRemaining方法。
仿照oricle原始碼示例,即可寫出示例程式碼。該示例程式碼較好,原因是通過f/j的程式碼,大致闡述了
stream底層使用Spliterator的方式,是如何使用Spliterator中各個介面的。
這裡面需要說明的是,tryAdvance方法中的Consumer.accept呼叫,最終將呼叫到reduce 操作的ccumulate方法。
也就是說,我們看到Consumer.accept返回一個void,其實就是對變數T做一個操作。這個操作將直接影響到stream的內部狀態,但是不會有返回值。
附上分別兩種方式實現的原始碼:
公用類: public class NumCounter { private int num; private int sum; // 是否當前是個完整的數字 private boolean isWholeNum; public NumCounter(int num, int sum, boolean isWholeNum) { this.num = num; this.sum = sum; this.isWholeNum = isWholeNum; } public NumCounter accumulate(Character c){ System.out.println(Thread.currentThread().getName()); if (Character.isDigit(c)){ return isWholeNum ? new NumCounter(Integer.parseInt("" + c), sum, false) : new NumCounter(Integer.parseInt("" + num + c), sum, false); }else { return new NumCounter(0, sum + num, true); } } public NumCounter combine(NumCounter numCounter){ return new NumCounter(0, this.getSum() + numCounter.getSum(), numCounter.isWholeNum); } public int getSum() { return sum + num; } }
方法1:
NumCounterSpliterator public class NumCounterSpliterator implements Spliterator&lt;Character&gt; { private String str; private int currentChar = 0; private boolean canSplit = true; public NumCounterSpliterator(int currentChar,String str,boolean canSplit) { this.str = str; this.currentChar = currentChar; this.canSplit = canSplit; } public void forEachRemaining(Consumer&lt;? super Character&gt; action) { do { } while (tryAdvance(action)); } @Override public boolean tryAdvance(Consumer&lt;? super Character&gt; action) { if(str.equals(&amp;quot;&amp;quot;)){ return false; } action.accept(str.charAt(currentChar++)); return currentChar &amp;lt; str.length(); } @Override public Spliterator&lt;Character&gt; trySplit() { int i = currentChar; for(;canSplit &amp;amp;&amp;amp; i &amp;lt; str.length(); ++i){ //第一個不是數字的pos,進行分割 if(!Character.isDigit(str.charAt(i))){ String str1 = str; this.str = str1.substring(currentChar, i); canSplit = false; if(i + 1 &amp;lt; str1.length()){ return new NumCounterSpliterator(0,str1.substring(i+1, str1.length()),true); }else{ return null; } } } canSplit = false; return null; } @Override public long estimateSize() { return str.length() - currentChar; } @Override public int characteristics() { return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE; } } public class NumCounterTest { public static void main(String[] args) { String arr = &amp;quot;12%3 21sdas s34d dfsdz45 R3 jo34 sjkf8 3$1P 213ikflsd fdg55 kfd&amp;quot;; Spliterator<Character> stream = IntStream.range(0, arr.length()).mapToObj(arr::charAt); System.out.println("ordered total: " + countNum(stream)); Spliterator<Character> spliterator = new NumCounterSpliterator(0,arr,true); // 傳入true表示是並行流 Stream<Character> parallelStream = StreamSupport.stream(spliterator, true); System.out.println("parallel total: " + countNum(parallelStream)); } private static int countNum(Stream<Character> stream){ NumCounter numCounter = stream.reduce(new NumCounter(0, 0, false), NumCounter::accumulate, NumCounter::combine); return numCounter.getSum(); } }
該方法使用的是string,string在不同子執行緒間傳遞時候,採用了substring方法,效率不高。
方法二,改為char陣列:
public class NumCounterSpliterator2 implements Spliterator<Character> { private char[] str; private int currentChar = 0; private int end = Integer.MAX_VALUE; private boolean canSplit = true; public NumCounterSpliterator2(int currentChar,int end,char[] str,boolean canSplit) { this.str = str; this.currentChar = currentChar; this.canSplit = canSplit; this.end = end; } @Override public boolean tryAdvance(Consumer&amp;lt;? super Character&amp;gt; action) { action.accept( str[currentChar++] ); return currentChar &amp;lt; end; } @Override public Spliterator<Character> trySplit() { int i = currentChar; for(;canSplit ; i < end; ++i){ if(!Character.isDigit(str[i])){ int splitBeforeEnd = end; end = i ; canSplit = false; if(i + 1 &amp;lt; splitBeforeEnd){ return new NumCounterSpliterator2(i+1,splitBeforeEnd,str,true); }else{ return null; } } } canSplit = false; return null; } @Override public long estimateSize() { return end - currentChar; } @Override public int characteristics() { return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE; } } public class NumCounterTest2 { public static void main(String[] args) { String arr = &amp;quot;12%3 21sdas s34d dfsdz45 R3 jo34 sjkf8 3$1P 213ikflsd fdg55 kfd&amp;quot;; Spliterator&amp;lt;Character&amp;gt; spliterator = new NumCounterSpliterator2(0,arr.length(),arr.toCharArray(),true); // 傳入true表示是並行流 Stream&amp;lt;Character&amp;gt; parallelStream = StreamSupport.stream(spliterator, true); System.out.println(&amp;quot;parallel total: &amp;quot; + countNum(parallelStream)); } private static int countNum(Stream&amp;lt;Character&amp;gt; stream){ NumCounter numCounter = stream.reduce(new NumCounter(0, 0, false), NumCounter::accumulate, NumCounter::combine); return numCounter.getSum(); } }
感謝您看到這裡,但是不幸的是該程式碼並不是最合適的程式碼,會有一些問題。請移步到java8 stream 中Spliterator的使用(二) 更深入的討論。