JAVA8 stream 中Spliterator的使用(二)
JAVA8 stream 中Spliterator的使用(一)給出了Spliterator的兩種使用,但是遺憾的是,程式碼並不正確。這篇說明下原因,並對Spliterator進行更深入的分析。
- 首先來看下sorted方法,將程式碼呼叫countNum處註釋掉,改為如下方法:
parallelStream.sorted().forEach(System.out::print);
程式碼將報錯。
Exception in thread "main" java.lang.NullPointerException at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:52) at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:1) at java.util.TimSort.binarySort(TimSort.java:296) at java.util.TimSort.sort(TimSort.java:239) at java.util.Arrays.parallelSort(Arrays.java:1113) at java.util.stream.SortedOps$OfRef.opEvaluateParallel(SortedOps.java:158) at java.util.stream.AbstractPipeline.opEvaluateParallelLazy(AbstractPipeline.java:704) at java.util.stream.AbstractPipeline.sourceSpliterator(AbstractPipeline.java:431) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) at T2.main(T2.java:13)
跟進程式碼中看,在Comparators處打斷點進行除錯(預設jdk的source包是不能打斷點的,需要自己重新打包出含有除錯資訊的source包),在
java.util.stream.SortedOps.OfRef<T>類的T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
方法中,解析出的flattenedData陣列中有null的元素。跟進helper.evaluate方法,如果是並行流,在
java.util.stream.Nodes.collect(PipelineHelper<P_OUT>, Spliterator<P_IN>, boolean, IntFunction<P_OUT[]>)
方法中會首先初始化一個全null的陣列。後面的邏輯是將資料中的元素根據Spliterator分割後的各元素插入到這個數組裡面。
問題就出在(一)中的第二個Spliterator修改了原始的char[]陣列的內容,因為在trySplit方法中,將char[]陣列中不是數字的char給忽略了,trySplit分割後的兩個Spliterator都沒有處理非數字的char,這樣會導致上文中全null陣列中在非數字index的位置沒有被元素填充,導致在sorted比較的時候報出空指標錯誤。
這裡暴露的問題說明,在編寫Spliterator的時候,不能修改stream的元素內容,這和stream不可修改性也是一脈相承的。
2. 修改程式碼,改成在trySplit方法中將非數字的char劃歸到分割後的第一個Spliterator中。
執行程式碼,報如下錯誤
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1 at java.util.stream.SortedOps$SizedRefSortingSink.accept(SortedOps.java:364) at NumCounterSpliterator2.tryAdvance(NumCounterSpliterator2.java:24) at java.util.Spliterator.forEachRemaining(Spliterator.java:326) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:1) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) at NumCounterTest2.main(NumCounterTest2.java:32)
跟進原始碼
java.util.stream.Nodes.SizedCollectorTask.OfRef.accept(P_OUT)中 public void accept(P_OUT value) { if (index >= fence) { throw new IndexOutOfBoundsException(Integer.toString(index)); } array[index++] = value; }
發現stream的sorted實現時,會根據estimateSize返回的值賦值給fence,如果進行排序比較的元素的index值超過estimateSize返回值,就會丟擲異常。因此,在sorted使用過程中,estimateSize方法並不是一個可以隨意返回值的。
修改方式有兩種,另一種修改方式在後面將stream的characteristics引數時介紹。
1.將estimateSize方法改成準確的計算方式即可:
@Override
public long estimateSize() {
return end – currentChar + 1;
}
編碼過程中還發現一個小問題,用parallel並行stream的時候,遍歷元素是需要採用forEachOrdered而不是forEach方法,具體可以參見【1】
3. stream的characteristics引數介紹
基本引數使用,大家可以參見原始碼註釋,這裡介紹下一些注意的地方:
3.1 java.util.Spliterator.DISTINCT 屬性 表明該stream已經是distinct的了,因此,如果Spliterator含有此屬性,則在stream.distinct()呼叫的時候,是直接輸出該stream的,也就是distinct方法不進行通常意義上的唯一性過濾。
舉例:
將文末示例程式碼中的characteristics方法返回值,加入DISTINCT屬性。即:
public int characteristics() { return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE |DISTINCT; }
執行如下程式碼:
String arr = "123123"; System.out.println(arr); Spliterator<Character> spliterator = new NumCounterSpliterator2(0,arr.length()-1,arr.toCharArray(),true); // 傳入true表示是並行流 Stream<Character> parallelStream = StreamSupport.stream(spliterator, true); parallelStream.distinct().forEach(System.out::print);
結果輸出:
123123
123123
可見並未做唯一性處理。如果去掉distinct屬性,則輸出結果:
123123
132
3.2 SORTED屬性
3.2.1. SORTED屬性和getComparator一起使用。如果該流已經按照預設字典序(natural order)排序好了,則返回null。
如果將Spliterator的getComparator返回null,並且設定SORTED屬性,則sorted()方法直接返回原stream流,不會做任何排序,原因和distinct相同,因為此流已經排好序了。
如果stream是非並行流,則返回直接和原stream流相同。如果是並行流,注意,因為並行流是會被trysplit處理的,每個分割後的Spliterator是sorted的。因為流屬性已經是sorted並且返回的getComparator是null,已經是排好序的了,因此每個子執行緒分割後的Spliterator直接輸出即可。
但是這裡注意,stream本質上底層是f/j程式碼,而f/j分割時候,是基於trySplit進行分割的。查看了java.util.stream.Streams.RangeIntSpliterator原始碼後發現,trySplit的分割是需要從[begin,end]返回一個以begin 為開始的Spliterator,例如分割為[begin,end1],將當前Spliterator的begin修改為end1+1,即分割為[end1+1,end].
原因應該是f/j的子執行緒fork和join有關,因為我們直到fork和join應該是相反序來寫的。例如:
f1.fork(); f2.fork(); f2.join(); f1.join();
因此,從f/j的多執行緒棧來說,f2 在 f1的上面,f2.join會導致f2先執行。return的[begin,end1]保證了先執行,
而f1的[end1+1,end] 任務後執行,這樣才是以encounter order順序執行的併發。
因此,程式碼中trysplit應該這麼寫:
public Spliterator<Character> trySplit() { int i = currentChar; int currentCharOld = currentChar; for(;canSplit && i <= end; ++i){ if(!Character.isDigit(str[i])){ int splitBeforeEnd = end; canSplit = false; if(i + 1 <= splitBeforeEnd){ currentChar = i + 1; return new NumCounterSpliterator3(currentCharOld,i,str,true); }else{ return null; } } } canSplit = false; return null; }
而不能寫成
@Override public Spliterator<Character> trySplit() { int i = currentChar; for(;canSplit && i <= end; ++i){ if(!Character.isDigit(str[i])){ int splitBeforeEnd = end; end = i ; canSplit = false; if(i + 1 <= splitBeforeEnd){ return new NumCounterSpliterator2(i+1,splitBeforeEnd,str,true); }else{ return null; } } } canSplit = false; return null; }
。當然如上文說,如果不是並行流,不涉及trysplit方法,則getComparator返回null,直接就返回的原始流
3.2.2 3.2.1講了如果返回的是null的情況,那麼如果返回的不是null呢?很不幸,那設定了sorted和沒設定沒有任何區別,
即使你用hasCharacteristics(Spliterator.SORTED)方法,的確返回true。
為什麼?
看下原始碼:
java.util.stream.StreamOpFlag.fromCharacteristics(Spliterator) if ((characteristics & Spliterator.SORTED) != 0 && spliterator.getComparator() != null) { // Do not propagate the SORTED characteristic if it does not correspond // to a natural sort order return characteristics & SPLITERATOR_CHARACTERISTICS_MASK & ~Spliterator.SORTED; } else { return characteristics & SPLITERATOR_CHARACTERISTICS_MASK; }
如果具有SORTED屬性,同時getComparator()返回的不為null,則& ~Spliterator.SORTED會將sorted屬性抹去,
則此stream不具有sorted屬性。不具有sorted屬性,則stream的sorted方法,就直接按照字典序排序了。
3.2.3 sorted()方法還有一個可以傳入Comparator的重寫方法,如果使用了傳入Comparator的sorted方法,則以這個
Comparator進行排序,和原stream是否具有sorted屬性無關。
原始碼如下:
java.util.stream.SortedOps.OfRef OfRef(AbstractPipeline<?, T, ?> upstream) { super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); this.isNaturalSort = true; // Will throw CCE when we try to sort if T is not Comparable @SuppressWarnings("unchecked") Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder(); this.comparator = comp; } /** * Sort using the provided comparator. * * @param comparator The comparator to be used to evaluate ordering. */ OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) { super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED); this.isNaturalSort = false; this.comparator = Objects.requireNonNull(comparator); }
3.2.3 SIZED | SUBSIZED屬性
SIZED | SUBSIZED可以和estimateSize返回Long.MAX_VALUE一起配合使用。
如果stream沒有SIZED | SUBSIZED屬性,則可以將estimateSize返回為Long.MAX_VALUE.這說明這個stream
的estimateSize計算很複雜或本身就是一個infinite的steam流。這樣設定後,效能上會差一些,但是,不會對sorted
方法產生影響。2中提到的錯誤,也可也用這種方法處理。
最後附上全部程式碼:
NumCounterSpliterator3 import java.util.Comparator; import java.util.Spliterator; import java.util.function.Consumer; public class NumCounterSpliterator3 implements Spliterator<Character> { private char[] str; private int currentChar = 0; private int end = Integer.MAX_VALUE; private boolean canSplit = true; public NumCounterSpliterator3(int currentChar,int end,char[] str,boolean canSplit) { this.str = str; this.currentChar = currentChar; this.canSplit = canSplit; this.end = end; } @Override public boolean tryAdvance(Consumer<? super Character> action) { action.accept( str[currentChar++] ); return currentChar <= end; } @Override public Spliterator<Character> trySplit() { int i = currentChar; int currentCharOld = currentChar; for(;canSplit && i <= end; ++i){ if(!Character.isDigit(str[i])){ int splitBeforeEnd = end; canSplit = false; if(i + 1 <= splitBeforeEnd){ currentChar = i + 1; return new NumCounterSpliterator3(currentCharOld,i,str,true); }else{ return null; } } } canSplit = false; return null; } @Override public long estimateSize() { return end - currentChar + 1 /*Long.MAX_VALUE*/ ; } public Comparator<? super Character> getComparator() { return null; } @Override public int characteristics() { return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE /*|SORTED*/; } } NumCounterTest2 import java.util.Spliterator; import java.util.stream.Stream; import java.util.stream.StreamSupport; public class NumCounterTest2 { public static void main(String[] args) { String arr = "12%3 21sdas s34d dfsdz45 R3 jo34 sjkf8 3$1P 213ikflsd fdg55 kfd"; System.out.println(arr); Spliterator<Character> spliterator = new NumCounterSpliterator3(0,arr.length()-1,arr.toCharArray(),true); // 傳入true表示是並行流 Stream<Character> parallelStream = StreamSupport.stream(spliterator, true); System.out.println("parallel total: " + countNum(parallelStream)); } private static int countNum(Stream<Character> stream){ NumCounter numCounter = stream.reduce(new NumCounter(0, 0, false), NumCounter::accumulate, NumCounter::combine); return numCounter.getSum(); } }
【1】 java 8 parallelStream() with sorted()