【胡思亂想】JNI與執行緒池的維護
阿新 • • 發佈:2018-12-18
JNI中,C/C++程式碼裡建立的資源不由Java GC處理,故這裡的資源必須由C/C++程式碼明確釋放。在JNI中,C/C++回撥Java的方法是呼叫一個CallXXMethod函式來實現的,如果回撥的方法結束,C/C++執行下一行程式碼。
故猜測,由C/C++建立的OS執行緒應該會在執行完run方法後釋放,不然好像也沒有其他合適的時間點來對執行緒進行釋放了。因為按照語義的話,既然執行緒的任務已經完成,那執行緒還留著幹什麼,就應該被釋放。
有些時候,我們需要維護一個執行緒池來減少建立和釋放執行緒的開銷,讓一些執行緒完成當前任務後被收回到執行緒池,等待接受下一個任務。根據前面的猜測,run方法結束後,OS執行緒將被釋放。我們要維護執行緒池,就是不能讓執行緒被釋放。所以我們就要阻止run方法返回。當Thread.run把target.run執行完的時候,利用wait掛起執行緒。直到有新的target,才喚醒當前執行緒。
以下是我在舊版的《Thinking in Enterprise Java》中看到的執行緒池的實現。
public class Worker extends Thread { //工作者執行緒 public static final Logger logger = Logger.setLogger("Worker"); //類日誌 private String workerId; //工作者ID private Runnable task; //任務物件 private ThreadPool threadPool; //執行緒池引用,方便操作。 static { //靜態塊,配置logger try { logger.setUseParentHandlers(false); FileHandler ferr = new FileHandler("WorkerErr.log"); ferr.setFormatter(new SimpleFormatter()); logger.addHandler(ferr); } catch(IOException e) { System.out.println("Logger not initialized."); } } public Worker(String id, ThreadPool pool) { workerId = id; threadPool = pool; start(); //建立即啟動 } public void setTask(Runnable t) { //這裡放入新任務,並喚醒執行緒,進入就緒佇列。 task = t; synchronized(this) { notify(); //wait、notify方法都必須獲取對應的鎖 } } public void run() { try { while(!threadPool.isStopped()) { //如果執行緒池未停止工作,此執行緒不釋放 synchronized(this) { if(task != null) { try { task.run(); //執行任務 } catch(Exception e) { logger.log(Level.SERVER, "Exception in source Runnable task", e); } threadPool.putWorker(this); //完成當前任務,回收到執行緒池 } wait(); //完成任務或無任務時,掛起執行緒。免得空迴圈浪費時間片。 } } //跳出迴圈,意味著執行緒池結束工作,此執行緒也將停止工作 System.out.println(this + " Stopped"); } catch(InterruptedException e) { throw new RuntimeException(e); } } public String toString() { return "Worker: " + workerId; } }
public class ThreadPool extends Thread { //執行緒池, 同樣是一個執行緒,負責接收和分配任務 private static final int DEFAULT_NUM_WORKERS = 5; //預設執行緒池大小為5 private LinkedList workerPool = new LinkedList(); //空閒執行緒列表 private LinkedList taskQueue = new LinkedList(); //任務列表 private boolean stopped = false; //執行緒池的工作狀態 public ThreadPool() { //預設構造方法,執行緒池大小預設 this(DEFAULT_NUM_WORKERS); } public ThreadPool(int numOfWorkers) { //自定義執行緒池大小 for(int i=0;i<numOfWorkers;i++){ workerPool.add(new Worker("" + i, this)); } start(); //建立即啟動 } public void run() { //分發任務 try { while(!stopped) { if(taskQueue.isEmpty()) { //如果任務佇列為空,掛起當前執行緒。 也就是暫停執行緒池的分發任務的工作 synchronized(taskQueue) { taskQueue.wait(); //不管呼叫哪個物件的wait方法,都是掛起當前執行它的執行緒。 } } else if(workerPool.isEmpty()) { //如果沒有空閒的執行緒,則暫停執行緒池的工作。 synchronized(workerPool) { workerPool.wait(); } } //有任務,且有空閒執行緒的情況 => 從空閒執行緒中取出一個執行緒,讓其負責任務佇列中的一個任務 getWorker().setTask((Runnable)taskQueue.removeLast()); } } catch(InterruptedException e) { throw new RuntimeException(e); } } public void addTask(Runnable task) { synchronized(taskQueue) { taskQueue.addFirst(task); taskQueue.notify(); //通知已有新任務,如果前面執行緒因無任務被掛起,這個操作將喚醒執行緒 } } public void putWorker(Worker worker) { synchronized(workerPool) { workerPool.addFirst(worker); workerPool.notify(); //通知已有新空閒執行緒,如果前面執行緒因無空閒工作者執行緒被掛起,此操作將喚醒執行緒 } } public Worker getWorker() { return (Worker) workerPool.removeLast(); //取出一個空閒執行緒,並從列表中移除。 } public boolean isStopped() { return stopped; } public void stopThreads() { //關閉執行緒池 stopped = true; Iterator it = workerPool.Iterator(); //這裡喚醒掛起的工作者執行緒,使得它醒來並發現ThreadPool已關閉,並結束run方法 => 釋放OS執行緒 while(it.hasNext()) { Worker w = (Worker)it.next(); synchronized(w) { w.notify(); } } } }
對這個程式碼的認識,在註釋裡已經表現的很清楚了。另外,我還覺得,ThreadPool和Woker的關係有點觀察者模式的味道,Woker是觀察者,ThreadPool是被觀察者/主題。不過,與標準的觀察者模式不同的是,ThreadPool接受到新任務(發生了變化),並沒有通知所有Worker。