jdk8-Collector: Parallel Stream Pitfalls and How the Collector Works

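Collector is one of the most important interfaces in JDK 8. A Collector can be broken down into five parts (the fifth is my own addition; the Javadoc lists only the first four):
1. supplier
2. accumulator
3. combiner
4. finisher
5. characteristics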

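Collector has three type parameters: T is the type of the input elements, A is the mutable accumulation type (the intermediate container), and R is the final result type: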

public interface Collector<T, A, R> {
    /**
     * A function that creates and returns a new mutable result container.
     * @return a function which returns a new, mutable result container
     * (container supplier)
     */
    Supplier<A> supplier();

    /**
     * A function that folds a value into a mutable result container.
     * @return a function which folds a value into a mutable result container
     * (accumulation step)
     */
    BiConsumer<A, T> accumulator();

    /**
     * A function that accepts two partial results and merges them. The
     * combiner function may fold state from one argument into the other and
     * return that, or may return a new result container.
     * @return a function which combines two partial results into a combined result
     * (in parallel evaluation, merges the intermediate containers A of the subtasks)
     */
    BinaryOperator<A> combiner();

    /**
     * Perform the final transformation from the intermediate accumulation type
     * {@code A} to the final result type {@code R}.
     * <p>If the characteristic {@code IDENTITY_TRANSFORM} is set, this function
     * may be presumed to be an identity transform with an unchecked cast from
     * {@code A} to {@code R}.
     * @return a function which transforms the intermediate result to the final result
     * (finishing step)
     */
    Function<A, R> finisher();

    /**
     * Returns a {@code Set} of {@code Collector.Characteristics} indicating
     * the characteristics of this Collector. This set should be immutable.
     * @return an immutable set of collector characteristics
     * (collector characteristics)
     */
    Set<Characteristics> characteristics();

    /**
     * Returns a new {@code Collector} described by the given {@code supplier},
     * {@code accumulator}, and {@code combiner} functions. The resulting
     * {@code Collector} has the {@code Collector.Characteristics.IDENTITY_FINISH}
     * characteristic.
     * @throws NullPointerException if any argument is null
     */
    public static<T, R> Collector<T, R, R> of(Supplier<R> supplier,
                                              BiConsumer<R, T> accumulator,
                                              BinaryOperator<R> combiner,
                                              Characteristics... characteristics) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(accumulator);
        Objects.requireNonNull(combiner);
        Objects.requireNonNull(characteristics);
        Set<Characteristics> cs = (characteristics.length == 0)
                                  ? Collectors.CH_ID
                                  : Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH,
                                                                           characteristics));
        return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, cs);
    }

    /**
     * Returns a new {@code Collector} described by the given {@code supplier},
     * {@code accumulator}, {@code combiner}, and {@code finisher} functions.
     * @throws NullPointerException if any argument is null
     */
    public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier,
                                                 BiConsumer<A, T> accumulator,
                                                 BinaryOperator<A> combiner,
                                                 Function<A, R> finisher,
                                                 Characteristics... characteristics) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(accumulator);
        Objects.requireNonNull(combiner);
        Objects.requireNonNull(finisher);
        Objects.requireNonNull(characteristics);
        Set<Characteristics> cs = Collectors.CH_NOID;
        if (characteristics.length > 0) {
            cs = EnumSet.noneOf(Characteristics.class);
            Collections.addAll(cs, characteristics);
            cs = Collections.unmodifiableSet(cs);
        }
        return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, finisher, cs);
    }

    /**
     * Characteristics indicating properties of a {@code Collector}, which can
     * be used to optimize reduction implementations.
     */
    // (the full Javadoc of each constant is shown further below)
    enum Characteristics { CONCURRENT, UNORDERED, IDENTITY_FINISH }
}
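To make T, A and R concrete, here is a small sketch built with the two Collector.of factories shown above; it uses only the public JDK API, and the class name CollectorOfDemo is just for illustration. As the source shows, the three-argument form always ends up with IDENTITY_FINISH, while the form that takes a finisher carries only the characteristics you pass in (none here).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

class CollectorOfDemo {
    // T = String (input element), A = List<String> (container), R = List<String> (result)
    static Collector<String, List<String>, List<String>> listCollector() {
        return Collector.of(
                ArrayList::new,                                           // supplier
                List::add,                                                // accumulator
                (left, right) -> { left.addAll(right); return left; });  // combiner
    }

    // T = String (input element), A = StringBuilder (container), R = String (result)
    static Collector<String, StringBuilder, String> joinCollector() {
        return Collector.of(
                StringBuilder::new,                   // supplier
                (sb, s) -> sb.append(s),              // accumulator
                (left, right) -> left.append(right),  // combiner
                StringBuilder::toString);             // finisher: A -> R
    }

    public static void main(String[] args) {
        List<String> list = Stream.of("a", "b", "c").collect(listCollector());
        String joined = Stream.of("a", "b", "c").collect(joinCollector());
        System.out.println(list + " " + listCollector().characteristics());   // [a, b, c] [IDENTITY_FINISH]
        System.out.println(joined + " " + joinCollector().characteristics()); // abc []
    }
}
```

Because the list collector's container and result are the same type, it can safely be an identity-finish collector; the StringBuilder collector cannot, so it needs a real finisher.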

The Javadoc describes the first four:

A {@code Collector} is specified by four functions that work together to accumulate entries into a mutable result container, and optionally perform a final transform on the result. They are:

  • creation of a new result container ({@link #supplier()})
  • incorporating a new data element into a result container ({@link #accumulator()})
  • combining two result containers into one ({@link #combiner()})
  • performing an optional final transform on the container ({@link #finisher()})
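To see how these four functions cooperate, the following sketch drives a collector by hand in the order a sequential reduction uses them: one container from supplier(), the accumulator once per element, then the finisher. The helper name collectByHand is hypothetical, not a JDK method. It always applies the finisher, which is harmless for IDENTITY_FINISH collectors because their contract requires the finisher to be the identity.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class ManualCollect {
    // Hypothetical helper: mimics a sequential collect() using only the Collector's functions.
    static <T, A, R> R collectByHand(List<? extends T> items, Collector<T, A, R> collector) {
        A container = collector.supplier().get();            // 1. create the container
        BiConsumer<A, T> accumulator = collector.accumulator();
        for (T item : items) {
            accumulator.accept(container, item);              // 2. fold each element in
        }
        // 3. the combiner is not needed: there is only one container
        return collector.finisher().apply(container);         // 4. final transform A -> R
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("hello", "world", "welcome");
        System.out.println(collectByHand(words, Collectors.joining(","))); // hello,world,welcome
    }
}
```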

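supplier: the container provider. It creates the intermediate container A, e.g. List<T> list = new ArrayList<>();
accumulator: defines how each element of the source is added to the container A created by the supplier, i.e. the accumulation step, e.g. list.add(item);
combiner: used in parallel evaluation. The stream is split into subtasks and each subtask gets its own container A from the supplier, so N subtasks mean N containers; the combiner merges two partial containers, with something like list.addAll(listB). It is only applied when the stream is parallel and collect() does not take the concurrent path, i.e. the collector is not CONCURRENT, or it is CONCURRENT but neither UNORDERED nor applied to an unordered source. PS: when collect() does take the concurrent path, the whole collection shares a single container instead of one per subtask, and the combiner is never applied. The accumulator is then called on that one container from several threads at once, so a non-thread-safe container can throw java.util.ConcurrentModificationException (or silently lose elements); in that case the supplier must return a thread-safe container.
finisher: the finishing step. If the collector's characteristics include IDENTITY_FINISH, the intermediate container A is cast directly to the result type R and the finisher is skipped; if A is not actually an instance of R (for example, the two types are unrelated), this fails with a ClassCastException. If IDENTITY_FINISH is not set, the function returned by finisher() is applied to produce the result of type R.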
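The combiner's role can be checked against the consistency rule in the Collector Javadoc (the same rule quoted in a comment inside MyCollectorA below): accumulating everything into one container must give the same result as splitting the input, accumulating each part into its own container and merging them with the combiner. A minimal sketch that simulates a single split by hand; the class and method names are illustrative only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class SplitCombineDemo {
    // One container for all elements: no combiner involved.
    static <T, A, R> R withoutSplitting(List<? extends T> items, Collector<T, A, R> c) {
        BiConsumer<A, T> acc = c.accumulator();
        A a1 = c.supplier().get();
        items.forEach(t -> acc.accept(a1, t));
        return c.finisher().apply(a1);
    }

    // Two containers (left half, right half) merged by the combiner, like two subtasks.
    static <T, A, R> R withSplitting(List<? extends T> items, Collector<T, A, R> c) {
        BiConsumer<A, T> acc = c.accumulator();
        int mid = items.size() / 2;
        A a2 = c.supplier().get();
        items.subList(0, mid).forEach(t -> acc.accept(a2, t));
        A a3 = c.supplier().get();
        items.subList(mid, items.size()).forEach(t -> acc.accept(a3, t));
        return c.finisher().apply(c.combiner().apply(a2, a3));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("hello", "world", "welcome", "helloworld");
        Collector<String, ?, List<String>> toList = Collectors.toList();
        System.out.println(withoutSplitting(words, toList)); // [hello, world, welcome, helloworld]
        System.out.println(withSplitting(words, toList));    // [hello, world, welcome, helloworld]
    }
}
```

The left part is always passed first to the combiner, which is what lets an ordered collector such as toList() keep the encounter order after merging.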

The Characteristics enum controls these properties of a collector; it is nested inside the Collector interface:

 /**
     * Characteristics indicating properties of a {@code Collector}, which can
     * be used to optimize reduction implementations.
     */
    enum Characteristics {
        /**
         * Indicates that this collector is <em>concurrent</em>, meaning that
         * the result container can support the accumulator function being
         * called concurrently with the same result container from multiple
         * threads.
         * Note: for a CONCURRENT collector the accumulator may be invoked from several threads at once, all sharing a single container A.
         * <p>If a {@code CONCURRENT} collector is not also {@code UNORDERED},
         * then it should only be evaluated concurrently if applied to an
         * unordered data source.
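         * Note: if a CONCURRENT collector is not also UNORDERED and the source is ordered, it is not evaluated concurrently; collect() falls back to one container per subtask plus the combiner.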
         */
        CONCURRENT,

        /**
         * Indicates that the collection operation does not commit to preserving
         * the encounter order of input elements.  (This might be true if the
         * result container has no intrinsic order, such as a {@link Set}.)
         * Note: suitable when the result does not need to keep the source order.
         */
        UNORDERED,

        /**
         * Indicates that the finisher function is the identity function and
         * can be elided.  If set, it must be the case that an unchecked cast
         * from A to R will succeed.
         * Note: if IDENTITY_FINISH is set, the intermediate container A is cast directly to the result type R.
         */
        IDENTITY_FINISH
    }
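A quick way to get a feel for these flags is to print what the JDK's own collectors declare. A minimal sketch; the expected sets in the comments are what the JDK 8 Collectors implementation declares, which are implementation details rather than documented guarantees.

```java
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class CharacteristicsDemo {
    static Set<Collector.Characteristics> charsOf(Collector<?, ?, ?> c) {
        return c.characteristics();
    }

    public static void main(String[] args) {
        // EnumSet-backed sets print in declaration order: CONCURRENT, UNORDERED, IDENTITY_FINISH
        System.out.println("toList:          " + charsOf(Collectors.toList()));  // [IDENTITY_FINISH]
        System.out.println("toSet:           " + charsOf(Collectors.toSet()));   // [UNORDERED, IDENTITY_FINISH]
        System.out.println("joining:         " + charsOf(Collectors.joining())); // []
        System.out.println("toConcurrentMap: "
                + charsOf(Collectors.toConcurrentMap(Function.identity(), Function.identity())));
        // toConcurrentMap: [CONCURRENT, UNORDERED, IDENTITY_FINISH]
    }
}
```

joining() has no IDENTITY_FINISH because its container is not the String it returns, while toConcurrentMap() is the standard example of a CONCURRENT collector backed by a thread-safe container.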

Below, we write a custom collector to see how it works in practice:

package com.ceaser.jdk8lambda.stream2;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import static java.util.stream.Collector.Characteristics.CONCURRENT;
import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;
import static java.util.stream.Collector.Characteristics.UNORDERED;

/**
 * Created by CeaserWang on 2017/2/27.
 * This collector turns a Set into a Map in which every element maps to itself.
 */
public class MyCollectorA<T> implements Collector<T,Set<T>,Map<T,T>> {
    @Override
    public Supplier<Set<T>> supplier() {
        System.out.println("supplier invoked...");
        return HashSet::new;// creates the Set that holds the intermediate results
    }

    @Override
    public BiConsumer<Set<T>,T> accumulator() {
        System.out.println("accumulator invoked...");
        return (item1,item2) -> {

            /**
             * From the Collector Javadoc: splitting the input must not change the result.
             *     A a1 = supplier.get();
             *     accumulator.accept(a1, t1);
             *     accumulator.accept(a1, t2);
             *     R r1 = finisher.apply(a1);  // result without splitting
             *
             *     A a2 = supplier.get();
             *     accumulator.accept(a2, t1);
             *     A a3 = supplier.get();
             *     accumulator.accept(a3, t2);
             *     R r2 = finisher.apply(combiner.apply(a2, a3));  // result with splitting
             */

           // System.out.println("current thread : "+item1+" , "+Thread.currentThread().getName());
            item1.add(item2);// adds each element to the Set
        };

    }

    @Override
    public BinaryOperator<Set<T>> combiner() {
        System.out.println("combiner invoked...");
        return (item1,item2) -> {
            item1.addAll(item2);// in parallel evaluation, merges two partial Sets
            System.out.println("--------");
            return item1;
        };
    }

    @Override
    public Function<Set<T>,Map<T,T>> finisher() {
        System.out.println("finisher invoked...");
        return (item1) ->{
            Map<T,T> rm = new HashMap<T,T>();
            item1.stream().forEach( (bean) -> rm.put(bean,bean) );// puts each Set element into the new Map
            return rm;
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        System.out.println("characteristics invoked...");
        return Collections.unmodifiableSet(EnumSet.of(UNORDERED,CONCURRENT));// concurrent accumulation into one shared container; the source order is not preserved
    }
}

Test class:

package com.ceaser.jdk8lambda.stream2;

import java.util.*;

/**
 * Created by Ceaser Wang on 2017/2/27.
 */
public class MyCollectorATest {
    public static void main(String[] args) {
        List<String> list = Arrays.asList("hello","world","welcome","helloworld","helloworldA");
        Set<String> set = new HashSet<>();
        set.addAll(list);
        Map<String,String> maped =  set.parallelStream().collect(new MyCollectorA<>());
    }
}

Output:

characteristics invoked...
supplier invoked...
accumulator invoked...
characteristics invoked...
finisher invoked...

Analysis of the concurrency pitfall:

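As you can see, the collector declares CONCURRENT and the stream is a parallelStream, so combiner() was never invoked during execution: there is only one shared container, so there are no intermediate results to merge. (The HashSet source is unordered, which is why CONCURRENT alone is enough here even without UNORDERED.) PS: on a sequential stream with CONCURRENT set, combiner() is invoked, because the regular (non-concurrent) reduction path fetches the combiner function up front; on a sequential stream, however, the returned function itself is never applied.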
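One way to observe which path collect() takes is to count how many containers the supplier creates and how often the combiner actually runs. A rough sketch with a hypothetical counting collector built with Collector.of; it uses a thread-safe ConcurrentHashMap.newKeySet() container so that the concurrent run itself is safe. With CONCURRENT on a parallel stream over an unordered HashSet there should be exactly one container and no merges; a sequential stream also uses one container. The count for a plain parallel run depends on how the source gets split, so it is only printed.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

class ContainerCountDemo {
    // Static counters for simplicity; run() resets them before each collection.
    static final AtomicInteger suppliers = new AtomicInteger();
    static final AtomicInteger merges = new AtomicInteger();

    static Collector<Integer, Set<Integer>, Set<Integer>> counting(Collector.Characteristics... chars) {
        return Collector.of(
                () -> { suppliers.incrementAndGet(); return ConcurrentHashMap.newKeySet(); },
                Set::add,
                (left, right) -> { merges.incrementAndGet(); left.addAll(right); return left; },
                chars);
    }

    // Runs one collection and returns {containers created, combiner applications}.
    static int[] run(boolean parallel, boolean concurrent) {
        suppliers.set(0);
        merges.set(0);
        Set<Integer> source = new HashSet<>();
        IntStream.range(0, 10_000).forEach(source::add);
        Collector<Integer, Set<Integer>, Set<Integer>> c = concurrent
                ? counting(Collector.Characteristics.CONCURRENT)
                : counting();
        Set<Integer> result = (parallel ? source.parallelStream() : source.stream()).collect(c);
        if (result.size() != 10_000) {
            throw new IllegalStateException("lost elements: " + result.size());
        }
        return new int[] { suppliers.get(), merges.get() };
    }

    public static void main(String[] args) {
        int[] conc = run(true, true);
        int[] seq = run(false, false);
        int[] par = run(true, false);
        System.out.println("parallel + CONCURRENT: containers=" + conc[0] + ", merges=" + conc[1]);
        System.out.println("sequential:            containers=" + seq[0] + ", merges=" + seq[1]);
        System.out.println("parallel, plain:       containers=" + par[0] + ", merges=" + par[1]);
    }
}
```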

Next, uncomment this line in the accumulator:

System.out.println("current thread : "+item1+" , "+Thread.currentThread().getName());

Run it again and it fails (if it does not fail the first time, run it a few more times):

Exception in thread "main" java.util.ConcurrentModificationException: java.util.ConcurrentModificationException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:496)
    at com.ceaser.jdk8lambda.stream2.MyCollectorATest.main(MyCollectorATest.java:17)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
    at java.util.HashMap$KeyIterator.next(HashMap.java:1461)
    at java.util.AbstractCollection.toString(AbstractCollection.java:461)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at com.ceaser.jdk8lambda.stream2.MyCollectorA.lambda$accumulator$0(MyCollectorA.java:43)
    at java.util.stream.ReferencePipeline.lambda$collect$1(ReferencePipeline.java:496)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1548)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Why?
All we did was print the intermediate container. Let's look at the code behind this frame of the stack trace:

    at java.util.AbstractCollection.toString(AbstractCollection.java:461)

It points to AbstractCollection.toString():

    public String toString() {
        Iterator<E> it = iterator();
        if (! it.hasNext())
            return "[]";

        StringBuilder sb = new StringBuilder();
        sb.append('[');
        for (;;) {
            E e = it.next();// this line iterates over the Set
            sb.append(e == this ? "(this Collection)" : e);
            if (! it.hasNext())
                return sb.append(']').toString();
            sb.append(',').append(' ');
        }
    }

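E e = it.next(); iterates over the collection. Here the Set is a single HashSet shared by all worker threads: while one thread is iterating it inside toString(), another thread adds an element, and the fail-fast iterator detects the modification and throws ConcurrentModificationException. (Unsynchronized concurrent writes can also corrupt a HashMap; on older JDKs a concurrent resize could even cause an infinite loop.)
The fix is to have the supplier return a thread-safe collection, for example one backed by ConcurrentHashMap.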
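Following that advice, here is a sketch of a thread-safe variant of MyCollectorA. It assumes we keep CONCURRENT and UNORDERED but switch the supplier to ConcurrentHashMap.newKeySet() (available since Java 8). That set's iterators are weakly consistent, so the toString() call that broke the original no longer throws ConcurrentModificationException; it just sees whatever elements are present at that moment. The class name SafeSetToMapCollector is illustrative, and the println is reduced to a bare toString() call to keep the output quiet.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

class SafeSetToMapCollector<T> implements Collector<T, Set<T>, Map<T, T>> {
    @Override
    public Supplier<Set<T>> supplier() {
        return ConcurrentHashMap::newKeySet; // thread-safe container shared by all threads
    }

    @Override
    public BiConsumer<Set<T>, T> accumulator() {
        return (set, item) -> {
            set.toString(); // same iteration the original println triggered; safe on this set
            set.add(item);
        };
    }

    @Override
    public BinaryOperator<Set<T>> combiner() {
        // Required by the interface; with CONCURRENT + UNORDERED, Stream.collect() never applies it.
        return (left, right) -> { left.addAll(right); return left; };
    }

    @Override
    public Function<Set<T>, Map<T, T>> finisher() {
        return set -> {
            Map<T, T> result = new HashMap<>();
            set.forEach(item -> result.put(item, item)); // each element maps to itself
            return result;
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED, Characteristics.CONCURRENT));
    }

    public static void main(String[] args) {
        Set<String> set = new HashSet<>(Arrays.asList("hello", "world", "welcome", "helloworld", "helloworldA"));
        Map<String, String> mapped = set.parallelStream().collect(new SafeSetToMapCollector<>());
        System.out.println(mapped.size()); // 5
    }
}
```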

The IDENTITY_FINISH characteristic:

Opening the source of collect() (in ReferencePipeline), we find:

    @Override
    @SuppressWarnings("unchecked")
    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        else {
            container = evaluate(ReduceOps.makeRef(collector));
        }
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

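When the collector's characteristics include IDENTITY_FINISH, the intermediate container A is simply cast to R and finisher() is never called; otherwise the finisher is applied. If A and R are different types but IDENTITY_FINISH is set anyway, the result is a java.lang.ClassCastException. Because (R) container is an unchecked cast, the exception is not thrown inside collect() but at the call site, where the result is assigned to a variable of type R. The first condition in the method also shows exactly when the concurrent path is taken: the stream is parallel, the collector is CONCURRENT, and either the stream is unordered or the collector is UNORDERED.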
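A small sketch of this trap, using a deliberately wrong collector built with Collector.of: its container type A is StringBuilder and its result type R is String, yet it declares IDENTITY_FINISH. Given the collect() source above, the finisher is skipped and the StringBuilder is handed back through the unchecked cast, so the ClassCastException appears only where the caller assigns the result to a String. The class and method names are illustrative.

```java
import java.util.stream.Collector;
import java.util.stream.Stream;

class IdentityFinishTrap {
    // Wrong on purpose: A = StringBuilder, R = String, but IDENTITY_FINISH claims A can be cast to R.
    static Collector<String, StringBuilder, String> broken() {
        return Collector.of(
                StringBuilder::new,
                (sb, s) -> sb.append(s),
                (left, right) -> left.append(right),
                StringBuilder::toString,                     // skipped by collect()
                Collector.Characteristics.IDENTITY_FINISH);
    }

    // Same collector without IDENTITY_FINISH: the finisher runs and converts A to R.
    static Collector<String, StringBuilder, String> correct() {
        return Collector.of(
                StringBuilder::new,
                (sb, s) -> sb.append(s),
                (left, right) -> left.append(right),
                StringBuilder::toString);
    }

    static boolean brokenThrows() {
        try {
            String s = Stream.of("a", "b").collect(broken()); // the implicit cast to String fails here
            System.out.println("unexpected: " + s);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(Stream.of("a", "b").collect(correct())); // ab
        System.out.println(brokenThrows());                          // true
    }
}
```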
