1. 程式人生 > >Java併發學習筆記(二)-Executor捕獲異常機制

Java併發學習筆記(二)-Executor捕獲異常機制

學習《java程式設計思想》的Executor捕獲異常的時候,發現程式碼輸出跟書上有出入,於是就研究了一下Executor的機制。

(一)異常捕獲例項

1、異常處理類MyUncaughtExceptionHandler

public class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler{

	@Override
	public void uncaughtException(Thread t, Throwable e) {
		System.out.println("I caught: " + e);
	}

}
執行緒丟擲異常的時候,會由MyUncaughtExceptionHandler類的uncaughtException方法捕獲異常,在方法裡可以對異常進行處理,引數Thread為丟擲異常的執行緒,throwable為異常。

2、執行緒工廠類HandlerThreadFactory

public class HandlerThreadFactory implements ThreadFactory {

	@Override
	public Thread newThread(Runnable r) {
		Thread t = new Thread(r);
		t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
		System.out.println(1);
		return t;
	}

}
通過執行緒工廠建立的執行緒,都有統一的配置,比如,設定守護執行緒,異常處理器等。這裡,為所有執行緒設定統一個異常處理器MyUncaughtExceptionHandler

3、任務執行緒類ExceptionThread

public class ExceptionThread implements Runnable {

	@Override
	public void run() {
		System.out.println(2);
		throw new RuntimeException();
	}

}
執行緒的run方法丟擲異常。在正常情況下,這個異常是無法捕獲的,但是在Executor的幫助下,該異常可以捕獲。

4、場景類

public class CaptureUncaughtException {

	public static void main(String[] args) throws InterruptedException {
		ExecutorService es = Executors.newCachedThreadPool(new HandlerThreadFactory());
		es.execute(new ExceptionThread());
	}

}
為Executors設定ThreadFactory,這個通過Executors建立的執行緒都會有統一的配置,在丟擲異常時由MyUncaughtExceptionHandler捕獲。

按照《java程式設計思想》裡,該程式碼應該輸出如下:

1
2
I caught: java.lang.RuntimeException
但是在我的環境裡,輸出如下:
1
2
1
I caught: java.lang.RuntimeException
可以看出來,ThreadFactory的newThread執行了兩次。

(二)Executor捕獲異常機制

這裡以ThreadPoolExecutor為例,其execute方法如下:

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
//......
        else if (!addWorker(command, false))
            reject(command);
    }
因為程式第一次啟動,執行緒池沒有執行緒,所以執行了addWorker方法,任務以引數command的形式傳遞到addWorker裡。

private boolean addWorker(Runnable firstTask, boolean core) {
        //......

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
           //任務當做firstTask傳遞給worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //worker建立成功以後,新增到workers裡。workers其實就是執行緒池裡的執行緒
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                	//啟動worker的執行緒
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

在worker的建構函式裡,進行了一個主要的操作

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        //......其他方法
    }

任務成為Worker的成員變數,並以worker自身為引數,以我們自己建立的執行緒工廠為模板,建立了一個執行緒,所以打印出“1”,該執行緒在addWorker方法裡啟動。
可以看到,worker的run方法呼叫了runWorker方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //獲取任務
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                //.......其他程式碼
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    	//呼叫任務的run方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
worker執行緒裡,呼叫了任務的run方法,該run方法僅僅是一個普通方法,並不是啟動執行緒,所以在我們的ExceptionThread的run方法裡丟擲異常時,可以被捕獲。從中,我們也可以看到,Executor的execute方法,是在Executor自己建立的worker執行緒裡,將我們自己寫的runnable任務作為一個普通物件,執行run方法。在這裡輸出“2”,並捕獲了異常。到此,就是所有正常的輸出。

我們可以看到最後一個finally裡,對work進行了處理

private void processWorkerExit(Worker w, boolean completedAbruptly) {
	//......一些邏輯
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
在我的程式碼裡,丟擲異常後,對worker退出時,會重新建立一個worker,這裡又回到了worker的建立過程,執行緒工廠會以模板建立一個worker,newThread又執行了一次,又輸出了“1”。