public interface Collector<T, A, R> {
     * A function that creates and returns a new mutable result container.
     * @return a function which returns a new, mutable result container
     * 容器提供者
Supplier<A> supplier(); /** * A function that folds a value into a mutable result container. * @return a function which folds a value into a mutable result container * 累加操作 */ BiConsumer<A, T> accumulator(); /** * A function that accepts two partial results and merges them. The * combiner function may fold state from one argument into the other and * return that, or may return a new result container. * * @return
a function which combines two partial results into a combined * result * 併發的情況將每個執行緒的中間容器A合併 */
BinaryOperator<A> combiner(); /** * Perform the final transformation from the intermediate accumulation type * {@code A} to the final result type {@code R}. * * <p>If the characteristic {@code
IDENTITY_TRANSFORM} is * set, this function may be presumed to be an identity transform with an * unchecked cast from {@code A} to {@code R}. * * @return a function which transforms the intermediate result to the final * result * 終止操作 */
Function<A, R> finisher(); /** * Returns a {@code Set} of {@code Collector.Characteristics} indicating * the characteristics of this Collector. This set should be immutable. * * @return an immutable set of collector characteristics * 收集器特性 */ Set<Characteristics> characteristics(); /** * Returns a new {@code Collector} described by the given {@code supplier}, * {@code accumulator}, and {@code combiner} functions. The resulting * {@code Collector} has the {@code Collector.Characteristics.IDENTITY_FINISH} * characteristic. * * @param supplier The supplier function for the new collector * @param accumulator The accumulator function for the new collector * @param combiner The combiner function for the new collector * @param characteristics The collector characteristics for the new * collector * @param <T> The type of input elements for the new collector * @param <R> The type of intermediate accumulation result, and final result, * for the new collector * @throws NullPointerException if any argument is null * @return the new {@code Collector} */ public static<T, R> Collector<T, R, R> of(Supplier<R> supplier, BiConsumer<R, T> accumulator, BinaryOperator<R> combiner, Characteristics... characteristics) { Objects.requireNonNull(supplier); Objects.requireNonNull(accumulator); Objects.requireNonNull(combiner); Objects.requireNonNull(characteristics); Set<Characteristics> cs = (characteristics.length == 0) ? Collectors.CH_ID : Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH, characteristics)); return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, cs); } /** * Returns a new {@code Collector} described by the given {@code supplier}, * {@code accumulator}, {@code combiner}, and {@code finisher} functions. * * @param supplier The supplier function for the new collector * @param accumulator The accumulator function for the new collector * @param combiner The combiner function for the new collector * @param finisher The finisher function for the new collector * @param characteristics The collector characteristics for the new * collector * @param <T> The type of input elements for the new collector * @param <A> The intermediate accumulation type of the new collector * @param <R> The final result type of the new collector * @throws NullPointerException if any argument is null * @return the new {@code Collector} */ public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner, Function<A, R> finisher, Characteristics... characteristics) { Objects.requireNonNull(supplier); Objects.requireNonNull(accumulator); Objects.requireNonNull(combiner); Objects.requireNonNull(finisher); Objects.requireNonNull(characteristics); Set<Characteristics> cs = Collectors.CH_NOID; if (characteristics.length > 0) { cs = EnumSet.noneOf(Characteristics.class); Collections.addAll(cs, characteristics); cs = Collections.unmodifiableSet(cs); } return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, finisher, cs); } /** * Characteristics indicating properties of a {@code Collector}, which can * be used to optimize reduction implementations. */ enum Characteristics { /** * Indicates that this collector is <em>concurrent</em>, meaning that * the result container can support the accumulator function being * called concurrently with the same result container from multiple * threads. * * <p>If a {@code CONCURRENT} collector is not also {@code UNORDERED}, * then it should only be evaluated concurrently if applied to an * unordered data source. */ CONCURRENT, /** * Indicates that the collection operation does not commit to preserving * the encounter order of input elements. (This might be true if the * result container has no intrinsic order, such as a {@link Set}.) */ UNORDERED, /** * Indicates that the finisher function is the identity function and * can be elided. If set, it must be the case that an unchecked cast * from A to R will succeed. */ IDENTITY_FINISH } }


