1. 程式人生 > >Java 併發程式設計(四)阻塞佇列和生產者-消費者模式

Java 併發程式設計(四)阻塞佇列和生產者-消費者模式

阻塞佇列

        阻塞佇列提供了可阻塞的 put 和 take 方法,以及支援定時的 offer 和 poll 方法。如果佇列已經滿了,那麼put方法將阻塞直到有空間可以用;如果佇列為空,那麼take方法將一直阻塞直到有元素可用。佇列可以使有界的,也可以是無界的,無界佇列永遠都不會充滿,因此無界佇列上的put方法永遠不會阻塞。一種常見的阻塞生產者-消費者模式就是執行緒池與工作佇列的組合,在 Executor 任務執行框架中就體現了這種模式。

意義:該模式能簡化開發過程,因為他消除了生產者和消費者類之間的程式碼依賴性,此外,該模式還將生產資料的過程與使用該資料的過程解耦開來以簡化工作負載的管理。對於I/O密集型和 CPU密集型的生產者和消費者,可以帶來許多效能優勢

        阻塞佇列簡化了消費者程式的編碼,因為take操作會一直阻塞直到有可用的資料。在某些情況下,這種方式是非常合適的(例如:伺服器應用程式中,沒有客戶端請求時便一直等待),在網路爬蟲等有無窮工作需要完成時,實現更高的資源利用率。

        類庫中包含了 BlockingQueue 的多種實現,其中,LinkedBlockingQueue 和 ArrayBlockingQueue 是 FIFO佇列,二者分別與 LinkedList 和 ArrayList 類似,但比同步 List 擁有更好的併發效能。PriorityBlockingQueue 是一個按優先順序排序的佇列,要求實現 Comparable 或者使用 Comparator。

        最後一個實現是 SynchronousQueue ,實際上他不是一個真正的佇列,它不會為佇列中的元素維護儲存空間。採用直接交付的機制,put 和take 會一直阻塞,除非put之後出現了take 或者 take之後出現了put操作。

生產者-消費者模式

下面我們利用生產者消費者模式建立一個類似於 Windows 索引服務。

package org.bupt.xiaoye.chapter5_8;

import java.io.File;
import java.util.concurrent.BlockingQueue;

public class Crawler implements Runnable {
	private final BlockingQueue<File> b;
	private final File root;

	@Override
	public void run() {
		System.out.println("Crawler begins to run!");
		if (root == null|| !root.exists())
			return;
		crawl(root);
		System.out.println("Crawler is shutdown!");

	}

	public void crawl(File root) {
		try {
			if (root.isFile()) {
				System.out.println("Crawling " + root);
				b.put(root);
			} else {
				for (File f : root.listFiles()) {
					crawl(f);
				}
			}
		} catch (InterruptedException e) {
			System.out.println(e);
		}
	}

	public Crawler(BlockingQueue<File> b, File root) {
		this.b = b;
		this.root = root;
	}

}

package org.bupt.xiaoye.chapter5_8;

import java.io.File;
import java.util.concurrent.BlockingQueue;

public class Indexer implements Runnable {
	private final BlockingQueue<File> blockingQueue;
	@Override
	public void run() {
		System.out.println("Indexer begins to run!");

		try{
		while(true){
			indexer(blockingQueue.take());
		}
		}
		catch (InterruptedException e) {
			System.out.println(e);
		}
		System.out.println("Indexer is shutdown!");
	}
	
	private void indexer(File file ){
		System.out.println("Indexing "+file.getAbsolutePath());
	}

	public Indexer(BlockingQueue<File> blockingQueue) {
		this.blockingQueue = blockingQueue;
	}

}

package org.bupt.xiaoye.chapter5_8;

import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class Starter {
	public static final int BOUND = 1;
	public static void main(String[] args) throws InterruptedException{
		BlockingQueue<File> blockingQueue = new SynchronousQueue<File>();
		File file = new File("E:\\BaiduYunDownload");
		Crawler c1 = new Crawler(blockingQueue,file);
		Indexer indexer = new Indexer(blockingQueue);
		Thread t1 = new Thread(c1);
		Thread t2 = new Thread(indexer);
		t1.start();
		t2.start();
		t1.join();
		t2.join();
	}

}

雙端佇列-工作密取

        Java6 增加了兩種容器型別,Deque 和 BlockingDeque,他們分別對 Queue 和 BlockingQueue 進行了拓展。Deque 是一個雙端佇列,實現了在佇列頭和佇列尾的高效插入和移除。具體實現包括 ArrayDeque 和 LinkedBlockingDeque。雙端佇列同樣適用於另一種相關模式,即工作密取(Working stealing)。在生產者-消費者設計中,所有消費者有一個共享的工作佇列,而在工作密取設計中,每個消費者都有各自的雙端佇列。如果一個消費者完成了自己的雙端佇列中的全部工作,那麼它可以從其它消費者雙端佇列末尾祕密地獲取工作。密取工作模式比傳統的生產者-消費者模式具有更高的可伸縮性,這是因為工作者執行緒不會在單個共享的任務佇列上發生競爭。在大多數時候,它們都只是訪問自己的雙端佇列,從而極大地減小了競爭。當工作者執行緒需要訪問另一個佇列時,它會從佇列的尾部而不是頭部獲取工作,因此進一步降低了佇列上的競爭程度。

        工作密取非常適用於既是消費者也是生產者問題—當執行某個工作時可能導致出現更多的工作。例如,在網頁爬蟲程式在處理一個頁面時,通常會發現有更多的頁面需要處理。

阻塞和中斷

        執行緒可能會阻塞或者暫停執行,原因有很多種:等待I/O操作結束,等待獲得一個鎖,等待從 Thread.sleep 中醒來,或是等待另一個執行緒的計算結果。當執行緒阻塞時,它通常被掛起,並處於某種阻塞狀態(BLOCKED、WAITING或TIMED_WAITING)。阻塞操作與執行時間很長的普通操作的差別在於,被阻塞的執行緒必須等待某個不受它控制的時間發生後才能繼續執行。

        BlockingQueue 的 put 和take 方法會丟擲受檢查異常(Checked Exception) InterruptedException,這與類庫中其他方法的做法相同,例如 Thread.sleep .當某方法丟擲 InterruptedException時,表示該方法是一個阻塞方法。當方法丟擲InterruptedException 時,有兩種基本選擇:

        傳遞 InterruptedException 避開這個異常通常是最明智的策略-只需要將 InterruptedException傳遞給方法的呼叫者。傳遞InterruptedException的方法包括,根本不捕獲該異常或者捕獲後再次丟擲這個異常。(通常如果在自定義的方法中呼叫了阻塞方法,那麼捕獲到阻塞方法的中斷異常後應該將其重新丟擲)

        恢復中斷 有時候不能丟擲InterruptedException,例如程式碼是Runnable的一部分(因為run方法被定義為不丟擲任何異常)。在這種情況下你,必須捕獲InterruptedException,並通過呼叫當前執行緒的interrupt方法恢復中斷狀態,這樣在呼叫棧中更高層的程式碼將看到引發了一箇中斷。