stream api 使用及其常用技巧彙總
什麼是流?
先來看看Pig upms 中的使用
流是Java8引入的全新概念,它用來處理集合中的資料,暫且可以把它理解為一種高階集合。
眾所周知,集合操作非常麻煩,若要對集合進行篩選、投影,需要寫大量的程式碼,而流是以宣告的形式操作集合,它就像SQL語句,我們只需告訴流需要對集合進行什麼操作,它就會自動進行操作,並將執行結果交給你,無需我們自己手寫程式碼。
因此,流的集合操作對我們來說是透明的,我們只需向流下達命令,它就會自動把我們想要的結果給我們。由於操作過程完全由Java處理,因此它可以根據當前硬體環境選擇最優的方法處理,我們也無需編寫複雜又容易出錯的多執行緒程式碼了。
流的特點
- 只能遍歷一次
一旦元素走到了流水線的另一頭,那麼這些元素就被“消費掉了”,我們無法再對這個流進行操作。當然,我們可以從資料來源那裡再獲得一個新的流重新遍歷一遍。 - 採用內部迭代方式
若要對集合進行處理,則需我們手寫處理程式碼,這就叫做外部迭代。
而要對流進行處理,我們只需告訴流我們需要什麼結果,處理過程由流自行完成,這就稱為內部迭代。
流的操作種類
流的操作分為兩種,分別為中間操作和終端操作。
- 中間操作
當資料來源中的資料上了流水線後,這個過程對資料進行的所有操作都稱為“中間操作”。 - 終端操作
當所有的中間操作完成後,若要將資料從流水線上拿下來,則需要執行終端操作。
終端操作將返回一個執行結果,這就是你想要的資料。
流的操作過程
使用流一共需要三步:
- 準備一個數據源
- 執行中間操作
中間操作可以有多個,它們可以串連起來形成流水線。 - 執行終端操作
執行終端操作後本次流結束,你將獲得一個執行結果。
使用流
建立流
在使用流之前,首先需要擁有一個數據源,並通過StreamAPI提供的一些方法獲取該資料來源的流物件。資料來源可以有多種形式:
1. 集合
這種資料來源較為常用,通過stream()方法即可獲取流物件:
List<Person> list = new ArrayList<Person>(); Stream<Person> stream = list.stream();
2. 陣列
通過Arrays類提供的靜態函式stream()獲取陣列的流物件:
String[] names = {"chaimm","peter","john"};
Stream<String> stream = Arrays.stream(names);
3. 值
直接將幾個值變成流物件:
Stream<String> stream = Stream.of("chaimm","john");
4. 檔案
try(Stream lines = Files.lines(Paths.get(“檔案路徑名”),Charset.defaultCharset())){ //可對lines做一些操作 }catch(IOException e){ }
5. iterator
建立無限流
Stream.iterate(0,n -> n + 2) .limit(10) .forEach(System.out::println);
PS:Java7簡化了IO操作,把開啟IO操作放在try後的括號中即可省略關閉IO的程式碼。
篩選 filter
filter 函式接收一個Lambda表示式作為引數,該表示式返回boolean,在執行過程中,流將元素逐一輸送給filter,並篩選出執行結果為true的元素。
如,篩選出所有學生:
List<Person> result = list.stream()
.filter(Person::isStudent)
.collect(toList());
去重distinct
去掉重複的結果:
List<Person> result = list.stream()
.distinct()
.collect(toList());
擷取
擷取流的前N個元素:
List<Person> result = list.stream() .limit(3) .collect(toList());
跳過
跳過流的前n個元素:
List<Person> result = list.stream() .skip(3) .collect(toList());
對映
對流中的每個元素執行一個函式,使得元素轉換成另一種型別輸出。流會將每一個元素輸送給map函式,並執行map中的Lambda表示式,最後將執行結果存入一個新的流中。
如,獲取每個人的姓名(實則是將Perosn型別轉換成String型別):
List<Person> result = list.stream()
.map(Person::getName)
.collect(toList());
合併多個流
例:列出List中各不相同的單詞,List集合如下:
List<String> list = new ArrayList<String>(); list.add("I am a boy"); list.add("I love the girl"); list.add("But the girl loves another girl");
思路如下:
首先將list變成流:
list.stream();
按空格分詞:
list.stream()
.map(line->line.split(" "));
分完詞之後,每個元素變成了一個String[]陣列。
將每個String[]
變成流:
list.stream() .map(line->line.split(" ")) .map(Arrays::stream)
此時一個大流裡面包含了一個個小流,我們需要將這些小流合併成一個流。
將小流合併成一個大流:用flatMap
替換剛才的 map
list.stream() .map(line->line.split(" ")) .flatMap(Arrays::stream)
去重
list.stream() .map(line->line.split(" ")) .flatMap(Arrays::stream) .distinct() .collect(toList());
是否匹配任一元素:anyMatch
anyMatch用於判斷流中是否存在至少一個元素滿足指定的條件,這個判斷條件通過Lambda表示式傳遞給anyMatch,執行結果為boolean型別。
如,判斷list中是否有學生:
boolean result = list.stream() .anyMatch(Person::isStudent);
是否匹配所有元素:allMatch
allMatch用於判斷流中的所有元素是否都滿足指定條件,這個判斷條件通過Lambda表示式傳遞給anyMatch,執行結果為boolean型別。
如,判斷是否所有人都是學生:
boolean result = list.stream() .allMatch(Person::isStudent);
是否未匹配所有元素:noneMatch
noneMatch與allMatch恰恰相反,它用於判斷流中的所有元素是否都不滿足指定條件:
boolean result = list.stream() .noneMatch(Person::isStudent);
獲取任一元素findAny
findAny能夠從流中隨便選一個元素出來,它返回一個Optional型別的元素。
Optional<Person> person = list.stream().findAny();
獲取第一個元素findFirst
Optional<Person> person = list.stream().findFirst();
歸約
歸約是將集合中的所有元素經過指定運算,摺疊成一個元素輸出,如:求最值、平均數等,這些操作都是將一個集合的元素摺疊成一個元素輸出。
在流中,reduce函式能實現歸約。
reduce函式接收兩個引數:
- 初始值
- 進行歸約操作的Lambda表示式
元素求和:自定義Lambda表示式實現求和
例:計算所有人的年齡總和
int age = list.stream().reduce(0,(person1,person2)->person1.getAge()+person2.getAge());
- reduce的第一個引數表示初試值為0;
- reduce的第二個引數為需要進行的歸約操作,它接收一個擁有兩個引數的Lambda表示式,reduce會把流中的元素兩兩輸給Lambda表示式,最後將計算出累加之和。
元素求和:使用Integer.sum函式求和
上面的方法中我們自己定義了Lambda表示式實現求和運算,如果當前流的元素為數值型別,那麼可以使用Integer提供了sum函式代替自定義的Lambda表示式,如:
int age = list.stream().reduce(0,Integer::sum);
Integer類還提供了min
、max
等一系列數值操作,當流中元素為數值型別時可以直接使用。
數值流的使用
採用reduce進行數值操作會涉及到基本數值型別和引用數值型別之間的裝箱、拆箱操作,因此效率較低。
當流操作為純數值操作時,使用數值流能獲得較高的效率。
將普通流轉換成數值流
StreamAPI提供了三種數值流:IntStream、DoubleStream、LongStream,也提供了將普通流轉換成數值流的三種方法:mapToInt、mapToDouble、mapToLong。
如,將Person中的age轉換成數值流:
IntStream stream = list.stream().mapToInt(Person::getAge);
數值計算
每種數值流都提供了數值計算函式,如max、min、sum等。如,找出最大的年齡:
OptionalInt maxAge = list.stream()
.mapToInt(Person::getAge)
.max();
由於數值流可能為空,並且給空的數值流計算最大值是沒有意義的,因此max函式返回OptionalInt,它是Optional的一個子類,能夠判斷流是否為空,並對流為空的情況作相應的處理。
此外,mapToInt、mapToDouble、mapToLong進行數值操作後的返回結果分別為:OptionalInt、OptionalDouble、OptionalLong
中間操作和收集操作
操作 | 型別 | 返回型別 | 使用的型別/函式式介面 | 函式描述符 |
---|---|---|---|---|
filter |
中間 | Stream<T> |
Predicate<T> |
T -> boolean |
distinct |
中間 | Stream<T> |
||
skip |
中間 | Stream<T> |
long | |
map |
中間 | Stream<R> |
Function<T,R> |
T -> R |
flatMap |
中間 | Stream<R> |
Function<T,Stream<R>> |
T -> Stream<R> |
limit |
中間 | Stream<T> |
long | |
sorted |
中間 | Stream<T> |
Comparator<T> |
(T,T) -> int |
anyMatch |
終端 | boolean |
Predicate<T> |
T -> boolean |
noneMatch |
終端 | boolean |
Predicate<T> |
T -> boolean |
allMatch |
終端 | boolean |
Predicate<T> |
T -> boolean |
findAny |
終端 | Optional<T> |
||
findFirst |
終端 | Optional<T> |
||
forEach |
終端 | void |
Consumer<T> |
T -> void |
collect |
終端 | R |
Collector<T,A,R> |
|
reduce |
終端 | Optional<T> |
BinaryOperator<T> |
(T,T) -> T |
count |
終端 | long |
Collector 收集
收集器用來將經過篩選、對映的流進行最後的整理,可以使得最後的結果以不同的形式展現。collect
方法即為收集器,它接收Collector
介面的實現作為具體收集器的收集方法。Collector
介面提供了很多預設實現的方法,我們可以直接使用它們格式化流的結果;也可以自定義Collector
介面的實現,從而定製自己的收集器。
歸約
流由一個個元素組成,歸約就是將一個個元素“摺疊”成一個值,如求和、求最值、求平均值都是歸約操作。
一般性歸約
若你需要自定義一個歸約操作,那麼需要使用Collectors.reducing
函式,該函式接收三個引數:
- 第一個引數為歸約的初始值
- 第二個引數為歸約操作進行的欄位
- 第三個引數為歸約操作的過程
彙總
Collectors類專門為彙總提供了一個工廠方法:Collectors.summingInt
。
它可接受一 個把物件對映為求和所需int的函式,並返回一個收集器;該收集器在傳遞給普通的collect
方法後即執行我們需要的彙總操作。
分組
資料分組是一種更自然的分割資料操作,分組就是將流中的元素按照指定類別進行劃分,類似於SQL語句中的GROUPBY
。
多級分組
多級分組可以支援在完成一次分組後,分別對每個小組再進行分組。
使用具有兩個引數的groupingBy
過載方法即可實現多級分組。
- 第一個引數:一級分組的條件
- 第二個引數:一個新的
groupingBy
函式,該函式包含二級分組的條件
Collectors 類的靜態工廠方法
工廠方法 | 返回型別 | 用途 | 示例 |
---|---|---|---|
toList |
List<T> |
把流中所有專案收集到一個 List | List<Project> projects = projectStream.collect(toList()); |
toSet |
Set<T> |
把流中所有專案收集到一個 Set,刪除重複項 | Set<Project> projects = projectStream.collect(toSet()); |
toCollection |
Collection<T> |
把流中所有專案收集到給定的供應源建立的集合 | Collection<Project> projects = projectStream.collect(toCollection(),ArrayList::new); |
counting |
Long |
計算流中元素的個數 | long howManyProjects = projectStream.collect(counting()); |
summingInt |
Integer |
對流中專案的一個整數屬性求和 | int totalStars = projectStream.collect(summingInt(Project::getStars)); |
averagingInt |
Double |
計算流中專案 Integer 屬性的平均值 | double avgStars = projectStream.collect(averagingInt(Project::getStars)); |
summarizingInt |
IntSummaryStatistics |
收集關於流中專案 Integer 屬性的統計值,例如最大、最小、 總和與平均值 | IntSummaryStatistics projectStatistics = projectStream.collect(summarizingInt(Project::getStars)); |
joining |
String |
連線對流中每個專案呼叫 toString 方法所生成的字串 | String shortProject = projectStream.map(Project::getName).collect(joining(",")); |
maxBy |
Optional<T> |
按照給定比較器選出的最大元素的 Optional, 或如果流為空則為 Optional.empty() | Optional<Project> fattest = projectStream.collect(maxBy(comparingInt(Project::getStars))); |
minBy |
Optional<T> |
按照給定比較器選出的最小元素的 Optional, 或如果流為空則為 Optional.empty() | Optional<Project> fattest = projectStream.collect(minBy(comparingInt(Project::getStars))); |
reducing |
歸約操作產生的型別 | 從一個作為累加器的初始值開始,利用 BinaryOperator 與流中的元素逐個結合,從而將流歸約為單個值 | int totalStars = projectStream.collect(reducing(0,Project::getStars,Integer::sum)); |
collectingAndThen |
轉換函式返回的型別 | 包含另一個收集器,對其結果應用轉換函式 | int howManyProjects = projectStream.collect(collectingAndThen(toList(),List::size)); |
groupingBy |
Map<K,List<T>> |
根據專案的一個屬性的值對流中的專案作問組,並將屬性值作 為結果 Map 的鍵 | Map<String,List<Project>> projectByLanguage = projectStream.collect(groupingBy(Project::getLanguage)); |
partitioningBy |
Map<Boolean,List<T>> |
根據對流中每個專案應用斷言的結果來對專案進行分割槽 | Map<Boolean,List<Project>> vegetarianDishes = projectStream.collect(partitioningBy(Project::isVegetarian)); |
轉換型別
有一些收集器可以生成其他集合。比如前面已經見過的toList
,生成了java.util.List
類的例項。
還有toSet
和toCollection
,分別生成Set
和Collection
類的例項。
到目前為止, 我已經講了很多流上的鏈式操作,但總有一些時候,需要最終生成一個集合——比如:
- 已有程式碼是為集合編寫的,因此需要將流轉換成集合傳入;
- 在集合上進行一系列鏈式操作後,最終希望生成一個值;
- 寫單元測試時,需要對某個具體的集合做斷言。
使用toCollection
,用定製的集合收集元素
stream.collect(toCollection(TreeSet::new));
還可以利用收集器讓流生成一個值。maxBy
和minBy
允許使用者按某種特定的順序生成一個值。
資料分割槽
分割槽是分組的特殊情況:由一個斷言(返回一個布林值的函式)作為分類函式,它稱分割槽函式。
分割槽函式返回一個布林值,這意味著得到的分組Map
的鍵型別是Boolean
,於是它最多可以分為兩組: true是一組,false是一組。
分割槽的好處在於保留了分割槽函式返回true或false的兩套流元素列表。
並行流
並行流就是一個把內容分成多個數據塊,並用不不同的執行緒分別處理每個資料塊的流。最後合併每個資料塊的計算結果。
將一個順序執行的流轉變成一個併發的流只要呼叫parallel()
方法
public static long parallelSum(long n){ return Stream.iterate(1L,i -> i +1).limit(n).parallel().reduce(0L,Long::sum); }
將一個併發流轉成順序的流只要呼叫sequential()
方法
stream.parallel().filter(...).sequential().map(...).parallel().reduce();
這兩個方法可以多次呼叫,只有最後一個呼叫決定這個流是順序的還是併發的。
併發流使用的預設執行緒數等於你機器的處理器核心數。
通過這個方法可以修改這個值,這是全域性屬性。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
並非使用多執行緒並行流處理資料的效能一定高於單執行緒順序流的效能,因為效能受到多種因素的影響。
如何高效使用併發流的一些建議:
- 如果不確定, 就自己測試。
- 儘量使用基本型別的流 IntStream,LongStream,DoubleStream
- 有些操作使用併發流的效能會比順序流的效能更差,比如limit,findFirst,依賴元素順序的操作在併發流中是極其消耗效能的。findAny的效能就會好很多,應為不依賴順序。
- 考慮流中計算的效能(Q)和操作的效能(N)的對比,Q表示單個處理所需的時間,N表示需要處理的數量,如果Q的值越大,使用併發流的效能就會越高。
- 資料量不大時使用併發流,效能得不到提升。
- 考慮資料結構:併發流需要對資料進行分解,不同的資料結構被分解的效能時不一樣的。
流的資料來源和可分解性
源 | 可分解性 |
---|---|
ArrayList |
非常好 |
LinkedList |
差 |
IntStream.range |
非常好 |
Stream.iterate |
差 |
HashSet |
好 |
TreeSet |
好 |
流的特性以及中間操作對流的修改都會對資料對分解效能造成影響。 比如固定大小的流在任務分解的時候就可以平均分配,但是如果有filter操作,那麼流就不能預先知道在這個操作後還會剩餘多少元素。
考慮終端操作的效能:如果終端操作在合併併發流的計算結果時的效能消耗太大,那麼使用併發流提升的效能就會得不償失。