[轉載]Java StreamAPI 詳解
原文地址:https://blog.csdn.net/chenshun123/article/details/78132808
Java8中有兩大最為重要的改變。第一個是 Lambda 表達式;另外一個則是 Stream API(java.util.stream.*)。Stream 是 Java8 中處理集合的關鍵抽象概念,它可以指定對集合進行的操作,可以執行非常復雜的查找、過濾和映射數據等操作。使用Stream API 對集合數據進行操作,就類似於使用 SQL 執行的數據庫查詢。也可以使用 Stream API 來並行執行操作。簡而言之,Stream API 提供了一種高效且易於使用的處理數據的方式.
流(Stream)是數據渠道,用於操作數據源 (集合、數組等) 所生成的元素序列
集合講的是數據,流講的是計算
- 註 : 1> Stream 自己不會存儲元素
- 2> Stream 不會改變源對象。相反,他們會返回一個持有結果的新Stream
- 3> Stream 操作是延遲執行的。這意味著他們會等到需要結果的時候才執行
Stream 的操作三個步驟
- 1> 創建 Stream : 一個數據源 (如 : 集合、數組), 獲取一個流
- 2> 中間操作 : 一個中間操作鏈,對數據源的數據進行處理
- 3> 終止操作(終端操作) : 一個終止操作,執行中間操作鏈,並產生結果
創建 Stream
//Java8 中的 Collection 接口被擴展,提供兩個獲取流的方法 :
default Stream<E> stream() //: 返回一個順序流
default Stream<E> parallelStream()// : 返回一個並行流
// Collection 提供了兩個方法 stream() 與 parallelStream()
List<String> list = new ArrayList<>();
Stream<String> stream = list.stream(); // 獲取一個順序流
Stream<String> parallelStream = list.parallelStream(); // 獲取一個並行流
//由數組創建流
//Java8 中的 Arrays 的靜態方法 stream() 可以獲取數組流 :
static <T> Stream<T> stream(T[] array) : //返回一個流
//重載形式,能夠處理對應基本類型的數組 :
public static IntStream stream(int[] array)
public static LongStream stream(long[] array)
public static DoubleStream stream(double[] array)
// 通過 Arrays 中的 stream() 獲取一個數組流
Stream<Integer> stream1 = Arrays.stream(new Integer[10]);
//由值創建流
//可以使用靜態方法 Stream.of(), 通過顯示值創建一個流,它可以接收任意數量的參數
public static<T> Stream<T> of(T... values) : 返回一個流
// 通過 Stream 類中靜態方法 of()
Stream<Integer> stream2 = Stream.of(1, 2, 3, 4, 5, 6);
//註 : Stream.of靜態方法 底層就是 Arrays.stream 靜態方法
//由函數創建流 : 創建無限流
//可以使用靜態方法 Stream.iterate() 和 Stream.generate(), 創建無限流叠代
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)
//生成
public static<T> Stream<T> generate(Supplier<T> s)
// 創建無限流
// 叠代
Stream<Integer> stream3 = Stream.iterate(0, (x) -> x + 2).limit(10);
stream3.forEach(System.out::println);
// 生成
Stream<Double> stream4 = Stream.generate(Math::random).limit(2);
stream4.forEach(System.out::println);
Stream 的中間操作
多個中間操作可以連接起來形成一個流水線,除非流水線上觸發終止操作,否則中間操作不會執行任何的處理。而在終止操作時一次性全部處理,稱為“惰性求值”
篩選與切片
方 法 | 描 述 |
filter(Predicate p) | 接收 Lambda , 從流中排除某些元素 |
distinct() | 篩選,通過流所生成元素的 hashCode() 和 equals() 去除重復元素 |
limit(long maxSize) | 截斷流,使其元素不超過給定數量 |
skip(long n) | 跳過元素,返回一個扔掉了前 n 個元素的流。若流中元素不足 n 個,則返回一個空流。與 limit(n) 互補 |
// 內部叠代 : 叠代操作 Stream API 內部完成
// 所有的中間操作不會做任何的處理
Stream<Employee> stream = emps.stream().filter((e) -> e.getAge() <= 35);
// 只有當做終止操作時,所有的中間操作會一次性的全部執行,稱為“惰性求值”
stream.forEach(System.out::println);
// 外部叠代
Iterator<Employee> it = emps.iterator();
while (it.hasNext()) {
System.out.println(it.next());
}
emps.stream().filter(e -> e.getSalary() >= 5000).limit(3).forEach(System.out::println);
emps.parallelStream().filter((e) -> e.getSalary() >= 5000).skip(2).forEach(System.out::println);
emps.stream().distinct().forEach(System.out::println);
映射
方 法 | 描 述 |
map(Function f) | 接收一個函數作為參數,該函數會被應用到每個元素上,並將其映射成一個新的元素 |
mapToDouble(ToDoubleFunction f) | 接收一個函數作為參數,該函數會被應用到每個元素上,產生一個新的 DoubleStream |
mapToInt(ToIntFunction f) | 接收一個函數作為參數,該函數會被應用到每個元素上,產生一個新的 IntStream |
mapToLong(ToLongFunction f) | 接收一個函數作為參數,該函數會被應用到每個元素上,產生一個新的 LongStream |
flatMap(Function f) | 接收一個函數作為參數,將流中的每個值都換成另一個流,然後把所有流連接成一個流 |
Stream<String> str = emps.stream().map(Employee::getName);
List<String> strList = Arrays.asList("aaa", "bbb", "ccc", "ddd", "eee");
Stream<String> stream1 = strList.stream().map(String::toUpperCase);
stream1.forEach(System.out::println);
public static Stream<Character> filterCharacter(String str) {
List<Character> list = new ArrayList<>();
for (Character ch : str.toCharArray()) {
list.add(ch);
}
return list.stream();
}
Stream<Stream<Character>> stream2 = strList.stream().map(TestStreamaAPI2::filterCharacter);
stream2.forEach(sm -> sm.forEach(System.out::println));
Stream<Character> stream3 = strList.stream().flatMap(TestStreamaAPI2::filterCharacter);
stream3.forEach(System.out::println);
給定一個數字列表,如何返回一個由每個數的平方構成的列表呢?(給定[1,2,3,4,5], 應該返回[1,4,9,16,25])
Integer[] nums = new Integer[]{1, 2, 3, 4, 5};
Arrays.stream(nums).map((x) -> x * x).forEach(System.out::println);
排序
方 法 | 描 述 |
sorted() | 產生一個新流,其中按自然順序排序 |
sorted(Comparator comp) | 產生一個新流,其中按比較器順序排序 |
emps.stream().map(Employee::getName).sorted().forEach(System.out::println);
emps.stream().sorted((x, y) -> {
if (x.getAge() == y.getAge()) {
return x.getName().compareTo(y.getName());
} else {
return Integer.compare(x.getAge(), y.getAge());
}
}).forEach(System.out::println);
Stream 的終止操作
終端操作會從流的流水線生成結果,其結果可以是任何不是流的值,例如 : List、 Integer,甚至是 void
查找與匹配
方 法 | 描 述 |
allMatch(Predicate p) | 檢查是否匹配所有元素 |
anyMatch(Predicate p) | 檢查是否至少匹配一個元素 |
noneMatch(Predicate p) | 檢查是否沒有匹配所有元素 |
findFirst() | 返回第一個元素 |
findAny() | 返回當前流中的任意元素 |
count() | 返回流中元素總數 |
max(Comparator c) | 返回流中最大值 |
min(Comparator c) | 返回流中最小值 |
forEach(Consumer c) | 內部叠代(使用 Collection 接口需要用戶去做叠代,稱為外部叠代。相反, Stream API 使用內部叠代) |
boolean bl = emps.stream().allMatch(e -> e.getStatus().equals(Status.BUSY));
System.out.println(bl);
boolean bl1 = emps.stream().anyMatch(e -> e.getStatus().equals(Status.BUSY));
System.out.println(bl1);
boolean bl2 = emps.stream().noneMatch(e -> e.getStatus().equals(Status.BUSY));
System.out.println(bl2);
Optional<Employee> op1 = emps.stream().sorted((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary())).findFirst();
System.out.println(op1.get());
Optional<Employee> op2 = emps.parallelStream().filter((e) -> e.getStatus().equals(Status.FREE)).findAny();
System.out.println(op2.get());
long count = emps.stream().filter((e) -> e.getStatus().equals(Status.FREE)).count();
System.out.println(count);
Optional<Double> op = emps.stream().map(Employee::getSalary).max(Double::compare);
System.out.println(op.get());
Optional<Employee> op2 = emps.stream().min((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary()));
System.out.println(op2.get());
//註 : 流進行了終止操作後,不能再次使用
Stream<Employee> stream = emps.stream().filter(e -> e.getStatus().equals(Status.FREE));
long count = stream.count();
System.out.println(count);
stream.map(Employee::getSalary).max(Double::compare);
歸約
方 法 | 描 述 |
reduce(T iden, BinaryOperator b) | 可以將流中元素反復結合起來,得到一個值,返回 T |
reduce(BinaryOperator b) | 可以將流中元素反復結合起來,得到一個值,返回 Optional<T> |
//註 : map 和 reduce 的連接通常稱為 map-reduce 模式,因 Google 用它來進行網絡搜索而出名
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
Integer sum = list.stream().reduce(0, (x, y) -> x + y);
System.out.println(sum);
Optional<Double> op = emps.stream().map(Employee::getSalary).reduce(Double::sum);
System.out.println(op.get());
//需求 : 搜索名字中 “六” 出現的次數
Optional<Integer> sum = emps.stream()
.map(Employee::getName)
.flatMap(TestStreamaAPI2::filterCharacter)
.map((ch) -> {
if (ch.equals(‘六‘)) {
return 1;
} else {
return 0;
}
}).reduce(Integer::sum);
System.out.println(sum.get());
收集
方 法 | 描 述 |
collect(Collector c) | 將流轉換為其他形式。接收一個 Collector接口的實現,用於給Stream中元素做匯總的方法 |
Collector 接口中方法的實現決定如何對流執行收集操作(如收集到 List、 Set、 Map),但是 Collectors 實用類提供了很多靜態方法,可以方便地創建常見收集器實例, 具體方法與實例如下表 :
方法 | 返回類型 | 作用 |
toList | List<T> | 把流中元素收集到List |
List<Employee> emps = list.stream().collect(Collectors.toList()); | ||
toSet | Set<T> | 把流中元素收集到Set |
Set<Employee> emps = list.stream().collect(Collectors.toSet()); | ||
toCollection | Collection<T> | 把流中元素收集到創建的集合 |
Collection<Employee> emps = list.stream().collect(Collectors.toCollection(ArrayList::new)); | ||
counting | Long | 計算流中元素的個數 |
long count = list.stream().collect(Collectors.counting()); | ||
summingInt | Integer | 對流中元素的整數屬性求和 |
int total = list.stream().collect(Collectors.summingInt(Employee::getSalary)); | ||
averagingInt | Double | 計算流中元素Integer屬性的平均值 |
double avg = list.stream().collect(Collectors.averagingInt(Employee::getSalary)); | ||
summarizingInt | IntSummaryStatistics | 收集流中Integer屬性的統計值,如 : 平均值 |
Int SummaryStatisticsiss = list.stream().collect(Collectors.summarizingInt(Employee::getSalary)); | ||
joining | String | 連接流中每個字符串 |
String str = list.stream().map(Employee::getName).collect(Collectors.joining()); | ||
maxBy | Optional<T> | 根據比較器選擇最大值 |
Optional<Emp> max = list.stream().collect(Collectors.maxBy(comparingInt(Employee::getSalary))); | ||
minBy | Optional<T> | 根據比較器選擇最小值 |
Optional<Emp> min = list.stream().collect(Collectors.minBy(comparingInt(Employee::getSalary))); | ||
reducing | 歸約產生的類型 | 從一個作為累加器的初始值開始,利用BinaryOperator與流中元素逐個結合,從而歸約成單個值 |
int total = list.stream().collect(Collectors.reducing(0, Employee::getSalar, Integer::sum)); | ||
collectingAndThen | 轉換函數返回的類型 | 包裹另一個收集器,對其結果轉換函數 |
int how = list.stream().collect(Collectors.collectingAndThen(Collectors.toList(), List::size)); | ||
groupingBy | Map<K, List<T>> | 根據某屬性值對流分組,屬性為K,結果為V |
Map<Emp.Status, List<Emp>> map = list.stream().collect(Collectors.groupingBy(Employee::getStatus)); | ||
partitioningBy | Map<Boolean, List<T>> | 根據true或false進行分區 |
Map<Boolean,List<Emp>>vd= list.stream().collect(Collectors.partitioningBy(Employee::getManage)); |
List<String> list = emps.stream().map(Employee::getName).collect(Collectors.toList());
list.forEach(System.out::println);
Set<String> set = emps.stream().map(Employee::getName).collect(Collectors.toSet());
set.forEach(System.out::println);
HashSet<String> hs = emps.stream().map(Employee::getName).collect(Collectors.toCollection(HashSet::new));
hs.forEach(System.out::println);
Optional<Double> max = emps.stream().map(Employee::getSalary).collect(Collectors.maxBy(Double::compare));
System.out.println(max.get());
Optional<Employee> op = emps.stream().collect(Collectors.minBy((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary())));
System.out.println(op.get());
Double sum = emps.stream().collect(Collectors.summingDouble(Employee::getSalary));
System.out.println(sum);
Double avg = emps.stream().collect(Collectors.averagingDouble(Employee::getSalary));
System.out.println(avg);
Long count = emps.stream().collect(Collectors.counting());
System.out.println(count);
DoubleSummaryStatistics dss = emps.stream().collect(Collectors.summarizingDouble(Employee::getSalary));
System.out.println(dss.getMax());
// 分組
Map<Status, List<Employee>> map = emps.stream().collect(Collectors.groupingBy(Employee::getStatus));
System.out.println(map);
// 多級分組
Map<Status, Map<String, List<Employee>>> map = emps.stream()
.collect(Collectors.groupingBy(Employee::getStatus, Collectors.groupingBy((e) -> {
if (e.getAge() >= 60) {
return "老年";
} else if (e.getAge() >= 35) {
return "中年";
} else {
return "成年";
}
})));
System.out.println(map);
String str = emps.stream().map(Employee::getName)
.collect(Collectors.joining(",", "----", "----"));
System.out.println(str);
Optional<Double> sum = emps.stream().map(Employee::getSalary).collect(Collectors.reducing(Double::sum));
System.out.println(sum.get());
實例 :
//交易員類
public class Trader {
private String name;
private String city;
public Trader() {
}
// ...
@Override
public String toString() {
return "Trader [name=" + name + ", city=" + city + "]";
}
}
//交易類
public class Transaction {
private Trader trader;
private int year;
private int value;
public Transaction() {
}
public Transaction(Trader trader, int year, int value) {
this.trader = trader;
this.year = year;
this.value = value;
}
// ...
@Override
public String toString() {
return "Transaction [trader=" + trader + ", year=" + year + ", value=" + value + "]";
}
}
//初始化數據
List<Transaction> transactions = null;
@Before
public void before() {
Trader raoul = new Trader("Raoul", "Cambridge");
Trader mario = new Trader("Mario", "Milan");
Trader alan = new Trader("Alan", "Cambridge");
Trader brian = new Trader("Brian", "Cambridge");
transactions = Arrays.asList(
new Transaction(brian, 2011, 300),
new Transaction(raoul, 2012, 1000),
new Transaction(raoul, 2011, 400),
new Transaction(mario, 2012, 710),
new Transaction(mario, 2012, 700),
new Transaction(alan, 2012, 950)
);
}
//1. 找出2011年發生的所有交易, 並按交易額排序(從低到高)
transactions.stream().filter((t) -> t.getYear() == 2011).sorted((t1, t2) -> Integer.compare(t1.getValue(), t2.getValue())).forEach(System.out::println);
//2. 交易員都在哪些不同的城市工作過
transactions.stream().map((t) -> t.getTrader().getCity()).distinct().forEach(System.out::println);
//3. 查找所有來自劍橋的交易員,並按姓名排序
transactions.stream().filter((t) -> t.getTrader().getCity().equals("Cambridge")).map(Transaction::getTrader)
.sorted(Comparator.comparing(Trader::getName)).distinct().forEach(System.out::println);
//4. 返回所有交易員的姓名字符串,按字母順序排序
transactions.stream().map((t) -> t.getTrader().getName()).sorted().forEach(System.out::println);
String str = transactions.stream().map((t) -> t.getTrader().getName()).sorted().reduce("", String::concat);
System.out.println(str);
public static Stream<String> filterCharacter(String str) {
List<String> list = new ArrayList<>();
for (Character ch : str.toCharArray()) {
list.add(ch.toString());
}
return list.stream();
}
transactions.stream().map((t) -> t.getTrader().getName()).flatMap(TestTransaction::filterCharacter).sorted(String::compareToIgnoreCase).forEach(System.out::print);
//5. 有沒有交易員是在米蘭工作的
boolean bl = transactions.stream().anyMatch((t) -> t.getTrader().getCity().equals("Milan"));
System.out.println(bl);
//6. 打印生活在劍橋的交易員的所有交易額
Optional<Integer> sum = transactions.stream().filter((e) -> e.getTrader().getCity().equals("Cambridge"))
.map(Transaction::getValue).reduce(Integer::sum);
System.out.println(sum.get());
//7. 所有交易中,最高的交易額是多少
Optional<Integer> max = transactions.stream().map(Transaction::getValue).max(Integer::compare);
System.out.println(max.get());
//8. 找到交易額最小的交易
Optional<Transaction> op = transactions.stream().min((t1, t2) -> Integer.compare(t1.getValue(), t2.getValue()));
System.out.println(op.get());
並行流與串行流
並行流就是把一個內容分成多個數據塊,並用不同的線程分別處理每個數據塊的流。Java 8 中將並行進行了優化,可以很容易的對數據進行並行操作。 Stream API 可以聲明性地通過 parallel() 與 sequential() 在並行流與順序流之間進行切換
Fork/Join 框架
Fork/Join 框架 : 就是在必要的情況下,將一個大任務,進行拆分(fork)成若幹個小任務 (拆到不可再拆時),再將一個個的小任務運算的結果進行 join 匯總
Fork/Join 框架與傳統線程池的區別
采用 “工作竊取”模式 (work-stealing) : 當執行新的任務時它可以將其拆分分成更小的任務執行,並將小任務加到線程隊列中,然後再從一個隨機線程的隊列中偷一個並把它放在自己的隊列中
相對於一般的線程池實現,fork/join框架的優勢體現在對其中包含的任務的處理方式上。在一般的線程池中,如果一個線程正在執行的任務由於某些原因無法繼續運行,那麽該線程會處於等待狀態。而在fork/join框架實現中,如果某個子問題由於等待另外一個子問題的完成而無法繼續運行。那麽處理該子問題的線程會主動尋找其他尚未運行的子問題來執行。或者當線程任務完成速度快,就會隨機抽取其它未完成任務的進程中的最後一個任務進行計算操作。這種方式減少了線程的等待時間,提高了性能
普通 for(最慢,數據量越大CPU使用率低,速度越慢)
long start = System.currentTimeMillis();
long sum = 0L;
for (long i = 0L; i <= 10000000000L; i++) {
sum += i;
}
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println("耗費的時間為: " + (end - start)); //34-3174-3132-4227-4223-31583
ForkJoin框架(比較快)
public class ForkJoinCalculate extends RecursiveTask<Long> {
private static final long serialVersionUID = 13475679780L;
private long start;
private long end;
private static final long THRESHOLD = 10000L; //臨界值
public ForkJoinCalculate(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start;
if (length <= THRESHOLD) {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
long middle = (start + end) / 2;
ForkJoinCalculate left = new ForkJoinCalculate(start, middle);
left.fork(); //拆分,並將該子任務壓入線程隊列
ForkJoinCalculate right = new ForkJoinCalculate(middle + 1, end);
right.fork();
return left.join() + right.join();
}
}
}
long start = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinCalculate(0L, 10000000000L);
long sum = pool.invoke(task);
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println("耗費的時間為: " + (end - start)); //112-1953-1988-2654-2647-20663-113808
Java8 並行流(底層使用ForkJoin框架,速度最快 CPU使用率可以達到 100%)
long start = System.currentTimeMillis();
Long sum = LongStream.rangeClosed(0L, 10000000000L).parallel().sum();
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println("耗費的時間為: " + (end - start)); //2061-2053-2086-18926
[轉載]Java StreamAPI 詳解