1. 程式人生 > >Java 併發程式設計之任務取消(六)

Java 併發程式設計之任務取消(六)

關閉ExecutorService

ExecutorService提供了兩種關閉方法,使用Shutdown正常關閉,以及使用ShutdownNow強行關閉。在進行強行關閉時,shutdownNow首先關閉當前正在執行的任務。然後返回所有尚未啟動的任務清單 。

返回未啟動任務清單這句沒明白返回的方式,於是去查看了一下原始碼

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  For example, typical
     * implementations will cancel via {@link Thread#interrupt}, so any
     * task that fails to respond to interrupts may never terminate.
     *
     * @return list of tasks that never commenced execution
     * @throws SecurityException if a security manager exists and
     *         shutting down this ExecutorService may manipulate
     *         threads that the caller is not permitted to modify
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")},
     *         or the security manager's {@code checkAccess} method
     *         denies access.
     */
    List<Runnable> shutdownNow();

是用List的形式返回submit的Runnable

還是像上一篇一樣使用日誌服務做為栗子

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class LogService {
	private final ExecutorService exec = Executors.newSingleThreadExecutor();
	private final int TIMEOUT = 100;
	...	
	public void start() {
	}

	public void stop() throws InterruptedException {
		try {
			exec.shutdown();
			exec.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS);
		} finally {
			writer.close();
		}
	}

	public void log(String msg){
		try{
			exec.execute(new writerTask(msg)));
		}catch(RejectedExecutionException ignored){
			
		}
	}
}
省略了部分程式碼。因為和上一篇中的程式碼都一樣,主要展現的是利用ExecutorService後Stop方法修改後的樣子

毒丸物件

這是另一種消費者生產者的栗子,毒丸是指一個放在佇列上的物件 ,其作用是當得到這個物件的時候,立即停止。在FIFO佇列中,毒丸物件 將確保消費者在關閉之前首先完成佇列中的所有工作。

舉個栗子。。。哦。。花了好長時間才除錯好。。

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class InderXingService {
	private static final File POISON = new File("");
	private final IndexerThread consumer = new IndexerThread();
	private final CrawlerThread producer = new CrawlerThread();
	private final BlockingQueue<File> queue = new LinkedBlockingQueue<File>();
	private final FileFilter fileFilter;
	private final File root = new File("F://Desktop/Open");

	public static void main(String[] args) {
		InderXingService index = new InderXingService(null, null);
		index.start();
	}

	public InderXingService(FileFilter fileFilter, File root) {
		this.fileFilter = fileFilter;
	}

	public void start() {
		producer.start();
		consumer.start();
	}

	public void stop() {
		producer.interrupt();
	}

	public void awaitTermination() throws InterruptedException {
		consumer.join();
	}

	class CrawlerThread extends Thread {

		@Override
		public void run() {
			// TODO Auto-generated method stub
			try {
				crawl(root);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} finally {
				System.out.println("putpoison");
				while (true) {
					try {
						queue.put(POISON);
						break;
					} catch (InterruptedException e1) {
					}
				}
			}
		}

		private void crawl(File root) throws InterruptedException {
			// 為檔案新增內容

			File[] entries = root.listFiles();
			if (entries != null) {
				for (File entry : entries) {
					if (entry.isDirectory()) {
						crawl(entry.getAbsoluteFile());
					} else if (!alreadindex(entry)) {
						queue.put(entry);
					}
				}
			}
		}

		private boolean alreadindex(File entry) {
			// TODO Auto-generated method stub
			if (queue.contains(entry)) {
				return true;
			}
			return false;
		}
	}

	class IndexerThread extends Thread {
		@Override
		public void run() {
			while (true) {
				File file;
				try {
					file = queue.take();
					if (file == POISON) {
						break;
					} else {
						indexFile(file);
					}

				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}

			}
		}

		private void indexFile(File root) throws InterruptedException {
			System.out.println(root.getName());
		}
	}

}
這個是遍歷一個目錄的檔案的栗子-0-

剛才試著遍歷了一下整個F盤。。貌似消費者跟的上。而且沒啥壓力看來都可以用了

	public static void main(String[] args) {
		InderXingService index = new InderXingService(null, null);
		index.start();
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		index.stop();
	}
試了一下中斷方法

UpgradeReport.xslt
UpgradeReport_Minus.gif
UpgradeReport_Plus.gif
java.lang.InterruptedException
putpoison
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(Unknown Source)
    at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(Unknown Source)
    at java.util.concurrent.LinkedBlockingQueue.put(Unknown Source)
    at InderXingService$CrawlerThread.crawl(InderXingService.java:73)
    at InderXingService$CrawlerThread.crawl(InderXingService.java:71)
    at InderXingService$CrawlerThread.crawl(InderXingService.java:71)
    at InderXingService$CrawlerThread.crawl(InderXingService.java:71)
    at InderXingService$CrawlerThread.crawl(InderXingService.java:71)
    at InderXingService$CrawlerThread.crawl(InderXingService.java:71)
    at InderXingService$CrawlerThread.crawl(InderXingService.java:71)
    at InderXingService$CrawlerThread.run(InderXingService.java:49)

結果也對。

使用毒丸君的注意事頂:

只有在生產者和消費者的數量都已知的情況下,才可以使用毒丸物件。當生產者多的時候 ,可以加一個計數器,當所有生產者的丸子都放在佇列裡邊的時候再進行打斷。多消費者的時候 ,一個生產者可以放入與消費者數量相同的丸子。因為每個消費者都只能接收一個丸子。當兩者數量都比較大時就不太好用了。只有在無界佇列中。毒丸物件才能可靠的工作