Java 8 的Stream流那麼強大,你知道它的原理嗎?
阿新 • • 發佈:2021-12-09
點選“終碼一生”,關注,置頂公眾號
每日技術乾貨,第一時間送達!
Java 8 API添加了一個新的抽象稱為流Stream,可以讓你以一種宣告的方式處理資料。 Stream 使用一種類似用 SQL 語句從資料庫查詢資料的直觀方式來提供一種對 Java 集合運算和表達的高階抽象。 Stream API可以極大提高Java程式設計師的生產力,讓程式設計師寫出高效率、乾淨、簡潔的程式碼。 本文會對Stream的實現原理進行剖析。1、Stream的組成與特點
Stream(流)是一個來自資料來源的元素佇列並支援聚合操作:-
元素是特定型別的物件,形成一個佇列。Java中的Stream並_不會_向集合那樣儲存和管理元素,而是按需計算
- 資料來源流的來源可以是集合Collection、陣列Array、I/O channel, 產生器generator 等
- 聚合操作類似SQL語句一樣的操作, 比如filter, map, reduce, find, match, sorted等
- Pipelining: 中間操作都會返回流物件本身。這樣多個操作可以串聯成一個管道, 如同流式風格(fluent style)。這樣做可以對操作進行優化, 比如延遲執行(laziness evaluation)和短路( short-circuiting)
-
內部迭代:以前對集合遍歷都是通過Iterator或者For-Each的方式, 顯式的在集合外部進行迭代, 這叫做外部迭代。Stream提供了內部迭代的方式, 通過訪問者模式 (Visitor)實現。
“ 1.0-1.4 中的 java.lang.Thread 5.0 中的 java.util.concurrent 6.0 中的 Phasers 等 7.0 中的 Fork/Join 框架 8.0 中的 Lambda ”Stream具有平行處理能力,處理的過程會分而治之,也就是將一個大任務切分成多個小任務,這表示每個任務都是一個操作: List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);numbers.parallelStream() .forEach(out::println); 可以看到一行簡單的程式碼就幫我們實現了並行輸出集合中元素的功能,但是由於並行執行的順序是不可控的所以每次執行的結果不一定相同。 如果非得相同可以使用forEachOrdered方法執行終止操作: List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);numbers.parallelStream() .forEachOrdered(out::println); 這裡有一個疑問,如果結果需要有序,是否和我們的並行執行的初衷相悖?是的,這個場景下明顯無需使用並行流,直接用序列流執行即可, 否則效能可能更差,因為最後又強行將所有並行結果進行了排序。 OK,下面我們先介紹一下Stream介面的相關知識。
2、BaseStream介面
Stream的父介面是BaseStream,後者是所有流實現的頂層介面,定義如下: public interface BaseStream<T, S extends BaseStream<T, S>> extends AutoCloseable { Iterator<T> iterator(); Spliterator<T> spliterator(); boolean isParallel(); S sequential(); S parallel(); S unordered(); S onClose(Runnable closeHandler); void close();} 其中,T為流中元素的型別,S為一個BaseStream的實現類,它裡面的元素也是T並且S同樣是自己: S extends BaseStream<T, S> 是不是有點暈? 其實很好理解,我們看一下介面中對S的使用就知道了:如sequential()、parallel()這兩個方法,它們都返回了S例項,也就是說它們分別支援對當前流進行序列或者並行的操作,並返回「改變」後的流物件。“ 如果是並行一定涉及到對當前流的拆分,即將一個流拆分成多個子流,子流肯定和父流的型別是一致的。子流可以繼續拆分子流,一直拆分下去… ”也就是說這裡的S是BaseStream的一個實現類,它同樣是一個流,比如Stream、IntStream、LongStream等。
3、Stream介面
再來看一下Stream的介面宣告: public interface Stream<T> extends BaseStream<T, Stream<T>> 參考上面的解釋這裡不難理解:即Stream<T>可以繼續拆分為Stream<T>,我們可以通過它的一些方法來證實: Stream<T> filter(Predicate<? super T> predicate);<R> Stream<R> map(Function<? super T, ? extends R> mapper);<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);Stream<T> sorted();Stream<T> peek(Consumer<? super T> action);Stream<T> limit(long maxSize);Stream<T> skip(long n);... 這些都是操作流的中間操作,它們的返回結果必須是流物件本身。4、關閉流操作
BaseStream 實現了 AutoCloseable 介面,也就是 close() 方法會在流關閉時被呼叫。同時,BaseStream 中還給我們提供了onClose()方法: S onClose(Runnable closeHandler); 當AutoCloseable的close()介面被呼叫的時候會觸發呼叫流物件的onClose()方法,但有幾點需要注意:- onClose() 方法會返回流物件本身,也就是說可以對改物件進行多次呼叫
- 如果呼叫了多個onClose() 方法,它會按照呼叫的順序觸發,但是如果某個方法有異常則只會向上丟擲第一個異常
- 前一個 onClose() 方法丟擲了異常不會影響後續 onClose() 方法的使用
- 如果多個 onClose() 方法都丟擲異常,只展示第一個異常的堆疊,而其他異常會被壓縮,只展示部分資訊
5、並行流和序列流
BaseStream介面中分別提供了並行流和序列流兩個方法,這兩個方法可以任意呼叫若干次,也可以混合呼叫,但最終只會以最後一次方法呼叫的返回結果為準。 參考parallel()方法的說明:“ Returns an equivalent stream that is parallel. May return itself, either because the stream was already parallel, or because the underlying stream state was modified to be parallel. ”所以多次呼叫同樣的方法並不會生成新的流,而是直接複用當前的流物件。 下面的例子裡以最後一次呼叫parallel()為準,最終是並行地計算sum: stream.parallel() .filter(...) .sequential() .map(...) .parallel() .sum();
6、ParallelStream背後的男人:ForkJoinPool
ForkJoin框架是從JDK7中新特性,它同ThreadPoolExecutor一樣,也實現了Executor和ExecutorService 介面。它使用了一個「無限佇列」來儲存需要執行的任務,而執行緒的數量則是通過建構函式傳入, 如果沒有向建構函式中傳入希望的執行緒數量,那麼當前計算機可用的CPU數量會被設定為執行緒數量作為預設值。 ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm) 來解決問題,典型的應用比如_快速排序演算法_。這裡的要點在於,ForkJoinPool需要使用相對少的執行緒來處理大量的任務。 比如要對1000萬個資料進行排序,那麼會將這個任務分割成兩個500 萬的排序任務和一個針對這兩組500萬資料的合併任務。 以此類推,對於500萬的資料也會做出同樣的分割處理,到最後會設定一個閾值來規定當資料規模到多少時,停止這樣的分割處理。比如,當元素的數量小於10時,會停止分割,轉而使用插入排序對它們進行排序。那麼到最後,所有的任務加起來會有大概2000000+個。“ 問題的關鍵在於,對於一個任務而言,只有當它所有的子任務完成之後,它才能夠被執行,想象一下歸併排序的過程。 ”所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的執行緒無法向 任務佇列中再新增一個任務並且在等待該任務完成之後再繼續執行。而使用ForkJoinPool時,就能夠讓其中的執行緒建立新的任務,並掛起當前的任務,此時執行緒就能夠從佇列中選擇子任務執行。 那麼使用ThreadPoolExecutor或者ForkJoinPool,會有什麼效能的差異呢? 首先,使用ForkJoinPool能夠使用數量有限的執行緒來完成非常多的具有「父子關係」的任務,比如使用4個執行緒來完成超過200萬個任務。使用ThreadPoolExecutor 時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優先執行子任務,需要完成200萬個具有父子關係的任務時,也需要200萬個執行緒,顯然這是不可行的。 Work Stealing原理:
- 每個工作執行緒都有自己的工作佇列WorkQueue;
- 這是一個雙端佇列dequeue,它是執行緒私有的;
- ForkJoinTask中fork的子任務,將放入執行該任務的工作執行緒的隊頭,工作執行緒將以LIFO的順序來處理工作佇列中的任務,即堆疊的方式;
- 為了最大化地利用CPU,空閒的執行緒將從其它執行緒的佇列中「竊取」任務來執行
- 但是是從工作佇列的尾部竊取任務,以減少和佇列所屬執行緒之間的競爭;
- 雙端佇列的操作:push()/pop()僅在其所有者工作執行緒中呼叫,poll()是由其它執行緒竊取任務時呼叫的;
- 當只剩下最後一個任務時,還是會存在競爭,是通過CAS來實現的;
7、用ForkJoinPool的眼光來看ParallelStream
Java 8為ForkJoinPool添加了一個通用執行緒池,這個執行緒池用來處理那些沒有被顯式提交到任何執行緒池的任務。它是ForkJoinPool型別上的一個靜態元素,它擁有的預設執行緒數量等於執行計算機上的CPU數量。 當呼叫Arrays 類上新增的新方法時,自動並行化就會發生。 比如用來排序一個數組的並行快速排序,用來對一個數組中的元素進行並行遍歷。自動並行化也被運用在Java 8新新增的Stream API中。 比如下面的程式碼用來遍歷列表中的元素並執行需要的操作: List<UserInfo> userInfoList = DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel());userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo); 對於列表中的元素的操作都會以並行的方式執行。forEach方法會為每個元素的計算操作建立一個任務,該任務會被前文中提到的ForkJoinPool中的commonPool處理。 以上的平行計算邏輯當然也可以使用ThreadPoolExecutor完成,但是就程式碼的可讀性和程式碼量而言,使用ForkJoinPool明顯更勝一籌。 對於ForkJoinPool通用執行緒池的執行緒數量,通常使用預設值就可以了,即執行時計算機的處理器數量。也可以通過設定系統屬性:-Djava.util.concurrent .ForkJoinPool.common.parallelism=N (N為執行緒數量),來調整ForkJoinPool的執行緒數量。 值得注意的是,當前執行的執行緒也會被用來執行任務,所以最終的執行緒個數為N+1,1就是當前的主執行緒。 這裡就有一個問題,如果你在並行流的執行計算使用了_阻塞操作_,如I/O,那麼很可能會導致一些問題: public static String query(String question) { List<String> engines = new ArrayList<String>(); engines.add("http://www.google.com/?q="); engines.add("http://duckduckgo.com/?q="); engines.add("http://www.bing.com/search?q="); // get element as soon as it is available Optional<String> result = engines.stream().parallel().map((base) - { String url = base + question; // open connection and fetch the result return WS.url(url).get(); }).findAny(); return result.get();} 這個例子很典型,讓我們來分析一下:- 這個並行流計算操作將由主執行緒和JVM預設的ForkJoinPool.commonPool()來共同執行。
- map中是一個阻塞方法,需要通過訪問HTTP介面並得到它的response,所以任何一個worker執行緒在執行到這裡的時候都會阻塞並等待結果。
- 所以當此時再其他地方通過並行流方式呼叫計算方法的時候,將會受到此處阻塞等待的方法的影響。
- 目前的ForkJoinPool的實現並未考慮補償等待那些阻塞在等待新生成的執行緒的工作worker執行緒,所以最終ForkJoinPool.commonPool()中的執行緒將備用光並且阻塞等待。
“ 正如我們上面那個列子的情況分析得知,lambda的執行並不是瞬間完成的,所有使用parallel streams的程式都有可能成為阻塞程式的源頭, 並且在執行過程中程式中的其他部分將無法訪問這些workers,這意味著任何依賴parallel streams的程式在什麼別的東西佔用著common ForkJoinPool時將會變得不可預知並且暗藏危機。 ”小結:
- 當需要處理遞迴分治演算法時,考慮使用ForkJoinPool。
- 仔細設定不再進行任務劃分的閾值,這個閾值對效能有影響。
- Java 8中的一些特性會使用到ForkJoinPool中的通用執行緒池。在某些場合下,需要調整該執行緒池的預設的執行緒數量
- lambda應該儘量避免副作用,也就是說,避免突變基於堆的狀態以及任何IO
- lambda應該互不干擾,也就是說避免修改資料來源(因為這可能帶來執行緒安全的問題)
- 避免訪問在流操作生命週期內可能會改變的狀態
8、並行流的效能
並行流框架的效能受以下因素影響:- 資料大小:資料夠大,每個管道處理時間夠長,並行才有意義;
- 源資料結構:每個管道操作都是基於初始資料來源,通常是集合,將不同的集合資料來源分割會有一定消耗;
- 裝箱:處理基本型別比裝箱型別要快;
- 核的數量:預設情況下,核數量越多,底層fork/join執行緒池啟動執行緒就越多;
- 單元處理開銷:花在流中每個元素身上的時間越長,並行操作帶來的效能提升越明顯;
- 效能好:ArrayList、陣列或IntStream.range(資料支援隨機讀取,能輕易地被任意分割)
- 效能一般:HashSet、TreeSet(資料不易公平地分解,大部分也是可以的)
- 效能差:LinkedList(需要遍歷連結串列,難以對半分解)、Stream.iterate和BufferedReader.lines(長度未知,難以分解)