1. 程式人生 > 其它 >Java : Stream 資料流 (Collection 介面擴充, Stream基本操作, MapReduce 模型)

Java : Stream 資料流 (Collection 介面擴充, Stream基本操作, MapReduce 模型)

技術標籤:Java演算法及JDK原始碼探究我的百寶箱日常小知識隨筆java大資料Stream

文章目錄


從 JDK 1.8 發起的時候實際上就是世界上大資料興起的時候, 在大資料開發裡面有一個最經典的模型: MapReduce, 實際上這屬於資料的兩個操作階段:

  • Map : 處理資料
  • Reduce: 分析資料.

而在類集裡面, 由於其本身的作用也可以進行大量資料的儲存, 所以順其自然的產生了 MapReduce 的操作, 而這些操作就通過 Stream 資料流來完成了.

Collection 介面改進

現在的 Collection 介面除了定義了一些抽象方法之外, 也提供有一些普通方法, 下面來觀察這樣一個方法:

  • foreach 的輸出支援: public default void forEach(Consumer<? super T> action)
  • 取得Steam 資料流的物件: public default Stream<E> stream()

範例: 使用foreach 輸出

package com.beyond.nothing;

import java.util.ArrayList;
import java.util.List;

public class test {

    public static void main(String[] args) throws Exception {
        List<String> all = new ArrayList<>();
        all.add("java");
        all.add("python"
); all.add("javaScript"); all.add("php"); all.forEach(System.out::println); } }

這種操作只是對資料進行顯示(foreach是一個消費型介面), 如果需要更復雜的處理, 還是使用 Iterator 方便一些.
但是在 Collection 介面中提供有一個重要的Stream() 的方法, 這個方法才是整個 JDK1.8 資料操作的關鍵.

範例: 觀察 Stream

package com.beyond.nothing;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class test {

    public static void main(String[] args) throws Exception {
        List<String> all = new ArrayList<>();
        all.add("java");
        all.add("python");
        all.add("javaScript");
        all.add("php");

        Stream<String> stream = all.stream();
        System.out.println(stream.count());

    }
}

將我們的資料交給Stream之後, 就相當於這些資料一個一個進行處理, 上面的 count() 只是做了一個數據的統計, 整體的流程還是體現了遍歷的的操作.

Stream 資料流的基本操作

在之前使用的count() 方法是針對於資料量做了一個統計的操作, 那麼除了這些之外, 也可以進行資料的過濾, 即滿足某些條件的內容才允許做數量統計.

範例: 資料過濾 filter()

package com.beyond.nothing;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class test {

    public static void main(String[] args) throws Exception {
        List<String> all = new ArrayList<>();
        all.add("java");
        all.add("python");
        all.add("javaScript");
        all.add("php");

        Stream<String> stream = all.stream();
        System.out.println(stream.filter((e)->e.contains("java")).count());  // filter 斷言型函式式介面

    }
}

現在我不想要這些資料的個數, 我希望得到哪些資料被進行了篩選, 這時候就可以使用一個收集器來完成.

  • 收集器: <R,A> R collect(Collector<? super T,A,R> collector)
package com.beyond.nothing;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class test {

    public static void main(String[] args) throws Exception {
        List<String> all = new ArrayList<>();
        all.add("java");
        all.add("python");
        all.add("javaScript");
        all.add("php");

        Stream<String> stream = all.stream();
        System.out.println(stream.filter((e)->e.contains("java")).collect(Collectors.toList()));  // filter 斷言型函式式介面

    }
}

收集完的資料依然屬於List 集合, 所以可以直接用 List 接收

Stream 接口裡有兩個重要的操作方法:

  • 設定的取出最大內容:public Stream<T> limit(long maxSize)
  • 跳過的資料量: public Stream<T> skip(long n)
package com.beyond.nothing;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class test {

    public static void main(String[] args) throws Exception {
        List<String> all = new ArrayList<>();
        all.add("java");
        all.add("python");
        all.add("javaScript");
        all.add("php");
        all.add("jsp");
        all.add("html");
        all.add("sso");
        all.add("nginx");

        Stream<String> stream = all.stream();
        List<String> list = stream.skip(2).limit(2)
                .map((s)->s.toUpperCase())    // map 接的是函式式介面
                .collect(Collectors.toList());  // 部分資料量操作
        System.out.println(list);

    }
}

