1. 程式人生 > >java8--Stream流

java8--Stream流

轉載自:冰橘檸檬
https://blog.csdn.net/weixin_39124328/article/details/83065123

為什麼需要 Stream
Java8中的stream與 java.io 包裡的 InputStream 和 OutputStream 是完全不同的概念,也不同於 StAX 對 XML 解析的 Stream,也不是 Amazon Kinesis 對大資料實時處理的 Stream,它是對集合(Collection)物件功能的增強,它專注於對集合物件進行各種非常便利、高效的聚合操作(aggregate operation),或者大批量資料操作 (bulk data operation)。Stream API 藉助於同樣新出現的 Lambda 表示式,極大的提高程式設計效率和程式可讀性。同時它提供序列和並行兩種模式進行匯聚操作,併發模式能夠充分利用多核處理器的優勢,使用 fork/join 並行方式來拆分任務和加速處理過程。通常編寫並行程式碼很難而且容易出錯, 但使用 Stream API 無需編寫一行多執行緒的程式碼,就可以很方便地寫出高效能的併發程式。

Stream的特性
在集合和其他資料集上執行函式式查詢。Stream可以認為是一個高階版本的Iterator,它代表著資料流,流中的資料元素的數量可以是有限的,也可以是無限的。其差別在於:
  無儲存:Stream是基於資料來源的物件,它本身不儲存資料元素,而是通過管道將資料來源的元素傳遞給操作。
  函數語言程式設計:對Stream的任何修改都不會修改背後的資料來源,比如對Sream的filter操作並不會刪除被過濾掉的元素,而是生成一個不含被過濾元素的新的Stream。
  延遲執行:Stream由一個或多箇中間操作(intermediate operation)和一個結束操作(terminal operation)兩部分組成。只有執行了結束操作,Stream定義的中間操作才會依次執行,這就是Stream的延遲特性。
  可消費性:Stream只能被“消費”一次,一旦遍歷過就會失效。就像容器的迭代器那樣,想要再次遍歷必須重新生成一個新的Stream。

流管道剖析
JDK 中的流來源

方法 描述
Collection.stream() 使用一個集合的元素建立一個流
Stream.of(T…) 使用傳遞給工廠方法的引數建立一個流
Stream.of(T[]) 使用一個數組的元素建立一個流

中間流操作

