1. 程式人生 > 實用技巧 >執行緒基礎知識09-FutureTask詳解

執行緒基礎知識09-FutureTask詳解

FutureTask詳解

     作用:用於等待一個執行緒執行完畢後再執行另一個執行緒任務。一般用於Executors框架中,最常使用的是再ThreadPoolExecutor中,進行多執行緒任務;

注意:

  • JDK1.8不在使用AQS進行執行緒管理;

  • 取而代之的是通過CAS進行狀態的切換,waiter執行緒節點由堆疊完成操作;

  • 每次執行完或者有異常後,都會呼叫方法,重新喚醒後繼執行緒進行鎖競爭,從而進行重排序;

UML圖

FutureTask幾種狀態變更

幾種狀態

  • 初始狀態是NEW;

  • 任務執行中的狀態是COMPLETING;

  • 呼叫cancel方法,會取消任務,呼叫cancel(true)方法會中斷任務執行;

幾種狀態的切換如下:

  • NEW -> COMPLETING -> NORMAL

  • NEW -> COMPLETING -> EXCEPTIONAL

  • NEW -> CANCELLED

  • NEW -> INTERRUPTING -> INTERRUPTED

狀態碼的值大小

private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

原始碼檢視

run 方法

  • 如果當前執行緒佔有鎖並且是初始狀態,則進行任務執行,並返回結果;

  • 如果出現錯誤,則更改狀態,並喚醒所有後繼執行緒;

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))//判斷狀態是否為NEW或者鎖佔有執行緒是否當前執行緒
            return;
        try {
            Callable<V> c = callable;//獲取當前callbale任務
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    /**
                     * 如果實現的是Runnable介面,要預設返回的result;
                     */       
                    result = c.call();//執行當前執行緒任務
                    ran = true;//任務執行成功
                } catch (Throwable ex) {//任務執行失敗
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)//如果任務執行成功返回執行結果
                    set(result);
            }
        } finally {
            /**
             * 將當前執行緒設定為空
             * 如果當前執行緒是正在終端狀態,呼叫Thread.yeild()進行鎖釋放重競爭
             */     
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
/**
 * 本質還是呼叫Runnable的run方法,返回預給定的返回值
 */
static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();//相當於還是呼叫執行緒的run方法進行執行,沒什麼差別
            return result;
        }
    }
/**
 * 返回錯誤資訊,更改錯誤狀態,喚醒所有後繼執行緒
 */
 protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

set方法

  • 更改當前節點狀態,並設定返回值;

  • 喚醒所有後繼節點執行緒

  protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//更改狀態為COMPLETING執行中
            outcome = v;//增加返回值
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 更改狀態為NORMAL,執行完成
            finishCompletion();
        }
    }


/**
 * 作用:喚起所有後繼節點的執行緒
 */
  private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {//有後繼節點的時候
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//後繼節點替換當前節點
                for (;;) {
                    Thread t = q.thread;//獲取節點執行緒
                    if (t != null) {//執行緒不為空時
                        q.thread = null;
                        LockSupport.unpark(t);//喚醒執行緒
                    }
                    WaitNode next = q.next;//當前節點的後繼節點
                    if (next == null)
                        break;
                    q.next = null; // GC回收節點
                    q = next;//進行節點替換
                }
                break;
            }
        }
       /**
        * 這個方法是繼承類去實現,完成執行緒任務或者取消執行緒任務
        */ 
        done();
        callable = null; 
    }

get方法

  • 作用:獲取當前執行緒返回的資料;

  • 方法執行的過程:

    • 如果是初始狀態NEW,則建立當前執行緒為新的節點,並關聯後繼節點。

    • 阻塞節點直到節點執行緒任務完成,或者丟擲異常;

    • 返回當前執行緒執行資料;

 public V get() throws InterruptedException, ExecutionException {
      int s = state;
      if (s <= COMPLETING)//如果是初始狀態時
          s = awaitDone(false, 0L);//任務執行,並返回執行狀態
      return report(s);//用於返回資料
  }
  /**
   * 執行的過程時:
   * 1.判斷當前節點的狀態如果是終端,則移除當前節點丟擲異常 ;
   * 2.如果節點執行完成,則將執行緒任務回收,並返回任務執行狀態;  
   * 3.如果當前節點為空,則建立節點
   * 4.如果當前節點沒有在佇列中,則將當前節點加入到佇列中
   * 5.有超時限制的話,判斷是否超時,超時則移出,否則進行阻塞
   * 6.非以上情況,則進行阻塞,直到被喚醒。
   */             
  private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;//是否設定超時時間
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {//如果執行緒已經終端,移除節點,並丟擲異常
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {//如果大於COMPLETING說明任務已經執行
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // 如果狀態時COMPLETING,讓出CPU時間,並重新嘗試獲得
                Thread.yield();
            else if (q == null)//如果為空,則建立新的節點
                q = new WaitNode();
            else if (!queued)//如果沒有後繼節點,當前節點通過unsafe設定當前節點的後繼節點
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {//設定了超時的化,超時後,將當前節點進行移除
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);//如果沒有超時,則阻塞當前節點固定的時間
            }
            else
                LockSupport.park(this);//阻塞當前節點
        }
    }
/**
 * 移除等待的節點
 */  
  private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }


/**
 * 作用:用於返回結果
 */
   private V report(int s) throws ExecutionException {
        Object x = outcome;//返回的值
        if (s == NORMAL)//正常返回
            return (V)x;
        if (s >= CANCELLED)//返回異常
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

cancel方法

  • 初始狀態,根據傳入的boolean值,如果不傳值,預設取消當前節點,喚起後繼執行緒

  • 如果傳值true,表示中斷,中斷任務後,喚起後繼節點執行緒;

   public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))//
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {//表示中斷當前執行緒任務
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);//更改節點狀態
                }
            }
        } finally {
            finishCompletion();//喚醒後繼節點執行緒
        }
        return true;
    }

總結

1.所以為什麼不用AQS而改用CAS?

其實註解已經給出了答案:
Revision notes: This differs from previous versions of this
class that relied on AbstractQueuedSynchronizer, mainly to
avoid surprising users about retaining interrupt status during
cancellation races.

我們看一下AQS中斷式獲取鎖的方法;

 - 假設第一次獲取鎖,並沒有獲取到;
 - 加到同步佇列的過程中中斷異常了;
 - 獲取中斷狀態(會清空當前執行緒中斷標識),又丟擲中斷異常
 - 如果再來獲取一次,又會重複一次。這種容易對開發人員造成困惑。所以改CAS方式了;


 public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())//判斷如果中斷了
            throw new InterruptedException();//再丟擲中斷異常
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }


 private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
            //。。。。。。。。。。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();//丟擲中斷異常
            }
        } finally {
           //。。。。。
        }
    }