1. 程式人生 > >在使用Java8並行流時的問題分析

在使用Java8並行流時的問題分析

pri 打印 串行 imp arrays 一起學 copy oid comm

最近在使用Java8的並行流時遇到了坑,線上排查問題時花了較多時間,分享出來與大家一起學習與自查

// 此處為坑
List<Java8Demo> copy = Lists.newArrayList();
numbers.parallelStream().forEach(item -> {
    copy.add(new Java8Demo(item));
});

上圖用到了parallelStrem並行流,在循環內部往共享變量copy內寫值,由於ArrayList本身不具備線程安全性,導致得到的copy內容有缺失。

總結經驗如下:

  1. 在並行流內部不能對外部共享變量做寫操作
  2. 如有需要,使用收集器實現上述並行流,收集器在內部即使使用ArrayList,也不會造成問題!

提供兩種解決方案:

  • 串行
    // stream串行
    List<Java8Demo> copy = Lists.newArrayList();
    numbers.stream().forEach(item -> {
        copy.add(new Java8Demo(item));
    });
  • 收集器
    // 並行使用收集器
    List<Java8Demo> copy = numbers.parallelStream().map(Java8Demo::new
    ).collect(Collectors.toList());

可運行Demo.java

package acc.biz.impl;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;

public class Demo {

    private Integer value;

    public Demo(Integer value) {
        
this.value = value; } public static List<Integer> numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); public static void main(String[] args) { /** parallelStream並行 */ int count1 = 1; while (count1 < 100) { // 此處為坑 List<Demo> copy = Lists.newArrayList(); numbers.parallelStream().forEach(item -> { copy.add(new Demo(item)); }); // 打印錯誤 if (copy.size() != numbers.size()) { System.out.println( new StringBuilder().append("parallelStream循環第").append(count1).append("次報錯,numbers.size: [") .append(numbers.size()).append("],copy.size: [").append(copy.size()).append("]")); break; } count1++; } /** stream串行 */ int count2 = 1; while (count2 < 100) { // stream串行 List<Demo> copy = Lists.newArrayList(); numbers.stream().forEach(item -> { copy.add(new Demo(item)); }); // 打印錯誤 if (copy.size() != numbers.size()) { System.out.println(new StringBuilder().append("stream循環第").append(count2).append("次報錯,numbers.size: [") .append(numbers.size()).append("],copy.size: [").append(copy.size()).append("]")); break; } count2++; } /** Collectors並行 */ int count3 = 1; while (count3 < 100) { // 並行使用收集器 List<Demo> copy = numbers.parallelStream().map(Demo::new).collect(Collectors.toList()); // 打印錯誤 if (copy.size() != numbers.size()) { System.out.println( new StringBuilder().append("Collectors循環第").append(count3).append("次報錯,numbers.size: [") .append(numbers.size()).append("],copy.size: [").append(copy.size()).append("]")); break; } count3++; } } public Integer getValue() { return value; } public void setValue(Integer value) { this.value = value; } }

在使用Java8並行流時的問題分析