FutureTask 的設計、使用場景,以及在高併發下的應用
阿新 • • 發佈:2018-12-22
參考文章
FutureTask的cancel方法真的能停止掉一個正在執行的非同步任務嗎
FutureTask的用法及兩種常用的使用場景
https://blog.csdn.net/linchunquan/article/details/22382487
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/**
 * A simplified re-implementation of a cancellable asynchronous computation.
 *
 * <p>The life cycle is tracked in {@link #state}. Every transition starts from {@code NEW}:
 * <ul>
 *   <li>{@code NEW -> NORMAL} (task returned a value)</li>
 *   <li>{@code NEW -> EXCEPTIONAL} (task threw)</li>
 *   <li>{@code NEW -> CANCELLED} (cancelled without interrupt)</li>
 *   <li>{@code NEW -> INTERRUPTING -> INTERRUPTED} (cancelled with interrupt)</li>
 * </ul>
 *
 * <p>There is no intermediate "completing" state. Instead the result is claimed with a CAS on
 * {@link #outcome} <em>before</em> the state is published, so any thread that observes a
 * completed state is guaranteed to also observe the matching outcome.
 *
 * @param <V> the result type returned by {@link #get()}
 */
public class FutureTaskP<V> implements RunnableFuture<V> {

    private static final int NEW = 0;
    private static final int NORMAL = 2;
    private static final int EXCEPTIONAL = 3;
    private static final int CANCELLED = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED = 6;

    /** Marker meaning "no result or exception has been recorded yet". */
    private static final Object UNSET = new Object();

    /** Current life-cycle state; one of the constants above. */
    private final AtomicInteger state = new AtomicInteger(NEW);

    /** The task to execute; nulled out once the future completes to reduce footprint. */
    private Callable<V> callable;

    /** The result value or the thrown exception; written at most once via CAS from UNSET. */
    private final AtomicReference<Object> outcome = new AtomicReference<>(UNSET);

    /** The thread currently executing {@link #run()}, or null when none is. */
    private final AtomicReference<Thread> runner = new AtomicReference<>();

    /** Head of the (Treiber) stack of threads blocked in {@code get}. */
    private final AtomicReference<WaitNode> waiters = new AtomicReference<>();

    /**
     * Creates a future that will execute the given task when {@link #run()} is invoked.
     *
     * @param callable the task to execute
     * @throws NullPointerException if {@code callable} is null
     */
    public FutureTaskP(Callable<V> callable) {
        this.callable = Objects.requireNonNull(callable, "callable");
        // Volatile write after the plain write above, so the callable is visible to any
        // thread that subsequently reads the state.
        state.set(NEW);
    }

    /** A node in the stack of waiting threads; records the thread that created it. */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;