操作 描述
filter(Predicate) 與預期匹配的流的元素(過濾流中的元素)
map(Function<T, U>) 將提供的函式應用於流的元素的結果
flatMap(Function<T, Stream> 將提供的流處理函式應用於流元素後獲得的流元素
distinct() 去除流中的重複元素(使用元素的equal方法判斷)
sorted() 按自然順序排序的流元素
Sorted(Comparator) 按提供的比較符排序的流元素
limit(long) 截斷至所提供長度的流元素
skip(long) 去除流中的重複元素(使用元素的equal方法判斷)
takeWhile(Predicate) (僅限 Java 9)在第一個提供的預期不是 true 的元素處截斷的流元素
dropWhile(Predicate) (僅限 Java 9)丟棄了所提供的預期為 true 的初始元素分段的流元素

中間操作始終是惰性的:呼叫中間操作只會設定流管道的下一個階段,不會啟動任何操作。重建操作可進一步劃分為無狀態 和有狀態 操作。無狀態操作(比如 filter() 或 map())可獨立處理每個元素,而有狀態操作(比如 sorted() 或 distinct())可合併以前看到的影響其他元素處理的元素狀態。

終止流操作

操作 描述
forEach(Consumer action) 將提供的操作應用於流的每個元素。
toArray() 使用流的元素建立一個數組。
reduce(…) 將流的元素聚合為一個彙總值。
collect(…) 將流的元素聚合到一個彙總結果容器中。
min(Comparator) 通過比較符返回流的最小元素。
max(Comparator) 通過比較符返回流的最大元素。
count() 返回流的大小。
{any,all,none}Match(Predicate) 返回流的任何/所有元素是否與提供的預期相匹配。
findFirst() 返回流的第一個元素(如果有)。
findAny() 返回流的任何元素(如果有)。

附加資訊
大多數流操作都要求傳遞給它們的拉姆達表達是互不干擾和無狀態的。互不干擾意味著它們不會修改流來源;無狀態意味著它們不會訪問(讀或寫)任何可能在流操作壽命內改變的狀態。對於縮減操作(例如計算 sum、min 或 max 等彙總資料),傳遞給這些操作的拉姆達表示式必須是具有結合性的(或遵守類似的要求)。

如下使用有狀態拉姆達表示式的流管道(不要這麼做!)

HashSet<Integer> twiceSeen = new HashSet<>();
int[] result
    = elements.stream()
              .filter(e -> {
                  twiceSeen.add(e * 2);
                  return twiceSeen.contains(e);
              })
              .toArray();

如果並行執行,此管道會生成錯誤的結果,原因有兩個。首先,對 twiceSeen 集的訪問是從多個執行緒進行的,沒有進行任何協調,因此不是執行緒安全的。第二,因為資料被分割槽了,所以無法確保在處理給定元素時已經處理了該元素前面的所有元素。

聚合操作
累加器反模式
示例一: 使用 Streams 宣告性地計算聚合值


int totalSalesFromNY
    = txns.stream()
          .filter(t -> t.getSeller().getAddr().getState().equals("NY"))
          .mapToInt(t -> t.getAmount())
          .sum();

示例二:用傳統方法計算聚合值

int sum = 0;
for (Txn t : txns) {
    if (t.getSeller().getAddr().getState().equals("NY"))
        sum += t.getAmount();
}

對比:
lambda表示式的優點:程式碼更清晰,簡單地被構造為一些簡單操作的組合;通過宣告進行表達;隨著宣告表達條件的複雜化,可以乾淨的拓展。
傳統方式缺點:示例二為累加器反模式,首先宣告並初始化一個可變累加器變數 (sum),然後繼續在迴圈中更新累加器。為什麼這樣做是不正確的?首先,此程式碼樣式難以並行化,沒有協調(比如同步),對累加器的每次訪問都導致一次資料爭用;另一個原因是,它在太低的級別上建模計算 — 在各個元素的級別上,而不是在整個資料集的級別上。與 “逐個依次迭代交易金額,將每筆金額新增到一個已初始化為 0 的累加器” 相比,“所有交易金額的總和” 是目標的更抽象、更直接的陳述。

縮減
縮減(也稱為摺疊)是一種來自函式程式設計的技術,它抽象化了許多不同的累加操作。給定一個型別為 T,包含 x 個元素的非空數列 X1, x2, …, xn 和 T 上的二元運算子(在這裡表示為 *), T下的 X 的縮減 被定義為:

(x1 * x2 * …* xn)

當使用普通的加法作為二元運算子來應用於某個數列時,縮減就是求和。但其他許多操作也可以使用縮減來描述。如果二元運算子是 “獲取兩個元素中較大的一個”(這在 Java 中可以使用拉姆達表示式 (x,y) -> Math.max(x,y) 來表示,或者更簡單地表示為方法引用 Math::max),則縮減對應於查詢最大值。

通過將累加描述為縮減,而不使用累加器反模式,可以採用更抽象、更緊湊、更並行化 的方式來描述計算 — 只要您的二元運算子滿足一個簡單條件:結合性。回想一下,如果 a、b 和 c 元素滿足以下條件,二元運算子 * 就是結合性的:

((a * b) * c) = (a * (b * c))
結合性意味著分組無關緊要。如果二元運算子是結合性的,那麼可以按照任何順序安全地執行縮減。在順序執行中,執行的自然順序是從左向右;在並行執行中,資料劃分為分段,分別縮減每個分段,然後組合結果。結合性可確保這兩種方法得到相同的答案。如果將結合性的定義擴充套件到 4 項,可能更容易理解:

(((a * b) * c) * d) = ((a * b) * (c * d))

左側對應於典型的順序計算;右側對應於表示典型的並行執行的分割槽執行,其中輸入序列被分解為幾部分,各部分並行縮減,並使用 * 將各部分的結果組合起來。(或許令人驚奇的是,* 不需要是可交換的,但許多運算子通常都可用於縮減,比如相加和求最大值等。具有結合性但沒有可交換性的二元運算子的一個例子是字串串聯,如下所示。)

滿足:((a * b) * c) = (a * (b * c))
但是不滿足:((a * b) * c) =(c * (a * b) ) 

縮減方法

Optional<T> reduce(BinaryOperator<T> op)
T reduce(T identity, BinaryOperator<T> op)
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner);

縮減不需要僅應用於整數和字串,它可以應用於您想要將元素序列縮減為該型別的單個元素的任何情形。例如,您可以通過縮減來計算最高的人:

Comparator<Person> byHeight = Comparators.comparingInt(Person::getHeight);
BinaryOperator<Person> tallerOf = BinaryOperator.maxBy(byHeight);
Optional<Person> tallest = people.stream().reduce(tallerOf);

可變縮減
縮減獲取一個值序列並將它縮減為單個值,比如數列的和或它的最大值。但是有時您不想要單個彙總值;您想將結果組織為類似 List 或 Map 的資料結構,或者將它縮減為多個彙總值。在這種情況下,您應該使用縮減 的可變類似方法,也稱為收集。

累加器方式:

ArrayList<String> list = new ArrayList<>();
for (Person p : people)
    list.add(p.toString());

當累加器變數是一個簡單值時,縮減是累加的更好替代方法,與此類似,在累加器結果是更復雜的資料結構時,也有一種更好的替代方法–可變縮減。

可變縮減類似的方法:
  一種生成空結果容器的途徑
  一種將新元素合併到結果容器中的途徑
  一種合併兩個結果容器的途徑

