1. 程式人生 > >Lambda表示式最佳實踐(2)Stream與ParallelStream

Lambda表示式最佳實踐(2)Stream與ParallelStream

Stream是Java8新引入的API,有著廣泛的運用

建立一個Stream

Stream建立之後,就不能修改

建立一個空的Stream

Stream<String> streamEmpty = Stream.empty();

一般的,我們用如下這種寫法避免空指標異常

public Stream<String> streamOf(List<String> list) {
    return list == null || list.isEmpty() ? Stream.empty() : list.stream();
}

各種建立一個Stream的方法

雖然不常用(因為Stream是一次性的),大家看個熱鬧就好

Stream of collection

Collection<String> collection = Arrays.asList("a", "b", "c");
Stream<String> streamOfCollection = collection.stream();
streamOfCollection.forEach(a -> System.out.println(a));

陣列的Stream

String[] arr = new String[]{"a", "b", "c"
, "d", "e", "f"}; Stream<String> streamOfArrayFull = Arrays.stream(arr); streamOfArrayFull.forEach(a -> System.out.println(a)); Stream<String> streamOfArrayPart = Arrays.stream(arr, 1, 3);//下標從零開始,startInclusive,endExclusive streamOfArrayPart.forEach(a -> System.out.println(a));

StreamBuilder

Stream<String> streamBuilder =Stream.<String>builder().add("a").add("b").add("c").build();
streamBuilder.forEach(a -> System.out.println(a));

Stream.generate()

//generate會建立一個無限大的Stream直到記憶體上限,用limit限制大小
Stream<String> streamGenerated = Stream.generate(() -> "element").limit(10);
streamGenerated.forEach(a -> System.out.println(a));

Stream.iterate()

//從40作為第一個元素開始,每個元素遞增2,一共20個元素
Stream<Integer> streamIterated = Stream.iterate(40, n -> n + 2).limit(20);
streamIterated.forEach(a -> System.out.println(a));

原生型別Stream

range(int startInclusive, int endExclusive)
rangeClosed(int startInclusive, int endInclusive)

IntStream intStream = IntStream.range(1, 3);
intStream.forEach(a -> System.out.println(a));
LongStream longStream = LongStream.rangeClosed(1, 3);
longStream.forEach(a -> System.out.println(a));

同時,Random還提供了隨機Stream

Random random = new Random();
DoubleStream doubleStream = random.doubles(3);
doubleStream.forEach(a -> System.out.println(a));

Stream of String
通過String獲取char的utf-8編碼(系統預設編碼)Stream:

IntStream streamOfChars = "寫作abc".chars();
streamOfChars.forEach(a -> System.out.println(a));//輸出:20889,20316,97,98,99

通過正則分割提取Stream

Stream<String> streamOfString =
                Pattern.compile(", ").splitAsStream("a, b, c");
streamOfString.forEach(a -> System.out.println(a));

檔案Stream
可以讀取檔案每一行作為一個Stream

Path path = Paths.get("D:\\test.txt");
Stream<String> streamOfStrings = Files.lines(path);
streamOfStrings.forEach(a -> System.out.println(a));
//可以指定編碼讀取每一行
Stream<String> streamWithCharset =
        Files.lines(path, Charset.forName("UTF-8"));
streamWithCharset.forEach(a -> System.out.println(a));

Stream 工作流

一般Stream工作流分為三步:源處理,中間處理,結束處理

源處理,例如跳過1個元素:

Stream<String> onceModifiedStream =
  Stream.of("abcd", "bbcd", "cbcd").skip(1);

中間處理:例如轉換

Stream<String> twiceModifiedStream =
  stream.skip(1).map(element -> element.substring(0, 3));

結束處理:只能有一個,Stream不可以複用(經過結束處理的)

Stream<String> onceModifiedStream =
                Stream.of("abcd", "bbcd", "cbcd").skip(1);
long count = onceModifiedStream.skip(1).map(element -> element.substring(0, 3)).count();
System.out.println(count);

Stream的使用

一定注意,Stream不可以複用(經過結束處理的),一般Stream的使用如前文所述,分為三步:源處理,中間處理,結束處理

Stream<String> stream = 
  Stream.of("a", "b", "bbb", "bb", "c").filter(element -> element.contains("b"));