        WaitNode() {
            thread = Thread.currentThread();
        }
    }

    /**
     * Attempts to cancel the task. Succeeds only if the task has not completed or been
     * cancelled yet; all blocked waiters are then released.
     *
     * @param mayInterruptIfRunning whether the thread running the task should be interrupted
     * @return true if this call cancelled the task, false if it was already done
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        int target = mayInterruptIfRunning ? INTERRUPTING : CANCELLED;
        if (!state.compareAndSet(NEW, target)) {
            return false;
        }
        try {
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner.get();
                    if (t != null) {
                        t.interrupt();
                    }
                } finally {
                    // Always leave the transient INTERRUPTING state, even if interrupt() throws.
                    state.lazySet(INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    /** @return true if the task was cancelled before it completed */
    @Override
    public boolean isCancelled() {
        return state.get() >= CANCELLED;
    }

    /** @return true if the task completed normally, threw, or was cancelled */
    @Override
    public boolean isDone() {
        return state.get() != NEW;
    }

    /**
     * Waits (uninterruptibly) for the task to finish and returns its result. If the calling
     * thread is interrupted while waiting, the wait continues and the interrupt status is
     * restored before this method returns.
     *
     * @return the computed result
     * @throws ExecutionException if the task threw an exception
     * @throws CancellationException if the task was cancelled
     */
    @Override
    public V get() throws ExecutionException {
        int s = state.get();
        boolean interrupted = false;
        try {
            while (s == NEW) {
                try {
                    s = awaitDone(false, 0L);
                } catch (InterruptedException e) {
                    // The flag was cleared by awaitDone; remember it and keep waiting.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return report(s);
    }

    /**
     * Waits at most the given time for the task to finish and returns its result.
     *
     * @param timeout the maximum time to wait
     * @param unit the unit of {@code timeout}
     * @return the computed result
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the task threw an exception
     * @throws TimeoutException if the task did not finish in time
     * @throws CancellationException if the task was cancelled
     */
    @Override
    public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        Objects.requireNonNull(unit, "unit");
        int s = state.get();
        if (s == NEW) {
            s = awaitDone(true, unit.toNanos(timeout));
            if (s == NEW) {
                throw new TimeoutException();
            }
        }
        return report(s);
    }

    /** Translates a completed state into a return value or the appropriate exception. */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome.get();
        if (s == NORMAL) {
            return (V) x; // only set(V) leads to NORMAL, so the cast is safe
        }
        if (s >= CANCELLED) {
            throw new CancellationException();
        }
        throw new ExecutionException((Throwable) x);
    }

    /**
     * Blocks the calling thread until the state leaves {@code NEW}, the timeout elapses, or
     * the thread is interrupted.
     *
     * @param timed whether {@code nanos} should be honoured
     * @param nanos maximum time to wait when {@code timed} is true
     * @return the state observed on completion, or {@code NEW} on timeout
     * @throws InterruptedException if the calling thread was interrupted
     */
    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
        if (timed && nanos <= 0L) {
            return state.get();
        }
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode node = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(node);
                throw new InterruptedException();
            }
            int s = state.get();
            if (s != NEW) {
                if (node != null) {
                    node.thread = null;
                }
                return s;
            }
            if (node == null) {
                node = new WaitNode();
            } else if (!queued) {
                // Push onto the stack; the state is re-checked before parking, so a
                // completion that raced with the push cannot be missed.
                WaitNode head = waiters.get();
                node.next = head;
                queued = waiters.compareAndSet(head, node);
            } else if (timed) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0L) {
                    removeWaiter(node);
                    return state.get();
                }
                LockSupport.parkNanos(this, remaining);
            } else {
                LockSupport.park(this);
            }
        }
    }

    /**
     * Unlinks a timed-out or interrupted node so abandoned nodes do not accumulate.
     * Restarts the traversal whenever it detects a concurrent modification.
     */
    private void removeWaiter(WaitNode node) {
        if (node == null) {
            return;
        }
        node.thread = null;
        retry:
        for (;;) {
            for (WaitNode pred = null, q = waiters.get(), s; q != null; q = s) {
                s = q.next;
                if (q.thread != null) {
                    pred = q;
                } else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) { // predecessor was removed concurrently
                        continue retry;
                    }
                } else if (!waiters.compareAndSet(q, s)) {
                    continue retry;
                }
            }
            break;
        }
    }

    /**
     * Executes the task on the calling thread unless it has already been started, completed
     * or cancelled, and records its result or exception.
     */
    @Override
    public void run() {
        if (state.get() != NEW || !runner.compareAndSet(null, Thread.currentThread())) {
            return;
        }
        try {
            Callable<V> c = callable;
            if (c != null && state.get() == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran) {
                    set(result);
                }
            }
        } finally {
            runner.set(null);
            int s = state.get();
            if (s >= INTERRUPTING) {
                handlePossibleCancellationInterrupt(s);
            }
        }
    }

    /** Waits until a concurrent {@code cancel(true)} has finished delivering its interrupt. */
    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING) {
            while (state.get() == INTERRUPTING) {
                Thread.yield(); // wait out pending interrupt
            }
        }
    }

    /**
     * Completes this future with the given value unless it is already completed or cancelled.
     *
     * @param v the result value
     */
    protected void set(V v) {
        // Claim the outcome first, then publish the state: readers only look at the outcome
        // after seeing a completed state, so they can never observe a missing result.
        if (state.get() == NEW
                && outcome.compareAndSet(UNSET, v)
                && state.compareAndSet(NEW, NORMAL)) {
            finishCompletion();
        }
    }

    /**
     * Completes this future exceptionally unless it is already completed or cancelled.
     *
     * @param t the cause of the failure
     */
    protected void setException(Throwable t) {
        if (state.get() == NEW
                && outcome.compareAndSet(UNSET, t)
                && state.compareAndSet(NEW, EXCEPTIONAL)) {
            finishCompletion();
        }
    }

    /** Removes and wakes up all waiting threads, and nulls out the callable. */
    private void finishCompletion() {
        // Detach the whole stack at once. A waiter that pushes itself afterwards re-checks
        // the (already final) state before parking, so it does not need to be signalled.
        WaitNode q = waiters.getAndSet(null);
        while (q != null) {
            Thread t = q.thread;
            if (t != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
            WaitNode next = q.next;
            q.next = null; // unlink to help gc
            q = next;
        }
        callable = null; // to reduce footprint
    }

    /**
     * Demo: starts a 3-second task, cancels it from another thread after 1 second, and shows
     * that the blocked {@code get()} in the main thread is released by the cancellation.
     */
    public static void main(String[] args) {
        FutureTaskP<String> futureTaskP = new FutureTaskP<String>(() -> {
            Thread.sleep(3000);
            return "finish";
        });
        new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // The task is still running at this point; cancel it without interrupting.
            futureTaskP.cancel(false);
        }).start();
        new Thread(futureTaskP).start();
        try {
            System.out.println(futureTaskP.get());
        } catch (CancellationException e) {
            System.out.println("task cancelled");
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("main finish");
    }
}
重新實現了 RunnableFuture,去掉了 COMPLETING 這個過渡狀態,簡化了 get 和 cancel 的流程。
get 會一直阻塞,直到當前任務完成或被 cancel 強制取消;此時由 finishCompletion 喚醒所有被阻塞的執行緒,get 隨即返回。