這些構建塊可以輕鬆地表達為函式。這些函式中的第 3 個支援並行執行可變縮減:您可以對資料集進行分割槽,為每一部分生成一箇中間累加結果,然後合併中間結果。Streams 庫有一個 collect() 方法,它接受以下 3 個函式:

<R> collect(Supplier<R> resultSupplier,
            BiConsumer<R, T> accumulator, 
            BiConsumer<R, R> combiner)

使用縮減來計算字串串聯,因為 Java 中的字串是不可變的,而且串聯要求複製整個字串,所以它還有 O(n2) 執行時(一些字串將複製多次)。您可以通過將結果收集到 StringBuilder 中,更高效地表達字串串聯:


    StringBuilder concat = strings.stream()
                              .collect(() -> new StringBuilder(),
                                       (sb, s) -> sb.append(s),
                                       (sb, sb2) -> sb.append(sb2));

//兩程式碼相同


    StringBuilder concat = strings.stream()
                              .collect(StringBuilder::new,
                                       StringBuilder::append,
                                       StringBuilder::append);

收集器

List<String> list = Stream.of("hello", "world", "hello", "java")
						  .collect(Collectors.toList()); //收集結果為list集合

常見收集器

收集器 行為
toList() 將元素收集到一個 List 中
toSet() 將元素收集到一個 Set 中
reducing() 向元素應用縮減(通常用作下游收集器,比如用於 groupingBy)(各種版本)
minBy(BinaryOperator) 計算元素的最小值(與 maxBy() 相同)
counting() 計算元素數量。(通常用作下游收集器)
groupingBy(Function<T,U>) 將元素分組到一個 Map 中,其中的鍵是所提供的應用於流元素的函式,值是共享該鍵的元素列表

收集器的應用
將資料收集用到Collector抽象中的語法上更簡單,但有時你會有複雜的結果彙總,如groupingBy() 收集器,例如,要建立超過 1000 美元的交易的 Map,可以使用賣家作為鍵:

Map<Seller, List<Txn>> bigTxnsBySeller =
    txns.stream()
        .filter(t -> t.getAmount() > 1000)
        .collect(groupingBy(Txn::getSeller));

假設不想要每個賣家的交易 List,而想要來自每個賣家的最大交易,可以這樣:

Map<Seller, Txn> biggestTxnBySeller =
    txns.stream()
        .collect(groupingBy(Txn::getSeller,
                            maxBy(comparing(Txn::getAmount))));

如果不想要該賣家的最大交易,而想要總和,可以使用 summingInt() 收集器:

Map<Seller, Integer> salesBySeller =
    txns.stream()
         .collect(groupingBy(Txn::getSeller,
                            summingInt(Txn::getAmount)));

要獲得多級彙總結果,比如每個區域和賣家的銷售,可以使用另一個 groupingBy 收集器作為下游收集器:

Map<Region, Map<Seller, Integer>> salesByRegionAndSeller =
    txns.stream()
        .collect(groupingBy(Txn::getRegion,
                            groupingBy(Txn::getSeller, 
                                       summingInt(Txn::getAmount))));

要計算一個文件中的詞頻直方圖,可以使用 BufferedReader.lines() 將文件拆分為行,使用 Pattern.splitAsStream() 將它分解為一個單詞流,然後使用 collect() 和 groupingBy() 建立一個 Map,後者的鍵是單詞,值是這些單詞的數量:

Pattern whitespace = Pattern.compile("\\s+");
Map<String, Integer> wordFrequencies =
    reader.lines()//獲取stream
          .flatMap(s -> whitespace.splitAsStream())
          .collect(groupingBy(String::toLowerCase),
                              Collectors.counting());

Stream總結:
■ 不是資料結構
它沒有內部儲存,它只是用操作管道從 source(資料結構、陣列、generator function、IO channel)抓取資料。
■ 它也絕不修改自己所封裝的底層資料結構的資料。例如 Stream 的 filter 操作會產生一個不包含被過濾元素的新 Stream,而不是從 source 刪除那些元素。
■ 所有 Stream 的操作必須以 lambda 表示式為引數
■ 不支援索引訪問
你可以請求第一個元素,但無法請求第二個,第三個,或最後一個。不過請參閱下一項。
■ 但是可以很容易生成陣列或者 List
■ 惰性化
中間操作 Stream 是向後延遲的,一直到它弄清楚了最後需要多少資料才會開始,就是當遇到末端操作時才會開始進行中間操作。Intermediate 操作永遠是惰性化的。
■ 並行能力
當一個 Stream 是並行化的,就不需要再寫多執行緒程式碼,所有對它的操作會自動並行進行的。
■ Stream可以是無限的
集合有固定大小,Stream 則不必。limit(n) 和 findFirst() 這類的 short-circuiting(短路) 操作可以對無限的 Stream 進行運算並很快完成。