Optional<String> anyElement = stream.findAny();
//如果存在,輸出該元素,不存在則輸出--
System.out.println(anyElement.orElse("--"));

List<String> elements =
  Stream.of("a", "b", "bbb", "bb", "c").filter(element -> element.contains("b"))
    .collect(Collectors.toList());
anyElement = elements.stream().findAny();
System.out.println(anyElement.orElse("--"));
Optional<String> firstElement = elements.stream().findFirst();
System.out.println(firstElement.orElse("--"));

Stream懶呼叫

工作流中的中間呼叫是懶呼叫,意思就是如果沒有結束處理,那麼中間處理是不會被執行的。例如:

List<String> list = Arrays.asList("abc1", "abc2", "abc3");
list.stream().filter(element -> {
    System.out.println("中間處理");
    return element.contains("2");
});

“中間處理”是不會輸出的
只有像這樣有結束處理,中間處理才會被呼叫:

List<String> list = Arrays.asList("abc1", "abc2", "abc3");
list.stream().filter(element -> {
    System.out.println("中間處理");
    return element.contains("2");
}).findFirst();

執行順序

有時候需要注意下那些處理放在前面,哪些處理放在後面,來提高整體程式碼效率,例如:

long size = list.stream().map(element -> {
    wasCalled();
    return element.substring(0, 3);
}).skip(2).count();

這個map會被呼叫3次,但是實際沒有必要,因為只用保留一個元素,所以應該將skip放在開頭,如下所示:

long size = list.stream().skip(2).map(element -> {
    wasCalled();
    return element.substring(0, 3);
}).count();

所以,一般的,skip(), filter(), distinct() 這些應該放在工作流的開頭

Stream 降維

Stream原生api中提供了很多統計功能的介面,例如:count(), max(), min(), sum();假設我們想實現自定義的降維規則,可以用 reduce() 和 collect()這兩個介面

reduce()

包括三個引數:

  • identity:accumulator的初始值,並且是在Stream為空時的預設值
  • accumulator:合併邏輯,在每一步合併兩個元素,只有最後一步的有用,但是用這個效率不高
  • combiner:accumulator的併發改進版,但必須在ParallelStream的上下文下執行,同時在有identity初始值的情況下結果與accumulator不一樣

例子:

OptionalInt reduced =
  IntStream.range(1, 4).reduce((a, b) -> a + b);

reduced = 1 + 2 + 3 = 6

int reducedParams = Stream.of(1, 2, 3)
.reduce(10, (a, b) -> a + b, (a, b) -> {
 log.info("combiner was called");
 return a + b;
});

reducedTwoParams = 10 + 1 + 2 + 3 = 16
由於沒有在ParallelStream下,所以combiner沒有被呼叫

int reducedParallel = Arrays.asList(1, 2, 3).parallelStream()
    .reduce(10, (a, b) -> a + b, (a, b) -> {
       log.info("combiner was called");
       return a + b;
    });

reducedParallel = 12 + 13 = 25; 25 + 11 = 36

collect()

List<Product> productList = Arrays.asList(new Product(23, "potatoes"),
  new Product(14, "orange"), new Product(13, "lemon"),
  new Product(23, "bread"), new Product(13, "sugar"));

將Stream轉換為List或者Set

List<String> collectorCollection = 
  productList.stream().map(Product::getName).collect(Collectors.toList());

拼接成一個字串

String listToString = productList.stream().map(Product::getName)
  .collect(Collectors.joining(", ", "[", "]"));

求物件某一數值域平均值

double averagePrice = productList.stream()
    .collect(Collectors.averagingDouble(Product::getPrice));

求物件某一數值域總和

double summingPrice = productList.stream()
  .collect(Collectors.summingDouble(Product::getPrice));

求物件某一數值域統計資料

DoubleSummaryStatistics doubleSummaryStatistics = productList.stream().collect(Collectors.summarizingDouble(Product::getPrice));

輸出:DoubleSummaryStatistics{count=5, sum=86.000000, min=13.000000, average=17.200000, max=23.000000}

按照某一物件域分類

Map<Double, List<Product>> collectorMapOfLists = productList.stream().collect(Collectors.groupingBy(Product::getPrice));

