Java8系列 (二) Stream流
概述
Stream流是Java8新引入的一個特性, 它允許你以宣告性方式處理資料集合, 而不是像以前的指令式程式設計那樣需要編寫具體怎麼實現。
比如炒菜, 用指令式程式設計需要編寫具體的實現
配菜(); 熱鍋(); 放油(); 翻炒(); 放調料(); 出鍋();
而如果是Stream流這種宣告式方式, 只需要一步操作 炒菜(); 就可以完成上面的炒菜功能。它關注的是我要做什麼, 而不是我要怎麼做。
與Collection集合使用外部迭代不同, Stream 流使用內部迭代, 它幫你把迭代做了, 還把得到的流值存在了某個地方, 你只要給出一個函式說要做什麼就可以了。
同一個流只能被消費一次, 下面這段程式碼執行會拋異常 java.lang.IllegalStateException
@Test public void test16() { List<Dish> menu = Arrays.asList( new Dish("pork", false, 800, Dish.Type.MEAT), new Dish("beef", false, 700, Dish.Type.MEAT)); Stream<Dish> stream = menu.stream(); stream.forEach(System.out::println); stream.forEach(System.out::println); //java.lang.IllegalStateException: stream has already been operated upon or closed }
諸如filter、map、limit、sorted、distinct等中間操作會返回另一個流, 多箇中間操作連線起來就形成了一條流水線。除非流水線上觸發一個終端操作, 如forEach、count、collect, 否則中間操作不會執行任何處理。
因為Stream流的一個特性就是惰性求值, 只有在觸發了終端操作時, 才會把前面所有的中間操作合併起來一次性全部處理完。
Stream API
在正式介紹Stream API之前, 先引入一些實體類和幾組資料集合, 後面的程式碼示例會經常用到它們。
這裡插入一個小技巧,使用IDEA外掛 lombok 可以讓你不用重複的編寫實體類的Getter/Setter、構造方法等等,你只需要在實體類上新增一個 @Data 註解即可,lombok外掛會在編譯期間自動幫你生成Getter/Setter方法、toString方法。
@Data public class Dish { private String name; private boolean vegetarian; private int calories; private Type type; public Dish(String name, boolean vegetarian, int calories, Type type) { this.name = name; this.vegetarian = vegetarian; this.calories = calories; this.type = type; } public enum Type { MEAT, FISH, OTHER } public enum CaloricLevel { DIET, NORMAL, FAT } @Override public String toString() { return name; } } @Data public class Transaction { private Trader trader; private int year; private int value; private String currency; public Transaction(Trader trader, int year, int value) { this.trader = trader; this.year = year; this.value = value; } } @Data public class Trader { private String name; private String city; public Trader(String name, String city) { this.name = name; this.city = city; } }實體類
static List<Dish> menu; static List<Integer> nums; static List<Transaction> transactions; static { menu = Arrays.asList( new Dish("pork", false, 800, Dish.Type.MEAT), new Dish("beef", false, 700, Dish.Type.MEAT), new Dish("chicken", false, 400, Dish.Type.MEAT), new Dish("french fries", true, 530, Dish.Type.OTHER), new Dish("rice", true, 350, Dish.Type.OTHER), new Dish("season fruit", true, 120, Dish.Type.OTHER), new Dish("pizza", true, 550, Dish.Type.OTHER), new Dish("prawns", false, 300, Dish.Type.FISH), new Dish("salmon", false, 450, Dish.Type.FISH)); nums = Arrays.asList(1, 3, 5, 7, 9, 11, 13); Trader raoul = new Trader("Raoul", "Cambridge"); Trader mario = new Trader("Mario", "Milan"); Trader alan = new Trader("Alan", "Cambridge"); Trader brian = new Trader("Brian", "Cambridge"); transactions = Arrays.asList( new Transaction(brian, 2011, 300), new Transaction(raoul, 2012, 1000), new Transaction(raoul, 2011, 400), new Transaction(mario, 2012, 710), new Transaction(mario, 2012, 700), new Transaction(alan, 2012, 950) ); }資料集合
map()方法用於將流中的元素對映成一個新的元素。
@Test public void test17() { //獲取每道菜的名稱的長度 List<Integer> list = menu.stream() .map(Dish::getName) .map(String::length) .collect(Collectors.toList()); }
flatMap()方法會把一個流中的每個值轉換成另一個流,然後把所有的流扁平化,連線起來形成一個新的流。
@Test public void test18() { List<String> words = Arrays.asList("hello", "world"); List<String> list = words.stream() .map(i -> i.split("")) .flatMap(Arrays::stream)//流扁平化,形成一個新的流 .distinct()//過濾重複的元素 .collect(Collectors.toList()); System.out.println(list);//result: [h, e, l, o, w, r, d] }
findFirst()用於返回流中的第一個元素,findAny() 返回流中任意一個元素。因為流可能是空的,所以findFirst()和findAny()的返回型別都是Optional<T>, 當流沒有元素時,就返回一個空的Optional。
對於findFirst()和findAny(),如果不關心返回的元素是哪個,使用findAny()在並行流時限制更少。
@Test public void test19() { menu.stream() .filter(Dish::isVegetarian) .findAny() .ifPresent(i -> System.out.println(i.getName()));//會在Optional包含值的時候執行給定的程式碼塊 }
你可以用 allMatch() 、noneMatch()和anyMatch()方法讓流匹配給定的謂詞Predicate<T>, 方法名就可見名知意, 分別對應 所有元素都要匹配、所有元素都不匹配、任意一個元素匹配。
通過reduce()方法可以對流進行歸約操作。
所謂規約操作就是將流中所有元素反覆結合起來, 最終得到一個值.
@Test public void test20() { Integer sum1 = nums.stream().reduce(0, Integer::sum); System.out.println(sum1); Optional<Integer> o1 = nums.stream().reduce(Integer::sum);//求和 System.out.println(o1.get()); Optional<Integer> o2 = nums.stream().reduce(Integer::max);//最大值 System.out.println(o2.get()); Integer count = menu.stream().map(d -> 1).reduce(0, Integer::sum);//計算流中元素的個數 menu.stream().count(); }
下面通過一段對交易員資料集合transactions進行處理的示例, 總結下常用的幾種Stream API。
@Test public void test21() { //(1) 找出2011年發生的所有交易,並按交易額排序(從低到高)。 List<Transaction> list = transactions.stream().filter(i -> 2011 == i.getYear()).sorted(Comparator.comparing(Transaction::getValue)).collect(Collectors.toList()); //(2) 交易員都在哪些不同的城市工作過? Set<String> cities = transactions.stream().map(Transaction::getTrader).map(Trader::getCity).collect(Collectors.toSet()); //(3) 查詢所有來自於劍橋的交易員,並按姓名排序。 List<Trader> trades = transactions.stream().map(Transaction::getTrader).filter(i -> "Cambridge".equals(i.getCity())).distinct().sorted(Comparator.comparing(Trader::getName)).collect(Collectors.toList()); //(4) 返回所有交易員的姓名字串,按字母順序排序。 String names = transactions.stream().map(Transaction::getTrader).distinct().map(Trader::getName).sorted().reduce("", (a, b) -> a + b); //(5) 有沒有交易員是在米蘭工作的? boolean flag = transactions.stream().map(Transaction::getTrader).anyMatch(trader -> "Milan".equals(trader.getCity())); //(6) 列印生活在劍橋的交易員的所有交易的總額。 Integer sum = transactions.stream().filter(i -> "Cambridge".equals(i.getTrader().getCity())).map(Transaction::getValue).reduce(0, Integer::sum); //(7) 所有交易中,最高的交易額是多少? Integer max = transactions.stream().map(Transaction::getValue).reduce(0, Integer::max); //(8) 找到交易額最小的交易。 Optional<Transaction> first = transactions.stream().min(Comparator.comparingInt(Transaction::getValue)); System.out.println(first.get()); }
原始型別流特化: IntStream, LongStream, DoubleStream的簡單使用以及和Stream流之間的相互轉換。
@Test public void test22() { int calories = menu.stream().mapToInt(Dish::getCalories).sum(); //對映到數值流 mapToXxx IntStream intStream = menu.stream().mapToInt(Dish::getCalories); //轉換回基本型別對應的物件流 Stream<Integer> stream = intStream.boxed(); //intStream.mapToObj(Integer::valueOf); //預設值OptionalInt List<Dish> list = new ArrayList<>(); OptionalInt optionalInt = list.stream().mapToInt(Dish::getCalories).max(); System.out.println(optionalInt.orElse(88)); //result: 88 // 數值範圍 long count = IntStream.rangeClosed(1, 102).filter(i -> i % 3 == 0).count(); System.out.println(count);//result: 34 }
構建流的幾種方式
由集合建立流, 根據數值範圍建立數值流, 由值建立流, 由陣列建立流, 由檔案生成流, 由函式生成無限流。
@Test public void test24() { IntStream.rangeClosed(1, 100);//根據數值範圍建立數值流 Stream<String> stream = Stream.of("java8", "蓋聶", "少司命");//由值建立流 int sum = Arrays.stream(new int[]{1, 2, 3, 4}).sum();//由陣列建立流 //由檔案生成流 ===>下面示例Files.lines得到一個流,流中的每個元素對應檔案中的一行 try (Stream<String> lines = Files.lines(Paths.get("1.txt"), Charset.defaultCharset())) { long count = lines.flatMap(line -> Arrays.stream(line.split(" "))) .distinct() .count(); } catch (IOException ex) { } //由函式生成流: 建立無限流 Stream.iterate(0, n -> n + 1) .limit(10) .forEach(System.out::println); Stream.iterate(new int[]{0, 1}, arr -> new int[]{arr[1], arr[0] + arr[1]}) //建立一個斐波納契元祖序列 .limit(10) .forEach(arr -> System.out.println("(" + arr[0] + ", " + arr[1] + ")")); Stream.generate(Math::random) .limit(5) .forEach(System.out::println); }
Collectors類中提供了一些靜態工廠方法, 用於流的歸約和彙總操作。
常見的有counting() 計算流中元素的個數,maxBy()和minBy() 取出流中某個屬性值最大或最小的元素,joining() 將對流中每一個物件應用 toString() 方法得到的所有字串連線成一個字串,reducing() 對流中的元素進行歸約操作等等。
下面是簡單的示例, 類中已經匯入了Collectors類中的所有靜態方法。
@Test public void test1() { Long count = menu.stream().collect(counting());//選單裡有多少種菜 Optional<Dish> optionalDish = menu.stream().collect(maxBy(comparingInt(Dish::getCalories)));//選單裡熱量最高的菜 Integer totalCalories1 = menu.stream().collect(summingInt(Dish::getCalories));//選單列表的總熱量 Double averageCalories = menu.stream().collect(averagingInt(Dish::getCalories));//選單列表的熱量平均值 IntSummaryStatistics intSummaryStatistics = menu.stream().collect(summarizingInt(Dish::getCalories));//一次迭代,統計出選單列表元素個數, 菜餚熱量最大值、最小值、平均值、總和 System.out.println(intSummaryStatistics.toString()); //result: IntSummaryStatistics{count=9, sum=4200, min=120, average=466.666667, max=800} String names = menu.stream().map(Dish::getName).collect(joining(","));//連線字串 Integer totalCalories2 = menu.stream().collect(reducing(0, Dish::getCalories, Integer::sum));//選單列表的總熱量 }
流的分組和分割槽操作 groupingBy(), partitioningBy()
所謂分組,就是將流中的元素按某個屬性根據一定的規則分為不同的小塊。比如常見的考試評定班級學生成績情況,分數<60 為不及格,60<=分數<80為良好,80<=分數為優秀,這個就是分組。
分割槽則比較特殊,它是根據一個謂詞Predicate<T>作為分類函式,也就是分出來的只會有兩種型別,對應的Map鍵就是布林型別。
@Test public void test2() { //單級分組 Map<Type, List<Dish>> map1 = menu.stream().collect(groupingBy(Dish::getType)); //多級分組 result: {FISH={NORMAL=[salmon], DIET=[prawns]}, OTHER={NORMAL=[french fries, pizza], DIET=[rice, season fruit]}, MEAT={NORMAL=[chicken], FAT=[pork, beef]}} Map<Type, Map<CaloricLevel, List<Dish>>> map2 = menu.stream().collect(groupingBy(Dish::getType, groupingBy(dish -> { if (dish.getCalories() < 400) return DIET; else if (dish.getCalories() < 700) return NORMAL; else return FAT; }))); //選單中每種型別的菜餚的數量 Map<Type, Long> map3 = menu.stream().collect(groupingBy(Dish::getType, counting()));//result: {FISH=2, OTHER=4, MEAT=3} //選單中每種型別熱量最高的菜餚 Map<Type, Optional<Dish>> map4 = menu.stream().collect(groupingBy(Dish::getType, maxBy(comparingInt(Dish::getCalories))));//result:{FISH=Optional[salmon], OTHER=Optional[pizza], MEAT=Optional[pork]} //上面分組操作後的Optional<Dish>是一定有值的,所以這個Optional包裝沒什麼意義,可以通過collectingAndThen()方法把Dish直接提取出來 Map<Type, Dish> map5 = menu.stream().collect(groupingBy(Dish::getType, collectingAndThen(maxBy(comparingInt(Dish::getCalories)), Optional::get)));//result:{FISH=Optional[salmon], OTHER=Optional[pizza], MEAT=Optional[pork]} //根據菜餚型別分組,獲取所有的菜餚名稱 result: {MEAT=[chicken, beef, pork], OTHER=[season fruit, pizza, rice, french fries], FISH=[salmon, prawns]} LinkedHashMap<Type, Set<String>> map6 = menu.stream().collect(groupingBy(Dish::getType, LinkedHashMap::new, mapping(Dish::getName, toSet()))); //在上面的例子中, toSet()方法生成的收集器我們是無法指定Set型別的, 可以使用toCollection()工廠方法來指定集合型別, 比如LInkedHashSet LinkedHashMap<Type, LinkedHashSet<String>> menu7 = menu.stream().collect(groupingBy(Dish::getType, LinkedHashMap::new, mapping(Dish::getName, toCollection(LinkedHashSet::new)))); //按菜餚是否素食進行分割槽 result: {false=[chicken, salmon, prawns, beef, pork], true=[rice, french fries, pizza, season fruit]} Map<Boolean, HashSet<Dish>> map9 = menu.stream().collect(partitioningBy(Dish::isVegetarian, toCollection(HashSet::new))); //獲取素食和非素食中熱量最高的菜餚 result: {false=pork, true=pizza} Map<Boolean, Dish> map10 = menu.stream().collect(partitioningBy(Dish::isVegetarian, collectingAndThen(maxBy(comparingInt(Dish::getCalories)), Optional::get))); //將前20個自然數按質數和非質數分割槽 Map<Boolean, List<Integer>> map11 = IntStream.rangeClosed(2, 20).boxed().collect(partitioningBy(this::isPrime)); } private boolean isPrime(int candidate) { int sqrt = (int) Math.sqrt(candidate); return IntStream.rangeClosed(2, sqrt).noneMatch(i -> candidate % i == 0); }
自定義收集器的兩種方式
- 實現Collector介面
- 使用Stream類的過載方法collect(),這種方式只有
IDENTITY_FINISH
特徵(即對結果容器做最終型別轉換的finisher()方法返回的是一個恆等函式)的收集器才能使用。
@Test public void test3() { //粗糙的自定義收集器 List<Dish> list = menu.stream().collect(new ToListCollector<Dish>()); //對於IDENTITY_FINISH這種最終函式是恆等函式的收集操作,可以用Stream中的過載方法collect()實現同樣的效果 HashSet<Object> hashset = menu.stream().collect(HashSet::new, HashSet::add, HashSet::addAll); } public class ToListCollector<T> implements Collector<T, List<T>, List<T>> { /** * 建立一個空的結果容器,供資料收集使用 */ @Override public Supplier<List<T>> supplier() { return ArrayList::new; } /** * 將元素新增到結果容器 */ @Override public BiConsumer<List<T>, T> accumulator() { return List::add; } /** * 此方法定義了在使用並行流時,從各個子流進行歸約所得的結果容器要如何合併在一起 */ @Override public BinaryOperator<List<T>> combiner() { return (left, right) -> { left.addAll(right); return left; }; } /** * 對結果容器做最終型別轉換 */ @Override public Function<List<T>, List<T>> finisher() { return Function.identity(); } /** * 定義收集器的一些行為特徵,比如無序歸約、並行歸約、最終型別轉換finisher()返回的函式是一個恆等函式 */ @Override public Set<Characteristics> characteristics() { return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT)); } }View Code
呼叫流的 sequential() 或 parallel() 方法可以指定流順序/並行執行,其底層原理就是改變一個記錄是否並行執行的標誌的布林變數的值來實現的。
並行流內部使用了預設的 ForkJoinPool 分支/合併框架,它預設的執行緒數就是當前機器的處理器數量,這個值是由 Runtime.getRuntime().availableProcessors() 得到的,可以通過下面的方式改變執行緒池的大小,但不建議,因為一旦執行緒數超過了處理器的數量,就可能會引發併發訪問的共享資源競爭問題。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "128");//全域性設定
下面這段程式碼對原始迭代、並行流、順序流的幾種方式進行了測試,它們使用不同的實現方式對 1~10000000 之間的自然數求和,你會看到,在某些場景下如果不恰當的使用了並行流,反而會大大降低效能,比如Stream類的iterate()方法生成的流使用並行反而會增加額外開銷。
因為每次應用iterate()方法時都要依賴前一次應用的結果,因此無法有效的把流劃分為多個小塊來並行處理,這裡把流標記成並行,實則給原本的順序處理增加了額外的開銷
@Test public void test1() { long sec1 = this.measureSumPerf(ParallelStream::iterativeSum, 1000_0000); System.out.println(sec1);//4毫秒 long sec2 = this.measureSumPerf(ParallelStream::sequentialSum, 1000_0000); System.out.println(sec2);//16毫秒 //每次應用iterate()方法時都要依賴前一次應用的結果,因此無法有效的把流劃分為多個小塊來並行處理,這裡把流標記成並行,實則給原本的順序處理增加了額外的開銷 long sec3 = this.measureSumPerf(ParallelStream::parallelSum, 1000_0000); System.out.println(sec3);//241毫秒 } public long measureSumPerf(Function<Long, Long> adder, long n) { long fastest = Long.MAX_VALUE; for (int i = 0; i < 10; i++) { long start = System.nanoTime(); long sum = adder.apply(n); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println("Result: " + sum); if (duration < fastest) fastest = duration; } return fastest; } public class ParallelStream { public static long sequentialSum(long n) { return LongStream.iterate(1, i -> i + 1) .limit(n) .sum(); // return LongStream.rangeClosed(1, n).reduce(0, Long::sum);//4毫秒 } public static long iterativeSum(long n) { long sum = 0; for (long i = 1; i < n + 1; i++) { sum += i; } return sum; } public static long parallelSum(long n) { return LongStream.iterate(1, i -> i + 1) .limit(n) .parallel() .sum(); // return LongStream.rangeClosed(1, n).parallel().reduce(0, Long::sum);//2毫秒 } }View Code
同理, 類似limit和findFirst這種依賴於元素順序的操作,在並行流上的效能一般會比順序流差。
參考資料
Java8實戰
作者:張小凡
出處:https://www.cnblogs.com/qingshanli/
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出原文連線,否則保留追究法律責任的權利。如果覺得還有幫助的話,可以點一下右下角的【推薦】。