如果要對資料進行分析有可能需要進行分頁的操作形式, 而且使用map()函式還可以進行一些簡單的資料處理操作.

MapReduce 基礎模型

MapReduce 是整個 Stream的核心所在, 之前的所有的操作都是做了一個 MapReduce的襯托, 對於 MapReduce的操作由兩個階段所組成:

  • map() 指的是針對於資料進行先期的操作處理, 例如: 做一些簡單的數學運算
  • reduce() 是進行資料的統計分析

範例: 編寫一個簡單的資料統計操作

定義一個訂單類:

class Order{
    private String title;
    private double price;
    private int amount;

    public Order(String title, double price, int amount) {
        this.title = title;
        this.price = price;
        this.amount = amount;
    }

    public String getTitle() {
        return title;
    }

    public double getPrice() {
        return price;
    }

    public int getAmount() {
        return amount;
    }
}

隨後在 List 集合裡面儲存這些訂單資訊. 肯定會有多個訂單資訊存在.

範例: 實現訂單資訊的儲存, 隨後進行一個總量的統計

package com.beyond.nothing;

import org.omg.CORBA.ORB;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class Order{
    private String title;
    private double price;
    private int amount;

    public Order(String title, double price, int amount) {
        this.title = title;
        this.price = price;
        this.amount = amount;
    }

    public String getTitle() {
        return title;
    }

    public double getPrice() {
        return price;
    }

    public int getAmount() {
        return amount;
    }
}
public class test {

    public static void main(String[] args) throws Exception {
       List<Order> all = new ArrayList<>();
       all.add(new Order("手機", 1999.00, 20));
       all.add(new Order("膝上型電腦", 7999.00, 50));
       all.add(new Order("Java 開發", 79.00, 20000));
       all.add(new Order("鉛筆", 1.80, 203000));

        Double allPrice = all.stream().map((obj) -> obj.getPrice() * obj.getAmount()).reduce((sum, x) -> x + sum).get();
        System.out.println(allPrice);

    }
}

在這裡插入圖片描述

為了進一步觀察更加豐富的處理操作, 可以在做一些資料的統計分析, 對於當前的操作, 如果要進行數量的統計, 其結果應該是 double 型資料. 在Stream 接口裡面就提供有一個 map 結果變為 double 型的操作.

  • 統計分析: public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper)

此時返回的是一個 DoubleStream 介面的物件, 這裡面就可以完成我們的統計的操作, 其方法如下:

  • 統計方法: public DoubleSummaryStatistics summaryStatistics()
package com.beyond.nothing;

import org.omg.CORBA.ORB;

import java.util.ArrayList;
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class Order{
    private String title;
    private double price;
    private int amount;

    public Order(String title, double price, int amount) {
        this.title = title;
        this.price = price;
        this.amount = amount;
    }

    public String getTitle() {
        return title;
    }

    public double getPrice() {
        return price;
    }

    public int getAmount() {
        return amount;
    }
}
public class test {

    public static void main(String[] args) throws Exception {
       List<Order> all = new ArrayList<>();
       all.add(new Order("手機", 1999.00, 20));
       all.add(new Order("膝上型電腦", 7999.00, 50));
       all.add(new Order("Java 開發", 79.00, 20000));
       all.add(new Order("鉛筆", 1.80, 203000));

        DoubleSummaryStatistics dss = all.stream().mapToDouble((obj) -> obj.getPrice() * obj.getAmount())
                .summaryStatistics();

        System.out.println("總量: " + dss.getCount());
        System.out.println("平均值: " + dss.getAverage());
        System.out.println("最大: " + dss.getMax());
        System.out.println("最小: " + dss.getMin());
        System.out.println("總和: " + dss.getSum());

    }
}

利用 Lambda 表示式的確可以簡化不少的程式程式碼.