結果是:{23.0:[{"name":"potatoes","price":23.0},{"name":"bread","price":23.0}],13.0:[{"name":"lemon","price":13.0},{"name":"sugar","price":13.0}],14.0:[{"name":"orange","price":14.0}]}

按某一條件區分

Map<Boolean, List<Product>> booleanListMap = productList.stream().collect(Collectors.partitioningBy(a -> a.price > 13));

結果是:{false:[{"name":"lemon","price":13.0},{"name":"sugar","price":13.0}],true:[{"name":"potatoes","price":23.0},{"name":"orange","price":14.0},{"name":"bread","price":23.0}]}

collect之後輔以其他操作

Set<Product> unmodifiableSet = productList.stream()
                .collect(Collectors.collectingAndThen(Collectors.toSet(),
                        Collections::unmodifiableSet));

自定義Collector

和reduce有些類似,collector也是有三部分組成:

  • supplier:初始化用的,就是collector返回的結果型別的容器,一般是構造器,例如Collectors.toSet()的Supplier就是HashSet::new
  • accumulator: 用於收集每一個元素,在非parallel環境下使用
  • combiner:用於收集每一個元素,在parallel環境下使用,由於是併發,所以入參是兩個已經收集一些元素的容器

例如實現一個LinkedList收集器:

Collector<Product, ?, LinkedList<Product>> toLinkedList =
    Collector.of(() -> {
        System.out.println("supplier is called");
        return new LinkedList<>();
    }, (a, b) -> {
        System.out.println("accumulator is called");
        a.add(b);
    },
    (first, second) -> {
        System.out.println("combiner is called");
        first.addAll(second);
        return first;
    });

stream呼叫:

LinkedList<Product> collect = productList.stream().collect(toLinkedList);

輸出:

supplier is called
accumulator is called
accumulator is called
accumulator is called
accumulator is called
accumulator is called

paralleleStream呼叫:

LinkedList<Product> collect2 = productList.parallelStream().collect(toLinkedList);

輸出:

supplier is called
supplier is called
supplier is called
accumulator is called
supplier is called
accumulator is called
accumulator is called
combiner is called
supplier is called
accumulator is called
accumulator is called
combiner is called
combiner is called
combiner is called

ParallelStream

前面已經多多少少提到了ParallelStream,對於ParallelStream,我們需要知道里面的執行是非同步的,並且使用的執行緒池是ForkJoinPool.common,可以通過設定-Djava.util.concurrent.ForkJoinPool.common.parallelism=N來調整執行緒池的大小;

ParallelStream的作用

Stream具有平行處理能力,處理的過程會分而治之,也就是將一個大任務切分成多個小任務,這表示每個任務都是一個操作,因此像以下的程式片段:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEach(out::println);  

你得到的展示順序不一定會是1、2、3、4、5、6、7、8、9,而可能是任意的順序

parallelStream背後的男人:ForkJoinPool

要想深入的研究parallelStream之前,那麼我們必須先了解ForkJoin框架和ForkJoinPool.本文旨在parallelStream,但因為兩種關係甚密,故在此簡單介紹一下ForkJoinPool,如有興趣可以更深入的去了解下ForkJoin***(當然,如果你想真正的搞透parallelStream,那麼你依然需要先搞透ForkJoinPool).*

ForkJoin框架是從jdk7中新特性,它同ThreadPoolExecutor一樣,也實現了Executor和ExecutorService介面。它使用了一個無限佇列來儲存需要執行的任務,而執行緒的數量則是通過建構函式傳入,如果沒有向建構函式中傳入希望的執行緒數量,那麼當前計算機可用的CPU數量會被設定為執行緒數量作為預設值。

ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用比如快速排序演算法。這裡的要點在於,ForkJoinPool需要使用相對少的執行緒來處理大量的任務。比如要對1000萬個資料進行排序,那麼會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬資料的合併任務。以此類推,對於500萬的資料也會做出同樣的分割處理,到最後會設定一個閾值來規定當資料規模到多少時,停止這樣的分割處理。比如,當元素的數量小於10時,會停止分割,轉而使用插入排序對它們進行排序。那麼到最後,所有的任務加起來會有大概2000000+個。問題的關鍵在於,對於一個任務而言,只有當它所有的子任務完成之後,它才能夠被執行。

