1. 程式人生 > >futureTask設計、場景、及高併發下應用

futureTask設計、場景、及高併發下應用

參考文章

FutureTask的cancel方法真的能停止掉一個正在執行的非同步任務嗎

FutureTask的用法及兩種常用的使用場景

https://blog.csdn.net/linchunquan/article/details/22382487

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class FutureTaskP<V> implements RunnableFuture<V> {

    private AtomicInteger             state;
    private static final int          NEW          = 0;
    private static final int          NORMAL       = 2;
    private static final int          CANCELLED    = 4;
    private static final int          INTERRUPTING = 5;
    private static final int          INTERRUPTED  = 6;

    /** 執行任務 */
    private Callable<V>               callable;
    /** 結果集 */
    private Object                    outcome;         // non-volatile, protected by state reads/writes
    /** FutureTask執行緒物件 */
    private AtomicReference<Thread>   runner;

    /** 等待節點 */
    private AtomicReference<WaitNode> waiters;

    public FutureTaskP(Callable callable) {
        if (state == null) {
            state = new AtomicInteger();
        }
        state.set(NEW);
        this.callable = callable;
    }

    static final class WaitNode {
        volatile Thread   thread;
        volatile WaitNode next;

        WaitNode() {
            thread = Thread.currentThread();
        }
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (state.get() != NEW)
            return false;
        if (mayInterruptIfRunning) {
            if (state.compareAndSet(NEW, INTERRUPTING))
                return false;
            Thread t = runner.get();
            if (t != null)
                t.interrupt();
            state.lazySet(INTERRUPTED);
        } else if (!state.compareAndSet(NEW, CANCELLED))
            return false;
        finishCompletion();
        return true;
    }

    @Override
    public boolean isCancelled() {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean isDone() {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public V get() throws ExecutionException {
        int s = state.get();
        if (s == NEW) {
            s = await(false, 0L);
        }
        return report(s);
    }

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V) x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable) x);
    }

    private int await(boolean b, long l) {
        WaitNode q = null;
        WaitNode w = waiters.get();
        for (;;) {
            int s = state.get();
            if (s == NORMAL || s == CANCELLED || s == INTERRUPTED || s == INTERRUPTING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            if (w == null) {
                w = new WaitNode();
            } else if (w.next == null) {
                w.next = w;
                waiters.compareAndSet(null, w);
            } else {
                LockSupport.park(this);
            }
        }
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
                                              TimeoutException {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void run() {
        if (runner == null) {
            runner = new AtomicReference<>();
        }
        if (state.get() != NEW || !runner.compareAndSet(null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state.get() == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    // setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state.get();
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }

    }

    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING)
            while (state.get() == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

    }

    protected void set(V v) {
        if (state.compareAndSet(NEW, NORMAL)) {
            outcome = v;
            finishCompletion();
        }
    }

    /*
     * Removes and signals all waiting threads, invokes done(), and nulls out
     * callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters.get()) != null;) {
            if (waiters.compareAndSet(q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        callable = null; // to reduce footprint
    }

    public static void main(String[] args) {

        FutureTaskP<String> futureTaskP = new FutureTaskP(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return "finish";
            }
        });
        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                //任務已經完成,取消futureTaskP
                futureTaskP.cancel(false);
            }
        }).start();
        new Thread(futureTaskP).start();

        try {
            System.out.println(futureTaskP.get());
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("main finish");
    }

}

重新實現了RunnalbeFuture,去掉了完成COMPLETING狀態的過渡狀態,簡化了get和cancel過程。

get會在大於完成狀態返回,前提是需要cancel強制釋放或者當前任務完成,釋放被阻塞的執行緒,釋放操作為finishCompletion