A {@code Collector} is specified by four functions that work together to
* accumulate entries into a mutable result container, and optionally perform
* a final transform on the result. They are:

  • creation of a new result container ({@link #supplier()})
  • *
  • incorporating a new data element into a result container ({@link #accumulator()})
  • *
  • combining two result containers into one ({@link #combiner()})
  • *
  • performing an optional final transform on the container ({@link #finisher()})
  • *

supplier:是一個容器提供者,提供容器A,比如:List list = new ArrayList();
combiner:用於在多執行緒併發的情況下,每個執行緒都有一個supplier和,如果有N個執行緒那麼就有N個supplier提供的容器A,執行的是類似List.addAll(listB)這樣的操作,只有在characteristics沒有被設定成CONCURRENT並且是併發的情況下 才會被呼叫。ps:characteristics被設定成CONCURRENT時,整個收集器只有一個容器,而不是每個執行緒都有一個容器,此時combiner()方法不會被呼叫,這種情況會出現java.util.ConcurrentModificationException異常,此時需要使用執行緒安全的容器作為supplier返回的物件。


     * Characteristics indicating properties of a {@code Collector}, which can
     * be used to optimize reduction implementations.
    enum Characteristics {
         * Indicates that this collector is <em>concurrent</em>, meaning that
         * the result container can support the accumulator function being
         * called concurrently with the same result container from multiple
         * threads.
         *如果一個收集器被標記為concurrent特性,那麼accumulator 方法可以被多執行緒併發的的呼叫,並且只使用一個容器A
         * <p>If a {@code CONCURRENT} collector is not also {@code UNORDERED},
         * then it should only be evaluated concurrently if applied to an
         * unordered data source.
         * 如果收集器被標記為concurrent,但是要操作的集合失敗有序的,那麼最終得到的結果不能保證原來的順序

         * Indicates that the collection operation does not commit to preserving
         * the encounter order of input elements.  (This might be true if the
         * result container has no intrinsic order, such as a {@link Set}.)
         * 適用於無序的集合

         * Indicates that the finisher function is the identity function and
         * can be elided.  If set, it must be the case that an unchecked cast
         * from A to R will succeed.
         * 如果收集器特性被設定IDENTITY_FINISH,那麼會強制將中間容器A型別轉換為結果型別R


package com.ceaser.jdk8lambda.stream2;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import static java.util.stream.Collector.Characteristics.CONCURRENT;
import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;
import static java.util.stream.Collector.Characteristics.UNORDERED;

 * Created by CeaserWang on 2017/2/27.
 * 此收集器的作用是將Set集合轉換為Map
public class MyCollectorA<T> implements Collector<T,Set<T>,Map<T,T>> {
    public Supplier<Set<T>> supplier() {
        System.out.println("supplier invoked...");
        return HashSet::new;//例項化一個存放中間結果的集合Set

    public BiConsumer<Set<T>,T> accumulator() {
        System.out.println("accumulator invoked...");
        return (item1,item2) -> {

             *  *  A a1 = supplier.get();
             *     accumulator.accept(a1, t1);
             *     accumulator.accept(a1, t2);
             *     R r1 = finisher.apply(a1);  // result without splitting
             *     A a2 = supplier.get();
             *     accumulator.accept(a2, t1);
             *     A a3 = supplier.get();
             *     accumulator.accept(a3, t2);
             *     R r2 = finisher.apply(combiner.apply(a2, a3));  // result with splitting

           // System.out.println("current thread : "+item1+" , "+Thread.currentThread().getName());


    public BinaryOperator<Set<T>> combiner() {
        System.out.println("combiner invoked...");
        return (item1,item2) -> {
            return item1;

    public Function<Set<T>,Map<T,T>> finisher() {
        System.out.println("finisher invoked...");
        return (item1) ->{
            Map<T,T> rm = new HashMap<T,T>();
            item1.stream(). forEach( (bean) -> rm.put(bean,bean) );//將Set集合的每個元素加入到新的map之中
          return rm;

    public Set<Characteristics> characteristics() {
        System.out.println("characteristics invoked...");
        return Collections.unmodifiableSet(EnumSet.of(UNORDERED,CONCURRENT));//支援併發操作,並且是不能保證原始集合的順序。


 * Created by Ceaser Wang on 2017/2/27.
public class MyCollectorATest {
    public static void main(String[] args) {
        List<String> list = Arrays.asList("hello","world","welcome","helloworld","helloworldA");
        Set<String> set = new HashSet<>();
        Map<String,String> maped =  set.parallelStream().collect(new MyCollectorA<>());


characteristics invoked...
supplier invoked...
accumulator invoked...
characteristics invoked...
finisher invoked...




System.out.println("current thread : "+item1+" , "+Thread.currentThread().getName());


Exception in thread "main" java.util.ConcurrentModificationException: java.util.ConcurrentModificationException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:496)
    at com.ceaser.jdk8lambda.stream2.MyCollectorATest.main(MyCollectorATest.java:17)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
    at java.util.HashMap$KeyIterator.next(HashMap.java:1461)
    at java.util.AbstractCollection.toString(AbstractCollection.java:461)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at com.ceaser.jdk8lambda.stream2.MyCollectorA.lambda$accumulator$0(MyCollectorA.java:43)
    at java.util.stream.ReferencePipeline.lambda$collect$1(ReferencePipeline.java:496)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1548)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)


    at java.util.AbstractCollection.toString(AbstractCollection.java:461)


    public String toString() {
        Iterator<E> it = iterator();
        if (! it.hasNext())
            return "[]";

        StringBuilder sb = new StringBuilder();
        for (;;) {
            **E e = it.next();**//這樣程式碼是對Set集合進行遍歷
            sb.append(e == this ? "(this Collection)" : e);
            if (! it.hasNext())
                return sb.append(']').toString();
            sb.append(',').append(' ');

E e = it.next();



    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        else {
            container = evaluate(ReduceOps.makeRef(collector));
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);




收集器Collector是jdk8中最為重要的介面之一,一個Collector可分為5個部分(第五個是我自己加上的): 1、supplier 2、accumulator 3、combiner 4、finisher 5、characteristics