所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的執行緒無法像任務佇列中再新增一個任務並且在等待該任務完成之後再繼續執行。而使用ForkJoinPool時,就能夠讓其中的執行緒建立新的任務,並掛起當前的任務,此時執行緒就能夠從佇列中選擇子任務執行。

那麼使用ThreadPoolExecutor或者ForkJoinPool,會有什麼效能的差異呢?
首先,使用ForkJoinPool能夠使用數量有限的執行緒來完成非常多的具有父子關係的任務,比如使用4個執行緒來完成超過200萬個任務。但是,使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優先執行子任務,需要完成200萬個具有父子關係的任務時,也需要200萬個執行緒,顯然這是不可行的。

工作竊取演算法

forkjoin最核心的地方就是利用了現代硬體裝置多核,在一個操作時候會有空閒的cpu,那麼如何利用好這個空閒的cpu就成了提高效能的關鍵,而這裡我們要提到的工作竊取(work-stealing)演算法就是整個forkjion框架的核心理念,工作竊取(work-stealing)演算法是指某個執行緒從其他佇列裡竊取任務來執行。

那麼為什麼需要使用工作竊取演算法呢?
假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少執行緒間的競爭,於是把這些子任務分別放到不同的佇列裡,併為每個佇列建立一個單獨的執行緒來執行佇列裡的任務,執行緒和佇列一一對應,比如A執行緒負責處理A佇列裡的任務。但是有的執行緒會先把自己佇列裡的任務幹完,而其他執行緒對應的佇列裡還有任務等待處理。幹完活的執行緒與其等著,不如去幫其他執行緒幹活,於是它就去其他執行緒的佇列裡竊取一個任務來執行。而在這時它們會訪問同一個佇列,所以為了減少竊取任務執行緒和被竊取任務執行緒之間的競爭,通常會使用雙端佇列,被竊取任務執行緒永遠從雙端佇列的頭部拿任務執行,而竊取任務的執行緒永遠從雙端佇列的尾部拿任務執行。

工作竊取演算法的優點是充分利用執行緒進行平行計算,並減少了執行緒間的競爭,其缺點是在某些情況下還是存在競爭,比如雙端佇列裡只有一個任務時。並且消耗了更多的系統資源,比如建立多個執行緒和多個雙端佇列。

用看forkjion的眼光來看ParallelStreams

上文中已經提到了在Java 8引入了自動並行化的概念。它能夠讓一部分Java程式碼自動地以並行的方式執行,也就是我們使用了ForkJoinPool的ParallelStream。

Java 8為ForkJoinPool添加了一個通用執行緒池,這個執行緒池用來處理那些沒有被顯式提交到任何執行緒池的任務。它是ForkJoinPool型別上的一個靜態元素,它擁有的預設執行緒數量等於執行計算機上的處理器數量。當呼叫Arrays類上新增的新方法時,自動並行化就會發生。比如用來排序一個數組的並行快速排序,用來對一個數組中的元素進行並行遍歷。自動並行化也被運用在Java 8新新增的Stream API中。

使用ParallelStream需要注意的

在ParallelStream中,如果我們需要收集結果,一種我們是使用collect收集,還有collect無法涵蓋的情況例如收集多個結果,這時我們需要使用執行緒安全的集合收集:

List<Product> resultA = new CopyOnWriteArrayList<>();
List<Product> resultB = new CopyOnWriteArrayList<>();

productList.parallelStream().forEach(product -> {
    if (product.name.length() > 4) {
        resultA.add(product);
    } else {
        resultB.add(product);
    }
});

而且,有時候我們不想只用預設的執行緒池,而是用我們自己的執行緒池,這時,上面的程式碼可以修改成:

List<Product> resultA = new CopyOnWriteArrayList<>();
List<Product> resultB = new CopyOnWriteArrayList<>();

ForkJoinPool forkJoinPool = new ForkJoinPool(5);

forkJoinPool.submit(() -> {
    productList.parallelStream().forEach(product -> {
        if (product.name.length() > 4) {
            resultA.add(product);
        } else {
            resultB.add(product);
        }
    });
}).get();

一定要注意,最後的.get()不能少,因為原來的parallelStream執行,就是同步等待結果完成,程式才會繼續執行的